
Kafka Installation and Deployment (Single-Node) and kafkaDemo Debugging Test (including a Java Demo)

Package required for deployment:
1. kafka_2.10-0.10.2.0.tar

1. Extract the kafka_2.10-0.10.2.0 package

tar -xvf kafka_2.10-0.10.2.0.tar  

2. Configure Kafka

cd /software/kafka_2.10-0.10.2.0/config


(1) server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.1.104:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

advertised.listeners=PLAINTEXT://192.168.1.104:9092
hostname=192.168.1.104


# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.1.104:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

3. Start Kafka

Start ZooKeeper: nohup bin/zookeeper-server-start.sh config/zookeeper.properties 1>zookeeper.log 2>zookeeper.err &
Start Kafka: nohup bin/kafka-server-start.sh config/server.properties &

4. Single-node test:

(1) Producer

bin/kafka-console-producer.sh --broker-list 192.168.1.104:9092 --topic test
Type a message: lmx

(2) Consumer

bin/kafka-console-consumer.sh --zookeeper 192.168.1.104:2181 --topic test --from-beginning
Message received: lmx

5. Java code test:

(1) Configuration class: ConfigureAPI.class

package kafkaDemo;

public class ConfigureAPI
{

    public final static String GROUP_ID = "test";

    public final static String TOPIC = "test-lmx";

    public final static int BUFFER_SIZE = 64 * 1024;

    public final static int TIMEOUT = 20000;

    public final static int INTERVAL = 10000;

    // 192.168.1.105 is a second broker; only 192.168.1.104:9092 exists in the single-node setup described here
    public final static String BROKER_LIST = "192.168.1.104:9092,192.168.1.105:9092";

    // interval between consumer polls, in milliseconds
    public final static int GET_MEG_INTERVAL = 1000;

}

(2) Producer class: JProducer.class

package kafkaDemo;


import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;


public class JProducer implements Runnable
{
    private Producer<String, String> producer;

    public JProducer()
    {
        Properties props = new Properties();

        props.put("bootstrap.servers", ConfigureAPI.BROKER_LIST);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("request.required.acks", "-1");

        producer = new KafkaProducer<String, String>(props);

    }

    @Override
    public void run()
    {
        try
        {
            String data = "hello lmx!";
            producer.send(new ProducerRecord<String, String>(ConfigureAPI.TOPIC, data));
            System.out.println(data);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            producer.close();
        }

    }

    public static void main(String[] args)
    {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        threadPool.execute(new JProducer());
        threadPool.shutdown();
    }

}

Execution result:
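
As a debugging aid (my own addition, not part of the original demo): if sends seem to vanish, for example when you hit the "Failed to send messages" problem described at the end of this article, attaching a callback to send() makes every success or failure visible. Below is a minimal sketch reusing the ConfigureAPI constants above; the class name JProducerWithCallback is only an illustrative name.

package kafkaDemo;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class JProducerWithCallback
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", ConfigureAPI.BROKER_LIST);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>(ConfigureAPI.TOPIC, "hello lmx!"), new Callback()
        {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception)
            {
                if (exception != null)
                {
                    // typical causes: broker unreachable, wrong listeners/advertised.listeners
                    exception.printStackTrace();
                }
                else
                {
                    System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
                }
            }
        });
        producer.close(); // close() waits for in-flight sends, so the callback fires before the JVM exits
    }
}

With the callback in place, a broker or listener problem shows up as an exception in onCompletion instead of being silently swallowed.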


(3) Consumer class: JConsumer.class
package kafkaDemo;


import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class JConsumer implements Runnable
{

    private KafkaConsumer<String, String> consumer;

    private JConsumer()
    {
        Properties props = new Properties();

        props.put("bootstrap.servers", ConfigureAPI.BROKER_LIST);
        props.put("group.id", ConfigureAPI.GROUP_ID);
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 1000);
        props.put("session.timeout.ms", 30000);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(ConfigureAPI.TOPIC)); // subscribe to multiple topics by adding them to this list

    }

    @Override
    public void run()
    {
        while (true)
        {
            System.out.println("polling for messages");
            ConsumerRecords<String, String> records = consumer.poll(ConfigureAPI.GET_MEG_INTERVAL);
            for (ConsumerRecord<String, String> record : records)
            {
                handleMeg(record.value());

            }
        }

    }

    private void handleMeg(String record)
    {

        System.out.println(record);

    }

    public static void main(String[] args)
    {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        threadPool.execute(new JConsumer());
        threadPool.shutdown();
    }
}

Execution result:
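
One thing worth noting about JConsumer (the sketch below is my own addition, not part of the original demo): its while (true) loop never closes the consumer, so the group coordinator only notices it is gone after session.timeout.ms. The usual pattern is to call wakeup() from a shutdown hook; wakeup() is the one KafkaConsumer method that is safe to call from another thread, and it makes the blocked poll() throw a WakeupException, which we catch so close() can run. Class and variable names here are illustrative.

package kafkaDemo;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class JConsumerWithShutdown
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", ConfigureAPI.BROKER_LIST);
        props.put("group.id", ConfigureAPI.GROUP_ID);
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(ConfigureAPI.TOPIC));

        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                consumer.wakeup();      // interrupts the blocked poll() below
                try
                {
                    mainThread.join();  // wait for the main loop to close the consumer
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        });

        try
        {
            while (true)
            {
                ConsumerRecords<String, String> records = consumer.poll(ConfigureAPI.GET_MEG_INTERVAL);
                for (ConsumerRecord<String, String> record : records)
                {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
        catch (WakeupException e)
        {
            // expected during shutdown, nothing to do
        }
        finally
        {
            consumer.close(); // performs a final auto-commit and leaves the group cleanly
        }
    }
}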


Appendix: solutions to some common errors:

(1) Error: Unable to connect to zookeeper server '192.168.1.104:2181' with timeout of 4000 ms

Solution:

1. Turn off the firewall

Run service iptables stop to stop the firewall.
Run service iptables status to confirm it is stopped.
Run chkconfig iptables off to keep it disabled across reboots.

2. Alternatively, open only port 2181

iptables -I INPUT -p tcp --dport 2181 -j ACCEPT

(2) Error: kafka Failed to send messages after 3 tries

Solution:

Modify server.properties:

listeners=PLAINTEXT://192.168.1.104:9092
advertised.listeners=PLAINTEXT://192.168.1.104:9092
hostname=192.168.1.104
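
To confirm from Java that the new listener settings work (my own sketch, not from the original article), you can ask a consumer for the broker's topic metadata: if listTopics() returns a topic list, the broker is reachable and the producer and consumer demos above should connect as well; if it hangs or eventually throws a timeout, the listener settings still need attention. The class name BrokerCheck is only an illustrative name.

package kafkaDemo;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class BrokerCheck
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.104:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try
        {
            // listTopics() fetches cluster metadata from the broker; if it returns,
            // the listeners / advertised.listeners settings are reachable from this machine
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            System.out.println("Broker reachable, topics: " + topics.keySet());
        }
        finally
        {
            consumer.close();
        }
    }
}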

This is only a personal summary and my skills are limited, so please bear with me and point out any errors or omissions. My QQ: 373965070


