This world belongs neither to the rich nor to the powerful; it belongs to those who put their heart into it.
Some useful commands
1. List topics: kafka-topics.bat --list --zookeeper localhost:2181
2. Describe a topic: kafka-topics.bat --describe --zookeeper localhost:2181 --topic [Topic Name]
3. Read messages from the beginning: kafka-console-consumer.bat --zookeeper localhost:2181 --topic [Topic Name] --from-beginning
4. Delete a topic: E:\WorkSoftWare\kafka2.11\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --delete --topic test2016 --zookeeper localhost:2181
Entity classes:
KafkaProducerMessage.cs:
[Table("KafkaProducerMessage")] public partial class KafkaProducerMessage { public int KafkaProducerMessageID { get; set; } [Required] [StringLength(1000)] public string Topic { get; set; } [Required] public string MessageContent { get; set; } public DateTime CreatedAt { get; set; } }
KafkaConsumerMessage.cs:
[Table("KafkaConsumerMessage")] public partial class KafkaConsumerMessage { public int KafkaConsumerMessageID { get; set; } [Required] [StringLength(1000)] public string Topic { get; set; } public int Partition { get; set; } public long Offset { get; set; } [Required] public string MessageContent { get; set; } public DateTime CreatedAt { get; set; } }
KafkaProducerMessageArchive.cs:
[Table("KafkaProducerMessageArchive")] public partial class KafkaProducerMessageArchive { public int KafkaProducerMessageArchiveID { get; set; } public int KafkaProducerMessageID { get; set; } [Required] [StringLength(1000)] public string Topic { get; set; } [Required] public string MessageContent { get; set; } public DateTime CreatedAt { get; set; } public DateTime ArchivedAt { get; set; } }
vwMaxOffsetByPartitionAndTopic.cs:
[Table("vwMaxOffsetByPartitionAndTopic")] public partial class vwMaxOffsetByPartitionAndTopic { [Key] [Column(Order = 0)] [StringLength(1000)] public string Topic { get; set; } [Key] [Column(Order = 1)] [DatabaseGenerated(DatabaseGeneratedOption.None)] public int Partition { get; set; } public long? MaxOffset { get; set; } }
KafkaConsumerRepository.cs:
public class KafkaConsumerRepository
{
    private KafkaModel context;

    public KafkaConsumerRepository()
    {
        this.context = new KafkaModel();
    }

    public List<KafkaConsumerMessage> GetKafkaConsumerMessages()
    {
        return context.KafkaConsumerMessage.ToList();
    }

    public KafkaConsumerMessage GetKafkaConsumerMessageByID(int MessageID)
    {
        return context.KafkaConsumerMessage.Find(MessageID);
    }

    public List<KafkaConsumerMessage> GetKafkaConsumerMessageByTopic(string TopicName)
    {
        return context.KafkaConsumerMessage
            .Where(a => a.Topic == TopicName)
            .ToList();
    }

    public List<vwMaxOffsetByPartitionAndTopic> GetOffsetPositionByTopic(string TopicName)
    {
        return context.vwMaxOffsetByPartitionAndTopic
            .Where(a => a.Topic == TopicName)
            .ToList();
    }

    public void InsertKafkaConsumerMessage(KafkaConsumerMessage Message)
    {
        Console.WriteLine(String.Format("Insert {0}: {1}", Message.KafkaConsumerMessageID.ToString(), Message.MessageContent));
        context.KafkaConsumerMessage.Add(Message);
        context.SaveChanges();
        Console.WriteLine(String.Format("Saved {0}: {1}", Message.KafkaConsumerMessageID.ToString(), Message.MessageContent));
    }

    public void Save()
    {
        context.SaveChanges();
    }

    public void Dispose()
    {
        context.Dispose();
    }
}
KafkaProducerRepository.cs:
public class KafkaProducerRepository
{
    private KafkaModel context;

    public KafkaProducerRepository()
    {
        this.context = new KafkaModel();
    }

    public List<KafkaProducerMessage> GetKafkaProducerMessages()
    {
        return context.KafkaProducerMessage.ToList();
    }

    public List<string> GetDistinctTopics()
    {
        return context.KafkaProducerMessage.Select(a => a.Topic)
            .Distinct()
            .ToList();
    }

    public KafkaProducerMessage GetKafkaProducerMessageByMessageID(int MessageID)
    {
        return context.KafkaProducerMessage.Find(MessageID);
    }

    public List<KafkaProducerMessage> GetKafkaProducerMessageByTopic(string TopicName)
    {
        return context.KafkaProducerMessage
            .Where(a => a.Topic == TopicName)
            .OrderBy(a => a.KafkaProducerMessageID)
            .ToList();
    }

    public void InsertKafkaProducerMessage(KafkaProducerMessage Message)
    {
        context.KafkaProducerMessage.Add(Message);
        context.SaveChanges();
    }

    public void ArchiveKafkaProducerMessage(int MessageID)
    {
        KafkaProducerMessage m = context.KafkaProducerMessage.Find(MessageID);
        KafkaProducerMessageArchive archivedMessage = KafkaProducerMessageToKafkaProducerMessageArchive(m);

        using (var dbContextTransaction = context.Database.BeginTransaction())
        {
            try
            {
                context.KafkaProducerMessageArchive.Add(archivedMessage);
                context.KafkaProducerMessage.Remove(m);
                // Save the pending changes first, then commit the transaction.
                context.SaveChanges();
                dbContextTransaction.Commit();
            }
            catch (Exception)
            {
                dbContextTransaction.Rollback();
            }
        }
    }

    public void ArchiveKafkaProducerMessageList(List<KafkaProducerMessage> Messages)
    {
        foreach (KafkaProducerMessage Message in Messages)
        {
            ArchiveKafkaProducerMessage(Message.KafkaProducerMessageID);
        }
    }

    public static KafkaProducerMessageArchive KafkaProducerMessageToKafkaProducerMessageArchive(KafkaProducerMessage Message)
    {
        return new KafkaProducerMessageArchive
        {
            KafkaProducerMessageID = Message.KafkaProducerMessageID,
            MessageContent = Message.MessageContent,
            Topic = Message.Topic,
            CreatedAt = Message.CreatedAt,
            ArchivedAt = DateTime.UtcNow
        };
    }

    public void Save()
    {
        context.SaveChanges();
    }

    public void Dispose()
    {
        context.Dispose();
    }
}
Database context KafkaModel.cs:
public partial class KafkaModel : DbContext
{
    public KafkaModel()
        : base("name=KafkaModel")
    {
    }

    public virtual DbSet<KafkaConsumerMessage> KafkaConsumerMessage { get; set; }
    public virtual DbSet<KafkaProducerMessage> KafkaProducerMessage { get; set; }
    public virtual DbSet<KafkaProducerMessageArchive> KafkaProducerMessageArchive { get; set; }
    public virtual DbSet<vwMaxOffsetByPartitionAndTopic> vwMaxOffsetByPartitionAndTopic { get; set; }

    protected override void OnModelCreating(DbModelBuilder modelBuilder)
    {
        modelBuilder.Entity<KafkaConsumerMessage>()
            .Property(e => e.Topic)
            .IsUnicode(false);

        modelBuilder.Entity<KafkaProducerMessage>()
            .Property(e => e.Topic)
            .IsUnicode(false);

        modelBuilder.Entity<KafkaProducerMessageArchive>()
            .Property(e => e.Topic)
            .IsUnicode(false);

        modelBuilder.Entity<vwMaxOffsetByPartitionAndTopic>()
            .Property(e => e.Topic)
            .IsUnicode(false);
    }
}
Program.cs:
class Program
{
    private static string Topic;

    static void Main(string[] args)
    {
        string invalidArgErrorMessage = "Valid args are: produce or consume";

        if (args.Length < 1)
        {
            throw (new Exception(invalidArgErrorMessage));
        }

        string intent = args[0];
        Topic = ConfigurationManager.AppSettings["Topic"];

        if (String.Equals(intent, "consume", StringComparison.OrdinalIgnoreCase))
        {
            Console.WriteLine("Starting consumer service");
            Consume();
        }
        else if (String.Equals(intent, "produce", StringComparison.OrdinalIgnoreCase))
        {
            Console.WriteLine("Starting producer service");
            Produce();
        }
        else
        {
            throw (new Exception(invalidArgErrorMessage));
        }
    }

    private static BrokerRouter InitDefaultConfig()
    {
        List<Uri> ZKURIList = new List<Uri>();
        foreach (string s in ConfigurationManager.AppSettings["BrokerList"].Split(','))
        {
            ZKURIList.Add(new Uri(s));
        }

        var Options = new KafkaOptions(ZKURIList.ToArray());
        var Router = new BrokerRouter(Options);
        return Router;
    }

    private static void Consume()
    {
        KafkaConsumerRepository KafkaRepo = new KafkaConsumerRepository();
        bool FromBeginning = Boolean.Parse(ConfigurationManager.AppSettings["FromBeginning"]);

        var Router = InitDefaultConfig();
        var Consumer = new Consumer(new ConsumerOptions(Topic, Router));

        // If we do not want to start from the beginning, use the latest stored offsets.
        if (!FromBeginning)
        {
            var MaxOffsetByPartition = KafkaRepo.GetOffsetPositionByTopic(Topic);

            // If we got a result, use it; otherwise fall back to the default position.
            if (MaxOffsetByPartition.Count != 0)
            {
                List<OffsetPosition> offsets = new List<OffsetPosition>();
                foreach (var m in MaxOffsetByPartition)
                {
                    OffsetPosition o = new OffsetPosition(m.Partition, (long)m.MaxOffset + 1);
                    offsets.Add(o);
                }
                Consumer.SetOffsetPosition(offsets.ToArray());
            }
            else
            {
                Consumer.SetOffsetPosition(new OffsetPosition());
            }
        }

        // Consume() returns a blocking IEnumerable (i.e. a never-ending stream).
        foreach (var message in Consumer.Consume())
        {
            string MessageContent = Encoding.UTF8.GetString(message.Value);
            Console.WriteLine(String.Format("Processing message with content: {0}", MessageContent));

            KafkaRepo = new KafkaConsumerRepository();
            KafkaConsumerMessage ConsumerMessage = new KafkaConsumerMessage()
            {
                Topic = Topic,
                Offset = message.Meta.Offset,
                Partition = message.Meta.PartitionId,
                MessageContent = MessageContent,
                CreatedAt = DateTime.UtcNow
            };
            KafkaRepo.InsertKafkaConsumerMessage(ConsumerMessage);
            KafkaRepo.Dispose();
        }
    }

    private static void Produce()
    {
        KafkaProducerRepository KafkaRepo = new KafkaProducerRepository();
        var Router = InitDefaultConfig();
        var Client = new Producer(Router);

        List<Message> Messages = new List<Message>();
        foreach (KafkaProducerMessage message in KafkaRepo.GetKafkaProducerMessageByTopic(Topic))
        {
            Messages.Add(new Message(message.MessageContent));
            KafkaRepo.ArchiveKafkaProducerMessage(message.KafkaProducerMessageID);
        }

        Client.SendMessageAsync(Topic, Messages).Wait();
        KafkaRepo.Dispose();
    }
}
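Once compiled, the console application is driven by its first argument. Assuming the executable is named KafkaSqlDemo.exe (the name is a placeholder, not from the original project), it would be started with one of:

KafkaSqlDemo.exe produce
KafkaSqlDemo.exe consume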
App.config contents:
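The original App.config is not reproduced here. Based on the appSettings keys read in Program.cs (BrokerList, Topic, FromBeginning) and the "name=KafkaModel" connection string used by the DbContext, a minimal sketch might look like this (the broker URI, topic name, and connection string values are assumptions):

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <!-- Comma-separated Kafka broker URIs consumed by InitDefaultConfig() -->
    <add key="BrokerList" value="http://localhost:9092" />
    <!-- Topic used by both Produce() and Consume() -->
    <add key="Topic" value="test2016" />
    <!-- When false, Consume() resumes from the max offset stored per partition -->
    <add key="FromBeginning" value="false" />
  </appSettings>
  <connectionStrings>
    <!-- The name must match the "name=KafkaModel" passed to the DbContext constructor -->
    <add name="KafkaModel"
         connectionString="data source=.;initial catalog=Kafka;integrated security=True;MultipleActiveResultSets=True"
         providerName="System.Data.SqlClient" />
  </connectionStrings>
</configuration>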
SQL script:
IF NOT EXISTS(SELECT 1 FROM sys.tables WHERE name = 'KafkaConsumerMessage')
BEGIN
    CREATE TABLE [dbo].[KafkaConsumerMessage](
        [KafkaConsumerMessageID] [int] IDENTITY(1,1) NOT NULL,
        [Topic] [varchar](1000) NOT NULL,
        [Partition] [int] NOT NULL,
        [Offset] [bigint] NOT NULL,
        [MessageContent] [nvarchar](max) NOT NULL,
        [CreatedAt] [datetime] NOT NULL,
        CONSTRAINT [PK_Kafka_Consumer_Message_KafkaConsumerMessageID] PRIMARY KEY CLUSTERED ([KafkaConsumerMessageID])
    )
END
GO

IF NOT EXISTS(SELECT 1 FROM sys.tables WHERE name = 'KafkaProducerMessage')
BEGIN
    CREATE TABLE [dbo].[KafkaProducerMessage](
        [KafkaProducerMessageID] [int] IDENTITY(1,1) NOT NULL,
        [Topic] [varchar](1000) NOT NULL,
        [MessageContent] [nvarchar](max) NOT NULL,
        [CreatedAt] [datetime] NOT NULL,
        CONSTRAINT [PK_KafkaProducerMessage_KafkaProducerMessageID] PRIMARY KEY CLUSTERED ([KafkaProducerMessageID])
    )
END
GO

IF NOT EXISTS(SELECT 1 FROM sys.tables WHERE name = 'KafkaProducerMessageArchive')
BEGIN
    CREATE TABLE [dbo].[KafkaProducerMessageArchive](
        [KafkaProducerMessageArchiveID] [int] IDENTITY(1,1) NOT NULL,
        [KafkaProducerMessageID] [int] NOT NULL,
        [Topic] [varchar](1000) NOT NULL,
        [MessageContent] [nvarchar](max) NOT NULL,
        [CreatedAt] [datetime] NOT NULL,
        [ArchivedAt] [datetime] NOT NULL,
        CONSTRAINT [PK_KafkaProducerMessageArchive_KafkaProducerMessageArchiveID] PRIMARY KEY CLUSTERED ([KafkaProducerMessageArchiveID])
    )
END
GO
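The script above does not create the vwMaxOffsetByPartitionAndTopic view that KafkaConsumerRepository.GetOffsetPositionByTopic queries. A sketch of what it presumably looks like, given the entity definition (the maximum stored offset per topic and partition over KafkaConsumerMessage):

IF NOT EXISTS(SELECT 1 FROM sys.views WHERE name = 'vwMaxOffsetByPartitionAndTopic')
BEGIN
    EXEC('CREATE VIEW [dbo].[vwMaxOffsetByPartitionAndTopic]
          AS
          SELECT [Topic], [Partition], MAX([Offset]) AS [MaxOffset]
          FROM [dbo].[KafkaConsumerMessage]
          GROUP BY [Topic], [Partition]')
END
GO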
Reference the .NET Kafka component:
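The BrokerRouter, Producer, Consumer, KafkaOptions, and OffsetPosition types used above come from the kafka-net client library. Assuming it is pulled in from NuGet, installing it from the Package Manager Console would look like:

Install-Package kafka-net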
Start the ZooKeeper server and the Kafka server.
Open a DOS (command prompt) window:
zkServer
Open a second DOS window:
kafka-server-start D:xxxxxxxxx\config\server.properties
The result is shown in the screenshot:
Manually add three records to the producer table:
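As an illustration, the three producer rows could be inserted by hand with something like the following (the topic name and message contents are placeholders):

INSERT INTO [dbo].[KafkaProducerMessage] ([Topic], [MessageContent], [CreatedAt])
VALUES ('test2016', N'hello kafka 1', GETUTCDATE()),
       ('test2016', N'hello kafka 2', GETUTCDATE()),
       ('test2016', N'hello kafka 3', GETUTCDATE());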