1. 程式人生 > 實用技巧 >如何將IDEA開發的java web專案移植到騰訊雲伺服器

如何將IDEA開發的java web專案移植到騰訊雲伺服器

目錄

springboot整合kafka

參考:

配置

依賴

需要web和kafka

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

注意,springboot版本對kafka版本影響不小,1.x可以使用1.x的kafka(比如1.1.1.RELEASE),2.0.x使用2.1.7.RELEASE,2.1.x使用 2.2.x.RELEASE;
版本不對都會導致專案無法啟動

yml配置


#============== kafka ===================
# 指定kafka 代理地址,可以多個
spring:
  kafka:
  #指定kafka server的地址,叢集配多個,中間,逗號隔開,或者使用  列表格式  
  # - 服務1
  # - 服務2   ....
    bootstrap-servers: 192.168.88.128:9092
    #=============== provider  =======================
    producer:
      retries: 0
      # 每次批量傳送訊息的數量
      batch-size: 16384
      acks: 1
      #這個值只能大不能小了,否則會影響sleuth。可以使用的最大記憶體來快取等待發送到server端的訊息
      buffer-memory: 1048576  # 這是最小的?
      retries: 0
      # 指定訊息key和訊息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # 單個請求的最大大小(以位元組為單位)
        max.request.size: 2097152
        # 從傳送請求到收到ACK確認等待的最長時間(超時時間)
        request.timeout.ms: 40000
        # 這項設定設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即傳送而不顧這項設定,然而如果我們獲得訊息位元組數比這項設定要小的多,
        # 我們需要“linger”特定的時間以獲取更多的訊息。 這個設定預設為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。
        linger.ms: 1
        # 訊息傳送失敗的情況下,重試傳送的次數 存在訊息傳送是成功的,只是由於網路導致ACK沒收到的重試,會出現訊息被重複傳送的情況
        message.send.max.retries: 0
    consumer:
      # 指定預設消費者group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

測試

訊息實體

@Data
@Accessors(chain = true)
@NoArgsConstructor
public class Message {
    /**
     * id
     */
    private Long id;
    /**
     * 訊息
     */
    private String msg;
    /**
     * 時間戳
     */
    private Date sendTime;

}

傳送訊息

/** 訊息傳送方
 * @author jingshiyu
 * @date 2019/7/31 14:04:21
 * @desc
 */
@RestController
@Slf4j
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        Message message=new Message();
        message.setId(123L).setMsg(msg).setSendTime(new Date());
        kafkaTemplate.send("kafka_one", JSON.toJSONString(message));
    }

}

就這樣,傳送訊息程式碼就實現了。

這裡關鍵的程式碼為 kafkaTemplate.send() 方法,kafka_one 是 Kafka 裡的 topic ,這個 topic 在 Java 程式中是不需要提前在 Kafka 中設定的,因為它會在傳送的時候自動建立你設定的 topic, JSON.toJSONString(message) 是訊息內容

接收訊息

/**
 * 監聽伺服器上的kafka是否有相關的訊息發過來
 */
@Component
@Slf4j
public class KafkaReceiver {
    /**
     * 定義此消費者接收topics = {"kafka_one"}的訊息,與controller中的topic對應上即可
     * @param record 變數代表訊息本身,可以通過ConsumerRecord<?,?>型別的record變數來列印接收的訊息的各種資訊
     */
    @KafkaListener(topics = {"kafka_one"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
}

客戶端 consumer 接收訊息特別簡單,直接用@KafkaListener 註解即可,並在監聽中設定監聽的 topic ,topics 是一個數組所以是可以繫結多個主題的,上面的程式碼中修改為 @KafkaListener(topics = {"zhisheng","tian"}) 就可以同時監聽兩個 topic 的訊息了。需要注意的是:這裡的 topic 需要和訊息傳送類 KafkaSender.java 中設定的 topic 一致。

傳送訊息

啟動專案之後,呼叫介面傳送訊息

http://192.168.0.173:8083/send?msg=測試訊息

將會接收到訊息

record =ConsumerRecord(topic = kafka_one, partition = 0, offset = 0, CreateTime = 1564556254952, serialized key size = -1, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":123,"msg":"測試訊息","sendTime":1564556254808})

message ={"id":123,"msg":"測試訊息","sendTime":1564556254808}

kafka檢視

./kafka-topics.sh --list --zookeeper localhost:2181  在kafka上檢視topic列表

就會發現剛才我們程式中的 kafka_one 已經自己建立了