博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka.net使用编程入门(三)
阅读量:6642 次
发布时间:2019-06-25

本文共 10765 字,大约阅读时间需要 35 分钟。

这个世界既不是有钱人的世界,也不是有权人的世界,它是有心人的世界。


一些有用的命令

1.列出主题:kafka-topics.bat --list --zookeeper localhost:2181

2.描述主题:kafka-topics.bat --describe --zookeeper localhost:2181 --topic [Topic Name]

3.从头读取消息:kafka-console-consumer.bat --zookeeper localhost:2181 --topic [Topic Name] --from-beginning

4.删除主题:E:\WorkSoftWare\kafka2.11\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --delete --topic test2016 --zookeeper localhost:2181

实体类:

KafkaProducerMessage.cs代码:

[Table("KafkaProducerMessage")]    public partial class KafkaProducerMessage    {        public int KafkaProducerMessageID { get; set; } [Required] [StringLength(1000)] public string Topic { get; set; } [Required] public string MessageContent { get; set; } public DateTime CreatedAt { get; set; } }

KafkaConsumerMessage.cs代码:

[Table("KafkaConsumerMessage")]    public partial class KafkaConsumerMessage    {        public int KafkaConsumerMessageID { get; set; } [Required] [StringLength(1000)] public string Topic { get; set; } public int Partition { get; set; } public long Offset { get; set; } [Required] public string MessageContent { get; set; } public DateTime CreatedAt { get; set; } }

KafkaProducerMessageArchive.cs代码:

[Table("KafkaProducerMessageArchive")]    public partial class KafkaProducerMessageArchive    {        public int KafkaProducerMessageArchiveID { get; set; } public int KafkaProducerMessageID { get; set; } [Required] [StringLength(1000)] public string Topic { get; set; } [Required] public string MessageContent { get; set; } public DateTime CreatedAt { get; set; } public DateTime ArchivedAt { get; set; } }

vwMaxOffsetByPartitionAndTopic.cs代码:

[Table("vwMaxOffsetByPartitionAndTopic")]    public partial class vwMaxOffsetByPartitionAndTopic    {        [Key]        [Column(Order = 0)] [StringLength(1000)] public string Topic { get; set; } [Key] [Column(Order = 1)] [DatabaseGenerated(DatabaseGeneratedOption.None)] public int Partition { get; set; } public long? MaxOffset { get; set; } }

KafkaConsumerRepository.cs代码:

public class KafkaConsumerRepository    {        private KafkaModel context;        public KafkaConsumerRepository () { this.context = new KafkaModel(); } public List
GetKafkaConsumerMessages() { return context.KafkaConsumerMessage.ToList(); } public KafkaConsumerMessage GetKafkaConsumerMessageByID(int MessageID) { return context.KafkaConsumerMessage.Find(MessageID); } public List
GetKafkaConsumerMessageByTopic(string TopicName) { return context.KafkaConsumerMessage .Where(a => a.Topic == TopicName) .ToList(); } public List
GetOffsetPositionByTopic(string TopicName) { return context.vwMaxOffsetByPartitionAndTopic .Where(a => a.Topic == TopicName) .ToList(); } public void InsertKafkaConsumerMessage(KafkaConsumerMessage Message) { Console.WriteLine(String.Format("Insert {0}: {1}", Message.KafkaConsumerMessageID.ToString(), Message.MessageContent)); context.KafkaConsumerMessage.Add(Message); context.SaveChanges(); Console.WriteLine(String.Format("Saved {0}: {1}", Message.KafkaConsumerMessageID.ToString(), Message.MessageContent)); } public void Save() { context.SaveChanges(); } public void Dispose() { context.Dispose(); } }

KafkaProducerRepository.cs代码:

public class KafkaProducerRepository    {        private KafkaModel context;        public KafkaProducerRepository() { this.context = new KafkaModel(); } public List
GetKafkaProducerMessages() { return context.KafkaProducerMessage.ToList(); } public List
GetDistinctTopics() { return context.KafkaProducerMessage.Select(a => a.Topic) .Distinct() .ToList(); } public KafkaProducerMessage GetKafkaProducerMessageByMessageID(int MessageID) { return context.KafkaProducerMessage.Find(MessageID); } public List
GetKafkaProducerMessageByTopic(string TopicName) { return context.KafkaProducerMessage .Where(a => a.Topic == TopicName) .OrderBy(a => a.KafkaProducerMessageID) .ToList
(); } public void InsertKafkaProducerMessage(KafkaProducerMessage Message) { context.KafkaProducerMessage.Add(Message); context.SaveChanges(); } public void ArchiveKafkaProducerMessage(int MessageID) { KafkaProducerMessage m = context.KafkaProducerMessage.Find(MessageID); KafkaProducerMessageArchive archivedMessage = KafkaProducerMessageToKafkaProducerMessageArchive(m); using (var dbContextTransaction = context.Database.BeginTransaction()) { try { context.KafkaProducerMessageArchive.Add(archivedMessage); context.KafkaProducerMessage.Remove(m); dbContextTransaction.Commit(); context.SaveChanges(); } catch (Exception) { dbContextTransaction.Rollback(); context.SaveChanges(); } } } public void ArchiveKafkaProducerMessageList(List
Messages) { foreach (KafkaProducerMessage Message in Messages) { ArchiveKafkaProducerMessage(Message.KafkaProducerMessageID); } } public static KafkaProducerMessageArchive KafkaProducerMessageToKafkaProducerMessageArchive(KafkaProducerMessage Message) { return new KafkaProducerMessageArchive { KafkaProducerMessageID = Message.KafkaProducerMessageID, MessageContent = Message.MessageContent, Topic = Message.Topic, CreatedAt = Message.CreatedAt, ArchivedAt = DateTime.UtcNow }; } public void Save() { context.SaveChanges(); } public void Dispose() { context.Dispose(); } }

数据库上下文KafkaModel.cs代码:

public partial class KafkaModel : DbContext { public KafkaModel() : base("name=KafkaModel") { } public virtual DbSet
KafkaConsumerMessage { get; set; } public virtual DbSet
KafkaProducerMessage { get; set; } public virtual DbSet
KafkaProducerMessageArchive { get; set; } public virtual DbSet
vwMaxOffsetByPartitionAndTopic { get; set; } protected override void OnModelCreating(DbModelBuilder modelBuilder) { modelBuilder.Entity
() .Property(e => e.Topic) .IsUnicode(false); modelBuilder.Entity
() .Property(e => e.Topic) .IsUnicode(false); modelBuilder.Entity
() .Property(e => e.Topic) .IsUnicode(false); modelBuilder.Entity
() .Property(e => e.Topic) .IsUnicode(false); } }

Program.cs代码:

class Program    {        private static string Topic;        static void Main(string[] args) { string invalidArgErrorMessage = "有效的args是:produce或consume"; if (args.Length < 1) { throw (new Exception(invalidArgErrorMessage)); } string intent = args[1]; Topic = ConfigurationManager.AppSettings["Topic"]; if (String.Equals(intent, "consume", StringComparison.OrdinalIgnoreCase)) { Console.WriteLine("开始消费者服务"); Consume(); } else if (String.Equals(intent, "produce", StringComparison.OrdinalIgnoreCase)) { Console.WriteLine("开始生产者服务"); Produce(); } else { throw (new Exception(invalidArgErrorMessage)); } } private static BrokerRouter InitDefaultConfig() { List
ZKURIList = new List
(); foreach (string s in ConfigurationManager.AppSettings["BrokerList"].Split(',')) { ZKURIList.Add(new Uri(s)); } var Options = new KafkaOptions(ZKURIList.ToArray()); var Router = new BrokerRouter(Options); return Router; } private static void Consume() { KafkaConsumerRepository KafkaRepo = new KafkaConsumerRepository(); bool FromBeginning = Boolean.Parse(ConfigurationManager.AppSettings["FromBeginning"]); var Router = InitDefaultConfig(); var Consumer = new Consumer(new ConsumerOptions(Topic, Router)); //如果我们不想从头开始,使用最新偏移量。 if (!FromBeginning) { var MaxOffsetByPartition = KafkaRepo.GetOffsetPositionByTopic(Topic); //如果我们得到一个结果使用它,否则默认 if (MaxOffsetByPartition.Count != 0) { List
offsets = new List
(); foreach (var m in MaxOffsetByPartition) { OffsetPosition o = new OffsetPosition(m.Partition, (long)m.MaxOffset + 1); offsets.Add(o); } Consumer.SetOffsetPosition(offsets.ToArray()); } else { Consumer.SetOffsetPosition(new OffsetPosition()); } } //消耗返回一个阻塞IEnumerable(ie:从没有结束流) foreach (var message in Consumer.Consume()) { string MessageContent = Encoding.UTF8.GetString(message.Value); Console.WriteLine(String.Format("处理带有内容的消息: {0}", MessageContent)); KafkaRepo = new KafkaConsumerRepository(); KafkaConsumerMessage ConsumerMessage = new KafkaConsumerMessage() { Topic = Topic, Offset = (int)message.Meta.Offset, Partition = message.Meta.PartitionId, MessageContent = MessageContent, CreatedAt = DateTime.UtcNow }; KafkaRepo.InsertKafkaConsumerMessage(ConsumerMessage); KafkaRepo.Dispose(); } } private static void Produce() { KafkaProducerRepository KafkaRepo = new KafkaProducerRepository(); var Router = InitDefaultConfig(); var Client = new Producer(Router); List
Messages = new List
(); foreach (KafkaProducerMessage message in KafkaRepo.GetKafkaProducerMessageByTopic(Topic)) { Messages.Add(new Message(message.MessageContent)); KafkaRepo.ArchiveKafkaProducerMessage(message.KafkaProducerMessageID); } Client.SendMessageAsync(Topic, Messages).Wait(); KafkaRepo.Dispose(); } }

 

App.config内容:

sql脚本:

IF NOT EXISTS(SELECT 1 FROM sys.tables WHERE name = 'KafkaConsumerMessage') BEGIN CREATE TABLE [dbo].[KafkaConsumerMessage]( [KafkaConsumerMessageID] [int] IDENTITY(1,1) NOT NULL, [Topic] [varchar](1000) NOT NULL, [Partition] [int] NOT NULL, [Offset] [bigint] NOT NULL, [MessageContent] [nvarchar](max) NOT NULL, [CreatedAt] [datetime] NOT NULL, CONSTRAINT [PK_Kafka_Consumer_Message_KafkaConsumerMessageID] PRIMARY KEY CLUSTERED ([KafkaConsumerMessageID]) ) END GO IF NOT EXISTS(SELECT 1 FROM sys.tables WHERE name = 'KafkaProducerMessage') BEGIN CREATE TABLE [dbo].[KafkaProducerMessage]( [KafkaProducerMessageID] [int] IDENTITY(1,1) NOT NULL, [Topic] [varchar](1000) NOT NULL, [MessageContent] [nvarchar](max) NOT NULL, [CreatedAt] [datetime] NOT NULL, CONSTRAINT [PK_KafkaProducerMessage_KafkaProducerMessageID] PRIMARY KEY CLUSTERED ([KafkaProducerMessageID]) ) END GO IF NOT EXISTS(SELECT 1 FROM sys.tables WHERE name = 'KafkaProducerMessageArchive') BEGIN CREATE TABLE [dbo].[KafkaProducerMessageArchive]( [KafkaProducerMessageArchiveID] [int] IDENTITY(1,1) NOT NULL, [KafkaProducerMessageID] [int] NOT NULL, [Topic] [varchar](1000) NOT NULL, [MessageContent] [nvarchar](max) NOT NULL, [CreatedAt] [datetime] NOT NULL, [ArchivedAt] [datetime] NOT NULL, CONSTRAINT [PK_KafkaProducerMessageArchive_KafkaProducerMessageArchiveID] PRIMARY KEY CLUSTERED ([KafkaProducerMessageArchiveID]) ) END GO

引用.net kafka 组件:

这里写图片描述


启动zookeeper服务器,kafka服务器

打开一个DOS环境窗口

zkServer
  • 1

打开第二个DOS窗口

kafka-server-start D:xxxxxxxxx\config\server.properties
  • 1

结果如图:

手动添加生产者表三条记录:

这里写图片描述 

这里写图片描述


这里写图片描述


消费者表

这里写图片描述


这里写图片描述

你可能感兴趣的文章
Linux 学习
查看>>
高一函数中易混的问题
查看>>
程序员的自我修养(2)——计算机网络(转) good
查看>>
HYSBZ 1588 营业额统计
查看>>
关于文字过长时进行省略问题
查看>>
单例模式的四种方式
查看>>
Redis客户端ServiceStack.Redis的简单使用
查看>>
PCA主成份分析学习记要
查看>>
[链接地址] Kafka设计解析
查看>>
Spring Cloud 分布式链路跟踪 Sleuth + Zipkin + Elasticsearch【Finchley 版】
查看>>
django视图001
查看>>
第一章Accp 8.0
查看>>
基于BootStrap的initupload()实现Excel上传和获取excel中的数据
查看>>
新增 修改,对xx名字或者其他属性做校验判断是否存在
查看>>
EF6 在原有数据库中使用 CodeFirst 总复习(一、搭建基础环境)
查看>>
MySQL性能优化小结
查看>>
Spring+SpringMVC+MyBatis)
查看>>
BZOJ-2190: [SDOI2008]仪仗队 (欧拉函数)
查看>>
浅谈 .NET 中的对象引用、非托管指针和托管指针
查看>>
[ASP.NET MVC 小牛之路]15 - Model Binding
查看>>