Kafka Producer & Consumer Demo

By | February 2, 2019

Copyright notice: Reprints are welcome; please credit the source: https://csdn.yanxml.com. Big-data project on GitHub: https://github.com/SeanYanxml/bigdata. https://blog.csdn.net/u010416101/article/details/79561719

Preface

It has been a long time since I last dug into Kafka, and I have recently started studying it again. Ideally I would get hold of the source code and read it slowly and carefully.

This post walks through how to write the official Kafka sample: a simple producer and consumer.
PS: Kafka has been through quite a few releases in the meantime, and the people around me have changed just as much.


Demo

The demo in this post consists of the following parts:

  • Maven dependencies
  • Producer
  • Consumer
  • Entry points

Prerequisites: JDK, Maven, and a Kafka cluster (Kafka + ZooKeeper).

  • Maven dependencies

Pay special attention to the Kafka, Scala, and ZooKeeper versions. The kafka_2.11 artifact is built against Scala 2.11, which is why the POM below excludes its transitive scala-library and pins 2.11.8 explicitly, so that only one Scala runtime ends up on the classpath.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.yanxml</groupId>
        <artifactId>bigdata</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>kafka</artifactId>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

    </dependencies>
</project>
  • The KafkaConfig class
package com.yanxml.kafka.newquickstart;

public class KafkaConfig {

    // Broker list on port 9092, commented out in the original demo:
//  public static String kafkaUrl="192.168.100.62:9092,192.168.100.63:9092,192.168.100.64:9092";

    // Broker list on port 9093, the SASL_PLAINTEXT listener this demo talks to:
    public static String kafkaUrl="192.168.100.62:9093,192.168.100.63:9093,192.168.100.64:9093";

    public static String topic="sean-security";
    public static String key="";

}
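
Since both clients below connect to the port-9093 listener with security.protocol=SASL_PLAINTEXT, they also need SASL credentials, which neither demo class sets (the original setup presumably supplies a JAAS file externally). Here is a minimal sketch of supplying them in code via the sasl.jaas.config property, available since Kafka 0.10.2; the helper class name and the username/password are placeholders, not part of the original repository:

package com.yanxml.kafka.newquickstart;

import java.util.Properties;

// Hypothetical helper that applies the SASL settings both demos need.
public class KafkaSecurityConfig {

    public static void applySaslPlain(Properties props) {
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Placeholder credentials: replace with whatever the brokers are configured with.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"alice\" password=\"alice-secret\";");
    }
}

The same JAAS block can equally live in a kafka_client_jaas.conf file passed to the JVM with -Djava.security.auth.login.config.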
  • ConsumerDemo
package com.yanxml.kafka.newquickstart;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemo {
    public KafkaConsumer<String, String> consumer;

    public KafkaConsumerDemo() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("bootstrap.servers", KafkaConfig.kafkaUrl);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        // Several topics can be subscribed at once: "TopicA", "TopicB", "TopicC".
        consumer.subscribe(Arrays.asList(KafkaConfig.topic));
    }

    public void consume() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }

}
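
The consumer above relies on enable.auto.commit, so offsets are committed on a timer regardless of whether the records were actually processed; around a crash this can skip or re-deliver messages. Here is a hedged sketch of the manual-commit variant (the class name is mine, and the SASL properties from the sketch above are omitted for brevity but would be needed against the 9093 listener):

package com.yanxml.kafka.newquickstart;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: commit offsets manually, only after a batch has been processed.
public class ManualCommitConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConfig.kafkaUrl);
        props.put("group.id", "test-manual");
        props.put("enable.auto.commit", "false"); // we commit explicitly below
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(KafkaConfig.topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                // A crash before this line re-reads the batch instead of losing it.
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}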
  • ProducerDemo
package com.yanxml.kafka.newquickstart;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo {

    public Producer<String, String> producer;

    public KafkaProducerDemo() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("bootstrap.servers", KafkaConfig.kafkaUrl);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    public void send(String topicName, String content) {
        // Note: the topicName/content parameters are unused in this demo; the
        // topic comes from KafkaConfig and the payloads are the loop counter.
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(KafkaConfig.topic,
                    Integer.toString(i), Integer.toString(i)));
        producer.flush(); // push buffered records out before the JVM can exit
        // producer.close();
    }

}
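
Two caveats about KafkaProducerDemo: send() is asynchronous, and with close() commented out the records may still be sitting in the client's buffer when main() returns. Here is a hedged sketch of the same loop with a per-record delivery callback and a clean shutdown (the class name is mine, and the SASL properties are again omitted):

package com.yanxml.kafka.newquickstart;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: observe per-record delivery results and close the producer cleanly.
public class CallbackProducerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConfig.kafkaUrl);
        props.put("acks", "all");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>(KafkaConfig.topic,
                        Integer.toString(i), Integer.toString(i)),
                        (metadata, exception) -> {
                            // Runs once the broker acknowledges (or rejects) the record.
                            if (exception != null)
                                exception.printStackTrace();
                            else
                                System.out.printf("sent to %s-%d @ offset %d%n",
                                        metadata.topic(), metadata.partition(),
                                        metadata.offset());
                        });
            }
        } finally {
            producer.close(); // flushes buffered records before returning
        }
    }
}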
  • Launcher classes
# RunConsumerDemo.class
package com.yanxml.kafka.newquickstart;

public class RunConsumerDemo {
    public static void main(String[]args){
        KafkaConsumerDemo consumerDemo = new KafkaConsumerDemo();
        consumerDemo.consume();
    }

}

# RunProducerDemo.class
package com.yanxml.kafka.newquickstart;

public class RunProducerDemo {

    public static void main(String []args){
//      KafkaConsumerDemo consumerDemo = new KafkaConsumerDemo();
        KafkaProducerDemo producerDemo = new KafkaProducerDemo();

        producerDemo.send(null, null);
    }
}
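
Before either launcher will work, the sean-security topic has to exist, unless the brokers run with auto.create.topics.enable=true. It can be created with the kafka-topics.sh script that ships with the broker, or in code; below is a minimal sketch using the AdminClient API that Kafka has shipped since 0.11 (the partition and replication counts are arbitrary choices here, and the SASL properties are omitted as before):

package com.yanxml.kafka.newquickstart;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

// Sketch: create the demo topic once, before the producer/consumer run.
public class CreateTopicDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConfig.kafkaUrl);

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3; tune both to the cluster size.
            NewTopic topic = new NewTopic(KafkaConfig.topic, 3, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("Created topic " + KafkaConfig.topic);
        }
    }
}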

Summary

Nothing technically deep here; it is all very simple. The main point is to keep the Java client library version as consistent as possible with the cluster.

The code has been uploaded to GitHub:
https://github.com/SeanYanxml/bigdata.git


