Spring Cloud_28_消息驱动/Kafka的使用

By | 2019年4月18日

版权声明:本文为博主原创文章,艾特米工作室,欢迎转载 https://blog.csdn.net/zhaozao5757/article/details/79712259

消息驱动/Kafka的使用

  • 与RabbitMQ一样,充当消息代理中间件的角色

1、下载Zookeeper/Kafka

  • ZooKeeper

    Kafka依赖于Zookeeper,Zookeeper是一个服务的管理框架,在启动Kafka(2.11)服务之前,需要先启动Zookeeper(3.4.8)

  • Kafka

2、启动Zookeeper

  • 进入%Zookeeper_Home%\conf
  • 复制zoo_sample.cfg,并更名为zoo.cfg
  • 进入%Zookeeper_Home%\bin,进入命令行窗口
  • 使用zkServer命令,启动zookeeper,默认端口2181

3、启动Kafka

  • 进入%kafka_Home%bin\windows,进入命令行窗口
  • 默认是使用%kafka_Home%\config中的server.properties,启动kafka
  • 使用 kafka-server-start ../../config/server.properties,启动kafka,默认9092端口

4、创建atm_kafka_client

4.1、引入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.9</version>
</dependency>

4.2、SendMessage

package com.atm.cloud;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * 向Kafka服务器发送消息
 */
public class SendMessage {

    public static void main(String[] args) throws Exception {
        // 配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // String的序列化类
        // 设置数据key的序列化处理类
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // 设置数据value的序列化处理类
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // 创建生产者实例
        Producer<String, String> producer = new KafkaProducer<String, String>(
                props);
        // 创建一条新的记录,第一个参数为Topic名称
        // 会向topic发送userName-aitemi键值,所有的数据都是通过键值保存的
        ProducerRecord record = new ProducerRecord<String, String>("my-topic",
                "userName", "aitemi");
        // 发送记录
        producer.send(record);
        producer.close();
    }
}
  • 执行main
  • 进入%kafka_Home%bin\windows,进入命令行窗口,键如 kafka-topic –list –zookeeper localhost:2181
  • 查看新建的topic

  • kafka中的topic类似rabbitmq中的队列,我们刚刚向topic中发送了一个消息

4.3、ReadMessage

package com.atm.cloud;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * 消费者,订阅"my-topic",获取其中的信息
 */
public class ReadMessage {

    public static void main(String[] args) throws Exception {
        // 配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // 必须指定消费者组
        props.put("group.id", "test");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
                props);
        // 订阅 my-topic 的消息,可以订阅多个topic
        consumer.subscribe(Arrays.asList("my-topic"));
        // 到服务器中读取记录,会一直拉取
        while (true) {
            // 通过consumer的一个拉取方法
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("这是消费者A,key: " + record.key() + ", value: "
                        + record.value());
            }
        }
    }
}
  • 每发送一次消息,均会接收到

4.4、消费者组

  • 消费者会为自己添加一个消费者组,每一条发布到topic的记录都会被交付到消费者组
  • 如果多个消费者实例有相同消费者组,那么信息会分配其中一个消费实例上
  • 如果所有的消费者都有不同的消费者组,那么消息会被广播到全部的消费者进行处理
  • 通过这样的机制来实现负载均衡的功能
// 必须指定消费者组
props.put("group.id", "test");

发表评论