利用Kafka发送/消费消息-Java示例

By | 2019年2月2日

当使用命令行工具把基本的组件运行起来之后,再使用 Java client 就很简单了。这里是入门的第一个 Java 客户端程序,其中仍有很多需要深入理解的地方。

依赖配置

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

注意:kafka 的 artifactId(比如我用的 kafka_2.11)后缀中的版本号是 Scala 的版本号,一定要和实际使用的 Scala 版本(例如本机安装的 Scala)一致,否则会报以下错误。

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at kafka.utils.Pool.<init>(Pool.scala:28)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:91)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:105)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    at com.vonzhou.learn.message.KafkaConsumer.<init>(KafkaConsumer.java:23)
    at com.vonzhou.learn.message.KafkaConsumer.main(KafkaConsumer.java:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 13 more
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

Producer发送消息

/**
 * Minimal Kafka (old Scala client API) producer example.
 *
 * <p>{@link #start()} launches a single background thread that initialises the
 * producer and then sends one timestamped message to the {@code test} topic
 * every {@link #SEND_INTERVAL_MS} milliseconds. The loop ends when the thread
 * is interrupted or when sending fails; in both cases the producer is closed
 * exactly once and the worker thread terminates.
 */
public class MessageProducer {

    /** Pause between two consecutive messages, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 2000L;

    /** Created by {@link #init()}; {@code null} until then. */
    private Producer<String,String> producer;

    public static void main(String[] args) {
        new MessageProducer().start();
    }

    /**
     * Builds the producer configuration and creates the {@link Producer}.
     * Must be called before {@link #produceMsg()}.
     */
    public void init(){
        Properties props = new Properties();
        /*
         * Used for bootstrapping only: the producer uses this list to fetch
         * metadata (topic, partition, replicas). The sockets actually used to
         * send messages are chosen from the returned metadata.
         */
        props.put("metadata.broker.list", "localhost:9092");
        /*
         * Serializer class for messages.
         * The default is kafka.serializer.DefaultEncoder, which takes a byte[]
         * and returns the same byte array.
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * Whether the producer waits for the broker's ACK after sending; the
         * default is 0. 1 means wait for the ACK, which makes delivery reliable.
         */
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        // Type parameters: the first is the type of the partition key, the second the type of the message.
        producer = new Producer<String, String>(config);
    }

    /**
     * Sends a single message whose payload and key both embed the current
     * timestamp. Requires {@link #init()} to have been called.
     */
    public void produceMsg(){
        // Build the message to send.
        long timestamp = System.currentTimeMillis();
        String msg = "Msg" + timestamp;
        String topic = "test";  // make sure this topic exists
        System.out.println("发送消息" + msg);
        String key = "Msg-Key" + timestamp;

        /*
         * topic:   the topic of the message
         * key:     the message key, also used as the partition key
         * message: the payload to send
         */
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, msg);

        producer.send(data);
    }

    /**
     * Starts the background send loop and returns immediately.
     *
     * <p>On failure the error is reported and the loop stops: a closed
     * producer cannot be reused, so continuing would only spin on errors.
     */
    public void start() {
        System.out.println("开始发送消息 ...");
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    init();
                    while (!Thread.currentThread().isInterrupted()) {
                        produceMsg();
                        Thread.sleep(SEND_INTERVAL_MS);
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                } catch (Throwable e) {
                    System.out.println("Kafka producer error! " + e);
                } finally {
                    closeProducer();
                }
            }
        });
        // Accept no further tasks; the worker thread exits once the loop ends,
        // so the JVM is not kept alive by an idle pool thread.
        executor.shutdown();
    }

    /** Closes the producer if it was created, reporting (not propagating) close errors. */
    private void closeProducer() {
        if (producer == null) {
            return;
        }
        try {
            producer.close();
        } catch (Throwable e) {
            System.out.println("Turn off Kafka producer error! " + e);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

Consumer消费消息

/**
 * Minimal Kafka (old ZooKeeper-based high-level consumer API) example.
 *
 * <p>{@link #start()} launches a single background thread that connects via
 * ZooKeeper, opens one stream on the {@code test} topic and prints every
 * message it receives. When consumption ends or fails, the connector is shut
 * down exactly once and the worker thread terminates.
 */
public class MessageConsumer {
    /** Created by {@link #init()}; {@code null} until then. */
    private ConsumerConnector consumer;
    /** Topic to consume; set by {@link #init()}. */
    private String topic;

    public static void main(String[] arg) {
        new MessageConsumer().start();
    }

    /**
     * Builds the consumer configuration, creates the connector and records
     * the topic. Must be called before {@link #consume()}.
     */
    public void init(){
        // Address of ZooKeeper.
        String zookeeper = "localhost:2181";
        String groupId = "test-group";

        Properties props = new Properties();
        /*
         * Required setting.
         */
        props.put("zookeeper.connect", zookeeper);
        /*
         * Required setting: the consumer group this consumer belongs to.
         */
        props.put("group.id", groupId);
        /*
         * If no heartbeat reaches ZooKeeper within this time the consumer is
         * considered dead; the default is 6000.
         */
        props.put("zookeeper.session.timeout.ms", "6000");
        /*
         * How far a ZooKeeper follower may lag behind the leader.
         */
        props.put("zookeeper.sync.time.ms", "200");
        /*
         * How often consumer offsets are committed to ZooKeeper; the default
         * is 60 * 1000.
         */
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = "test";
    }

    /**
     * Opens a single message stream on the topic and prints each message
     * until the stream's iterator reports no more messages.
     *
     * <p>Payloads are decoded as UTF-8, which is what the producer's
     * {@code kafka.serializer.StringEncoder} writes by default; relying on the
     * platform charset would garble non-ASCII text on some machines.
     */
    public void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // One stream (i.e. one consuming thread) for this topic.
        topicCountMap.put(topic, 1);
        /*
         * createMessageStreams creates the message streams for each topic.
         */
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            try {
                String message = new String(iterator.next().message(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("收到消息" + message);
            } catch (Throwable e) {
                // Report the exception itself; its cause is frequently null.
                System.out.println("Consume message error! " + e);
            }
        }

    }

    /**
     * Starts the background consume task and returns immediately.
     *
     * <p>{@link #consume()} is invoked once: message streams can only be
     * created once per connector, and a shut-down connector cannot be reused,
     * so retrying in a loop would only spin on errors.
     */
    public void start() {
        System.out.println("开始消费消息...");
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    init();
                    consume();
                } catch (Throwable e) {
                    System.out.println("Kafka consumer error! " + e);
                } finally {
                    shutdownConsumer();
                }
            }
        });
        // Accept no further tasks; the worker thread exits once consumption
        // ends, so the JVM is not kept alive by an idle pool thread.
        executor.shutdown();
    }

    /** Shuts the connector down if it was created, reporting (not propagating) errors. */
    private void shutdownConsumer() {
        if (consumer == null) {
            return;
        }
        try {
            consumer.shutdown();
        } catch (Throwable e) {
            System.out.println("Turn off Kafka consumer error! " + e);
        }
    }
}

发表评论