【java】Kafka发送消息与接收消息简单demo

2018-12-08 02:16 
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/m0_38001814/article/details/82628101

一、先上pom依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>

二、消息提供者

package uyun.hornet.ticket.impl.service;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import uyun.hornet.ticket.impl.common.DataCallback;

import java.util.Properties;

/**
 * Created by xujia on 2018/9/11
 */
public class KafkaProducter {
    private final KafkaProducer<String, String> producer;
    public final static String TOPIC = "itsm-test";

    private KafkaProducter(){
        Properties properties = new Properties();
        //此处配置的是kafka的端口
        properties.put("bootstrap.servers", "xxxxx");

        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("request.required.acks","-1");

        producer = new KafkaProducer<>(properties);
    }

    void produce() {
        //发送100条消息
        int messageNo = 100;
        int count = 200;
        while (messageNo < count) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            long startTime = System.currentTimeMillis();
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, data);
            producer.send(record, new DataCallback(startTime, data));
            System.out.println(data);
            messageNo++;
        }
    }
    public static void main( String[] args )
    {
        new KafkaProducter().produce();
    }
}

DataCallback为自定义回调类,如下

package uyun.hornet.ticket.impl.common;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * kafka回调函数
 * Created by xujia on 2018/9/7
 */
public class DataCallback implements Callback {

    private static final Logger logger = LoggerFactory.getLogger(DataCallback.class);
    private final long startTime;
    private final String message;

    public DataCallback(long startTime, String message) {
        this.startTime = startTime;
        this.message = message;
    }

    /**
     * 生产者成功发送消息,收到kafka服务端发来的ACK确认消息后,会调用此回调函数
     * @param recordMetadata 生产者发送的消息的元数据,如果发送过程中出现异常,此参数为null
     * @param e 发送过程中出现的异常,如果发送成功,则此参数为null
     */
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (recordMetadata != null) {
            long endTime = System.currentTimeMillis() - startTime;
            logger.info("callback success, message(" + message + ") send to partition("
                    + recordMetadata.partition() + ")," + "offset(" + recordMetadata.offset() + ") in" + endTime);
        } else {
            e.printStackTrace();
        }
    }
}

三、消息消费者

package uyun.hornet.ticket.impl.service;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by xujia on 2018/9/11
 */
public class KafkaConsumer22 {

    private final Consumer<String, String> consumer;

    private KafkaConsumer22() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxx");//服务器ip:端口号,集群用逗号分隔
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("itsm-test"));
    }

    void consume() {
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (records.count() > 0) {
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    System.out.println("从kafka接收到的消息是:" + message);
                }
            }
        }

    }

    public static void main(String[] args) {
        new KafkaConsumer22().consume();
    }
}

该demo所传消息只是简单的字符串,若为对象可用ObjectMapper先转为json串发送,在消费端接收的时候转回来即可。

发表评论

您必须 登录 才能发表留言!