C# 使用 Kafka

2018-12-07 18:27 

下载 NuGet 包:Confluent.Kafka(下文代码使用的是 0.11.x 版本的 API)。
(原文此处为 NuGet 安装界面的截图。)

1.定义公共配置类

 public abstract class KafkaBase
    {
        // Serializes appends to the log file: WriteLog is reached from producer
        // delivery continuations and consumer loops, which may run on different threads.
        private static readonly object LogLock = new object();

        /// <summary>
        /// Reads the Kafka broker address from appSettings.
        /// </summary>
        /// <param name="brokerNameKey">Name of the appSettings key that holds the broker address.</param>
        /// <returns>The configured broker address.</returns>
        /// <exception cref="ArgumentNullException">The setting is missing, empty or whitespace.</exception>
        public string GetKafkaBroker(string brokerNameKey = "Broker")
        {
            return GetRequiredSetting(brokerNameKey, nameof(brokerNameKey), "Kafka消息服务器地址不能为空!");
        }

        /// <summary>
        /// Reads the topic name from appSettings.
        /// </summary>
        /// <param name="topicNameKey">Name of the appSettings key that holds the topic.</param>
        /// <returns>The configured topic name.</returns>
        /// <exception cref="ArgumentNullException">The setting is missing, empty or whitespace.</exception>
        public string GetTopicName(string topicNameKey = "Topic")
        {
            return GetRequiredSetting(topicNameKey, nameof(topicNameKey), "消息所属的主题不能为空!");
        }

        /// <summary>
        /// Reads the consumer group id from appSettings.
        /// </summary>
        /// <param name="groupIDKey">Name of the appSettings key that holds the group id.</param>
        /// <returns>The configured group id.</returns>
        /// <exception cref="ArgumentNullException">The setting is missing, empty or whitespace.</exception>
        public string GetGroupID(string groupIDKey = "GroupID")
        {
            return GetRequiredSetting(groupIDKey, nameof(groupIDKey), "消息所属的组不能为空!");
        }

        /// <summary>
        /// Reads the comma-separated partition list from appSettings.
        /// </summary>
        /// <param name="partitionsKey">Name of the appSettings key that holds the partition list, e.g. "0,1,2".</param>
        /// <returns>The parsed partition numbers; never empty.</returns>
        /// <exception cref="ArgumentNullException">The setting is missing, blank, or contains no entries.</exception>
        /// <exception cref="FormatException">An entry is not a valid integer.</exception>
        public List<int> GetPartition(string partitionsKey = "Partitions")
        {
            string raw = GetRequiredSetting(partitionsKey, nameof(partitionsKey), "消息所属的分区不能为空!");

            // Empty entries (e.g. a trailing comma) are skipped instead of failing the parse.
            List<int> partitions = raw
                .Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries)
                .Select(entry => Convert.ToInt32(entry))
                .ToList();

            if (partitions.Count == 0)
            {
                throw new ArgumentNullException(nameof(partitionsKey), "消息所属的分区不能为空!");
            }
            return partitions;
        }

        /// <summary>
        /// Reads the number of messages to consume per batch from appSettings.
        /// </summary>
        /// <param name="consumerCountKey">Name of the appSettings key that holds the batch size.</param>
        /// <returns>The configured value; 0 when the setting is missing (Convert.ToInt32 maps null to 0).</returns>
        /// <exception cref="FormatException">The setting is present but not a valid integer.</exception>
        public int GetConsumerCount(string consumerCountKey = "ConsumerCount")
        {
            return Convert.ToInt32(System.Configuration.ConfigurationManager.AppSettings[consumerCountKey]);
        }

        /// <summary>
        /// Appends an entry to "kafkaLog\{type}_{yyyy-MM-dd}.log" under the application base directory.
        /// </summary>
        /// <param name="type">Log file prefix, e.g. "consumer" or "producer".</param>
        /// <param name="info">Text to log. Treated as a composite format string only when <paramref name="args"/> are supplied.</param>
        /// <param name="args">Optional format arguments for <paramref name="info"/>.</param>
        public static void WriteLog(string type, string info, params object[] args)
        {
            string logDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "kafkaLog");
            string logFile = Path.Combine(logDirectory, type + "_" + DateTime.Now.ToString("yyyy-MM-dd") + ".log");

            // Callers frequently pass an already-interpolated string that embeds a message
            // payload; running that through string.Format would throw on any '{' or '}'.
            string text = (args == null || args.Length == 0) ? info : string.Format(info, args);

            lock (LogLock)
            {
                // No-op when the directory already exists.
                Directory.CreateDirectory(logDirectory);
                using (StreamWriter writer = File.AppendText(logFile))
                {
                    writer.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss 信息:\r\n"));
                    writer.WriteLine(text);
                    writer.WriteLine("\r\n");
                }
            }
        }

        /// <summary>
        /// Returns the appSettings value stored under <paramref name="key"/>, or throws when it is blank.
        /// </summary>
        /// <param name="key">appSettings key to read.</param>
        /// <param name="paramName">Name of the caller's parameter, reported on the exception.</param>
        /// <param name="errorMessage">Message reported on the exception.</param>
        private static string GetRequiredSetting(string key, string paramName, string errorMessage)
        {
            string value = System.Configuration.ConfigurationManager.AppSettings[key];
            if (string.IsNullOrWhiteSpace(value))
            {
                // Two-argument overload: the single-string constructor would treat the text as the parameter name.
                throw new ArgumentNullException(paramName, errorMessage);
            }
            return value;
        }
    }

2.生产

 /// <summary>
    /// Kafka message producer. All instances share one static underlying
    /// <c>Producer&lt;string, string&gt;</c>, created on first use.
    /// </summary>
    public sealed class KafkaProducer : KafkaBase
    {
        private static readonly object Locker = new object();

        // One shared generator: a new Random per call can repeat the same time-based
        // seed on rapid successive calls. Random is not thread-safe, so it is used under Locker.
        private static readonly Random PartitionPicker = new Random();

        private static Producer<string, string> _producer;
        private static string _topic;
        private static List<int> _partition;

        /// <summary>
        /// Loads the topic and partition list from configuration and creates the shared
        /// producer if it does not exist yet.
        /// </summary>
        public KafkaProducer()
        {
            string topic = GetTopicName();
            List<int> partitions = GetPartition();

            // All shared static state is published under the same lock.
            lock (Locker)
            {
                _topic = topic;
                _partition = partitions;

                if (_producer == null)
                {
                    var config = new Dictionary<string, object>
                    {
                        { "bootstrap.servers", GetKafkaBroker() }
                    };
                    _producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
                }
            }
        }

        /// <summary>
        /// Sends a message to the configured topic on a randomly chosen configured partition,
        /// logs the delivery outcome, and waits up to 10 seconds for the producer to flush.
        /// </summary>
        /// <param name="key">Message key.</param>
        /// <param name="message">Message payload; must not be null, empty or whitespace.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null, empty or whitespace.</exception>
        public static void Produce(string key, string message)
        {
            // Validate before touching configuration or the producer.
            if (string.IsNullOrWhiteSpace(message))
            {
                throw new ArgumentNullException(nameof(message), "消息内容不能为空!");
            }

            // The constructor (re)loads topic/partitions and lazily creates the shared producer.
            new KafkaProducer();

            string topic;
            int partition;
            lock (Locker)
            {
                topic = _topic;
                partition = _partition[PartitionPicker.Next(_partition.Count)];
            }

            _producer.ProduceAsync(topic, key, message, partition).ContinueWith(task =>
            {
                // Reading task.Result on a faulted/cancelled task would throw inside the
                // continuation and the failure would go unlogged.
                if (task.IsFaulted || task.IsCanceled)
                {
                    WriteLog("producer", "Topic:{0} Message:{1} result:{2} Error:{3}",
                        topic, message, false, task.Exception?.GetBaseException().Message);
                    return;
                }

                var report = task.Result;
                bool delivered = report.Error.Code == ErrorCode.NoError;

                // Values are passed as format arguments so that braces inside the payload
                // are never interpreted as format placeholders.
                WriteLog("producer", "Topic:{0} Partition:{1} Offset:{2} Message:{3} result:{4}",
                    topic, report.Partition, report.Offset, report.Value, delivered);
            });

            _producer.Flush(TimeSpan.FromSeconds(10));
        }
    }

3.消费

 /// <summary>
    /// Kafka message consumer. Offers a blocking event-driven consume loop with auto-commit,
    /// a thread-pool variant of the same, and a batch consume loop that commits offsets
    /// manually once the caller's callback reports success.
    /// </summary>
    public sealed class KafkaConsumer : KafkaBase
    {
        // volatile: the flag is typically set from a thread other than the one polling it.
        private volatile bool _isCancelled;

        #region Constructor

        /// <summary>
        /// Creates a consumer with <see cref="IsCancelled"/> set to false.
        /// </summary>
        public KafkaConsumer()
        {
            IsCancelled = false;
        }

        #endregion

        #region Properties

        /// <summary>
        /// Set to true to make the running consume loops exit. Defaults to false.
        /// </summary>
        public bool IsCancelled
        {
            get { return _isCancelled; }
            set { _isCancelled = value; }
        }

        #endregion

        #region Synchronous versions

        /// <summary>
        /// Subscribes to <paramref name="topic"/> with auto-commit enabled and polls on the calling
        /// thread until <see cref="IsCancelled"/> becomes true.
        /// </summary>
        /// <param name="broker">Kafka broker address.</param>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="groupID">Consumer group id.</param>
        /// <param name="action">Optional callback invoked for every consumed message.</param>
        /// <exception cref="ArgumentNullException">broker, topic or groupID is null, empty or whitespace.</exception>
        public void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
        {
            EnsureConnectionArguments(broker, topic, groupID);

            using (var consumer = new Consumer<Ignore, string>(BuildConfig(broker, groupID, true), null, new StringDeserializer(Encoding.UTF8)))
            {
                if (action != null)
                {
                    consumer.OnMessage += (_, message) => action(ToResult(broker, message));
                }

                consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic " + end.Topic + " partition " + end.Partition + ", next message will be at offset " + end.Offset);

                consumer.OnError += (_, error) => Console.WriteLine("Error:" + error);

                // Raised on deserialization errors or when a consumed message carries an error.
                consumer.OnConsumeError += (_, message) => Console.WriteLine("Error consuming from topic/partition/offset " + message.Topic + "/" + message.Partition + "/" + message.Offset + ": " + message.Error);

                // string.Join prints the individual offsets; concatenating the list itself prints only its type name.
                consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets:" + commit.Error : "Successfully committed offsets:" + string.Join(", ", commit.Offsets));

                // Raised when the consumer is given a new set of partitions.
                consumer.OnPartitionsAssigned += (_, partitions) =>
                {
                    Console.WriteLine("Assigned Partitions:" + string.Join(", ", partitions) + ", Member ID:" + consumer.MemberId);
                    // Once a handler is attached to this event, Assign is no longer called
                    // automatically and must be called explicitly for consumption to start.
                    consumer.Assign(partitions);
                };

                // Raised when the consumer's current assignment has been revoked.
                consumer.OnPartitionsRevoked += (_, partitions) =>
                {
                    Console.WriteLine("Revoked Partitions:" + string.Join(", ", partitions));
                    // Once a handler is attached to this event, Unassign is no longer called
                    // automatically and must be called explicitly.
                    consumer.Unassign();
                };

                consumer.Subscribe(topic);

                while (!IsCancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

        /// <summary>
        /// Consumes in batches using broker, topic and group id from configuration, committing
        /// offsets manually after <paramref name="func"/> returns true. Blocks until
        /// <see cref="IsCancelled"/> becomes true.
        /// </summary>
        /// <param name="func">Batch handler; return true to commit the batch.</param>
        /// <param name="partition">Optional map of partition number to start offset. When null the topic is subscribed to instead.</param>
        public void Consume(Func<List<ConsumerResult>, bool> func = null, Dictionary<int, int> partition = null)
        {
            KafkaManuallyCommittedOffsets(new ConsumerSetting()
            {
                Broker = GetKafkaBroker(),
                Topic = GetTopicName(),
                GroupID = GetGroupID(),
                Func = func,
                Partition = partition
            });
        }

        #endregion

        #region Background version

        /// <summary>
        /// Queues an auto-commit consume loop on the thread pool and returns immediately.
        /// The loop runs until <see cref="IsCancelled"/> becomes true.
        /// </summary>
        /// <param name="broker">Kafka broker address.</param>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="groupID">Consumer group id.</param>
        /// <param name="func">Optional callback invoked with a single-element list per consumed message; its return value is ignored.</param>
        /// <exception cref="ArgumentNullException">broker, topic or groupID is null, empty or whitespace.</exception>
        public void ConsumeAsync(string broker, string topic, string groupID, Func<List<ConsumerResult>, bool> func = null)
        {
            EnsureConnectionArguments(broker, topic, groupID);

            ThreadPool.QueueUserWorkItem(KafkaAutoCommittedOffsets, new ConsumerSetting() { Broker = broker, Topic = topic, GroupID = groupID, Func = func });
        }

        #endregion

        #region Offset commit strategies

        /// <summary>
        /// Consume loop that lets the client auto-commit offsets.
        /// </summary>
        /// <param name="state">A <see cref="ConsumerSetting"/> describing what to consume.</param>
        private void KafkaAutoCommittedOffsets(object state)
        {
            ConsumerSetting setting = RequireSetting(state);

            using (var consumer = new Consumer<Ignore, string>(BuildConfig(setting.Broker, setting.GroupID, true), null, new StringDeserializer(Encoding.UTF8)))
            {
                if (setting.Func != null)
                {
                    consumer.OnMessage += (_, message) =>
                    {
                        setting.Func(new List<ConsumerResult>() { ToResult(setting.Broker, message) });
                    };
                }

                consumer.Subscribe(setting.Topic);

                while (!IsCancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

        /// <summary>
        /// Consume loop that collects messages into batches and commits offsets only after the
        /// callback reports success.
        /// </summary>
        /// <remarks>
        /// The batch size comes from GetConsumerCount() (minimum 1). When the callback is null or
        /// returns false nothing is committed; the batch is kept and offered again, together
        /// with the next message, on the following iteration.
        /// </remarks>
        /// <param name="state">A <see cref="ConsumerSetting"/> describing what to consume.</param>
        private void KafkaManuallyCommittedOffsets(object state)
        {
            ConsumerSetting setting = RequireSetting(state);

            using (var consumer = new Consumer<Ignore, string>(BuildConfig(setting.Broker, setting.GroupID, false), null, new StringDeserializer(Encoding.UTF8)))
            {
                if (setting.Partition == null)
                {
                    consumer.Subscribe(setting.Topic);
                }
                else
                {
                    // Explicit assignment: each entry maps a partition number to its start offset.
                    List<TopicPartitionOffset> topics = new List<TopicPartitionOffset>();
                    foreach (KeyValuePair<int, int> item in setting.Partition)
                    {
                        topics.Add(new TopicPartitionOffset(setting.Topic, item.Key, item.Value));
                    }
                    consumer.Assign(topics);
                }

                // Values go in as format arguments so braces in them cannot break the log call.
                consumer.OnError += (_, error) => WriteLog("consumer", "Error:{0}", error);
                consumer.OnConsumeError += (_, error) => WriteLog("consumer", "Consume error:{0}", error);

                // A missing ConsumerCount setting yields 0; clamp so a batch can always complete.
                int batchSize = Math.Max(1, GetConsumerCount());
                List<ConsumerResult> batch = new List<ConsumerResult>(batchSize);

                while (!IsCancelled)
                {
                    Message<Ignore, string> message;
                    if (!consumer.Consume(out message, TimeSpan.FromMilliseconds(100)))
                    {
                        continue;
                    }

                    WriteLog("consumer", "主题: {0} 分区: {1} 分区偏移量: {2} Value:{3} count {4}",
                        message.Topic, message.Partition, message.Offset, message.Value, batchSize - batch.Count);

                    batch.Add(ToResult(setting.Broker, message));
                    if (batch.Count < batchSize)
                    {
                        continue;
                    }

                    bool handled = setting.Func?.Invoke(batch) ?? false;
                    if (!handled)
                    {
                        WriteLog("consumer", "调用消费返回False");
                        continue;
                    }

                    // Commits using the last message of the batch.
                    // NOTE(review): when several partitions are assigned, confirm this also covers
                    // the offsets of the other partitions represented in the batch.
                    var committedOffsets = consumer.CommitAsync(message).Result;
                    Console.WriteLine($"committed offsets:{committedOffsets}");
                    WriteLog("consumer", "committed offsets:{0}", committedOffsets);

                    // Start a fresh list rather than clearing, in case the callback kept a
                    // reference to the one it was handed; otherwise already-committed messages
                    // would be delivered to the callback again with the next batch.
                    batchSize = Math.Max(1, GetConsumerCount());
                    batch = new List<ConsumerResult>(batchSize);
                }
            }
        }

        #endregion

        #region Helpers

        /// <summary>Throws when broker, topic or group id is null, empty or whitespace.</summary>
        private static void EnsureConnectionArguments(string broker, string topic, string groupID)
        {
            EnsureNotBlank(broker, nameof(broker), "Kafka消息服务器的地址不能为空!");
            EnsureNotBlank(topic, nameof(topic), "消息所属的主题不能为空!");
            EnsureNotBlank(groupID, nameof(groupID), "用户分组ID不能为空!");
        }

        /// <summary>Throws ArgumentNullException with a real parameter name and message when the value is blank.</summary>
        private static void EnsureNotBlank(string value, string paramName, string message)
        {
            if (string.IsNullOrWhiteSpace(value))
            {
                throw new ArgumentNullException(paramName, message);
            }
        }

        /// <summary>Casts the thread-pool state object to ConsumerSetting, failing fast when it is not one.</summary>
        private static ConsumerSetting RequireSetting(object state)
        {
            ConsumerSetting setting = state as ConsumerSetting;
            if (setting == null)
            {
                throw new ArgumentException("state must be a non-null ConsumerSetting.", nameof(state));
            }
            return setting;
        }

        /// <summary>Builds the client configuration shared by all consume loops.</summary>
        private static Dictionary<string, object> BuildConfig(string broker, string groupID, bool autoCommit)
        {
            return new Dictionary<string, object>
            {
                { "bootstrap.servers", broker },
                { "group.id", groupID },
                { "enable.auto.commit", autoCommit },
                { "auto.commit.interval.ms", 5000 },
                { "statistics.interval.ms", 60000 },
                { "session.timeout.ms", 6000 },
                { "auto.offset.reset", "smallest" }
            };
        }

        /// <summary>Copies the fields of a consumed message into a ConsumerResult.</summary>
        private static ConsumerResult ToResult(string broker, Message<Ignore, string> message)
        {
            return new ConsumerResult
            {
                Broker = broker,
                Topic = message.Topic,
                Partition = message.Partition,
                Offset = message.Offset.Value,
                Message = message.Value
            };
        }

        #endregion

    }

4.对象

 /// <summary>
    /// Details of a single consumed Kafka message, as handed to consumer callbacks.
    /// </summary>
    public sealed class ConsumerResult
    {
        /// <summary>
        /// Address of the Kafka broker the message was consumed from.
        /// </summary>
        public string Broker { get; set; }

        /// <summary>
        /// Topic the message belongs to.
        /// </summary>
        public string Topic { get; set; }

        /// <summary>
        /// Consumer group id. Not populated by the KafkaConsumer code in this file.
        /// </summary>
        public string GroupID { get; set; }

        /// <summary>
        /// The message payload to be processed.
        /// </summary>
        public string Message { get; set; }

        /// <summary>
        /// Offset of the message within its partition.
        /// </summary>
        public long Offset { get; set; }

        /// <summary>
        /// Partition the message was read from.
        /// </summary>
        public int Partition { get; set; }
    }
 /// <summary>
    /// Parameter object describing what a KafkaConsumer loop should consume and how the
    /// consumed messages are handed back to the caller.
    /// </summary>
    public sealed class ConsumerSetting
    {
        /// <summary>Kafka broker address.</summary>
        public string Broker { get; set; }

        /// <summary>Topic to consume from.</summary>
        public string Topic { get; set; }

        /// <summary>Consumer group id.</summary>
        public string GroupID { get; set; }

        /// <summary>
        /// Partition number mapped to start offset. Null (the default) means the topic is
        /// subscribed to instead of assigning explicit partitions.
        /// </summary>
        public Dictionary<int, int> Partition { get; set; }

        /// <summary>Consumed message details. Defaults to null.</summary>
        public List<ConsumerResult> ConsumerResults { get; set; }

        /// <summary>
        /// Callback that receives the consumed messages. In the manual-commit loop a return
        /// value of true triggers the offset commit. Defaults to null.
        /// </summary>
        public Func<List<ConsumerResult>, bool> Func { get; set; }
    }

发表评论

您必须 登录 才能发表留言!