当前位置: 首页 > news >正文

Kafka调研笔记

安装部署

安装服务端

安装环境:笔记本电脑+Ubuntu系统

  1. 需要先安装JAVA
# 安装 OpenJDK
sudo apt update
sudo apt install -y openjdk-11-jdk# 验证安装
java -version
  1. 安装 Kafka
# 添加 Apache 仓库
wget -qO - https://packages.confluent.io/deb/7.6/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/7.6 stable main"
sudo add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"# 安装 Confluent Platform(包含 Kafka)
sudo apt update
sudo apt install -y confluent-community-2.13# 或只安装 Kafka
sudo apt install -y confluent-kafka-2.13

启动服务

# 启动Zookeeper(Kafka的依赖)
sudo systemctl start confluent-zookeeper# 启动Kafka
sudo systemctl start confluent-kafka# 设置开机自启
sudo systemctl enable confluent-zookeeper
sudo systemctl enable confluent-kafka

测试

测试1:创建主题

# 使用 kafka-topics 命令
kafka-topics --create \--topic test-topic \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 或如果命令不存在,用完整路径
/usr/bin/kafka-topics --create \--topic test-topic \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1

测试2:列出主题

kafka-topics --list --bootstrap-server localhost:9092

测试3:生产和消费消息

# 终端1:启动生产者
kafka-console-producer --topic test-topic --bootstrap-server localhost:9092
# 输入一些消息,按Ctrl+C退出# 终端2:启动消费者
kafka-console-consumer --topic test-topic \--from-beginning \--bootstrap-server localhost:9092

示例代码

安装依赖包

Install-Package Confluent.Kafka

生产者

// 1. 配置 Producer
using Confluent.Kafka;var config = new ProducerConfig
{// Kafka 服务器地址,多个用逗号分隔BootstrapServers = "192.168.126.194:9092",// 设置acks为all,确保数据可靠性Acks = Acks.All,// 如果没有指定Key,这里可以为空,或者设置分区器Partitioner = Confluent.Kafka.Partitioner.Random
};// 2. 创建 Producer 实例 (Key为Null, Value为string)
// 如果需要Key,可将 Null 改为 string (如 Producer<string, string>)
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{string topic = "my-topic";string message = "Hello Confluent Kafka from C#!";try{while (true){// 3. 异步发送消息var deliveryResult = producer.ProduceAsync(topic, new Message<Null, string>{Value = message}).GetAwaiter().GetResult();Console.WriteLine($"成功发送消息到: {deliveryResult.Topic} (分区: {deliveryResult.Partition}, 偏移量: {deliveryResult.Offset})");Thread.Sleep(1000);}}catch (ProduceException<Null, string> e){Console.WriteLine($"发送失败: {e.Error.Reason}");}
}

消费者

// 1. 配置 Consumer
using Confluent.Kafka;var config = new ConsumerConfig
{BootstrapServers = "192.168.126.194:9092",// 消费者组ID,非常重要!同组消费者会负载均衡消费消息GroupId = "test-consumer-group-1",// 自动提交偏移量的设置EnableAutoCommit = false, // 建议手动提交以防止数据丢失// 如果没有初始偏移量,从最早开始读AutoOffsetReset = AutoOffsetReset.Earliest
};// 2. 创建 Consumer 实例
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{string topic = "my-topic";// 3. 订阅主题 (支持正则表达式,如 "^my-.*")consumer.Subscribe(topic);CancellationTokenSource cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) => {e.Cancel = true; // 防止进程终止cts.Cancel();};try{Console.WriteLine("消费者启动,正在监听消息...");while (true){// 4. 拉取消息 (Consume 方法是阻塞的,直到有消息或超时)try{var consumeResult = consumer.Consume(cts.Token);Console.WriteLine($"收到消息: {consumeResult.Message.Value} [Topic: {consumeResult.Topic}, Partition: {consumeResult.Partition}, Offset: {consumeResult.Offset}]");// 5. 手动提交偏移量 (表示消息已处理完毕)// 这里可以基于业务逻辑处理,只有确认处理成功才Commitconsumer.Commit(consumeResult);}catch (ConsumeException e){Console.WriteLine($"消费出错: {e.Error.Reason}");}}}catch (OperationCanceledException){// Ctrl+C 被触发,优雅关闭consumer.Close();}
}
http://www.zskr.cn/news/158013.html

相关文章:

  • 线控转向系统联合仿真与变传动比模块研究
  • 2025年值得推荐的布袋除尘器供应商排行榜,专业厂家精选测评推荐 - 工业推荐榜
  • mitm中间人攻击 安卓证书(雷电模拟器)
  • 条件格式代码自定义
  • 企业怎么挑能对接多业务系统的绩效管理平台?避坑要点解析
  • 2025年循环恒温水槽品牌制造商排行榜:看哪家质量好? - myqiye
  • 多维度智能分析与评估:业绩管理软件的核心应用技巧
  • 2025年团餐服务公司口碑排名:恩诺膳食的口味如何、团队实力怎样? - mypinpai
  • Open-AutoGLM与macOS深度适配方案(仅限技术先锋的内部实践曝光)
  • 2025年上海GEO精准优化选哪家?GEO优化排名TOP5推荐, - 工业品牌热点
  • 2025数字展厅建设TOP5权威推荐:盛世笔特,技术与创意融合赋能文化传播新场景 - 工业推荐榜
  • 2025有名的数字展厅策划设计施工品牌企业TOP5推荐 - 工业品牌热点
  • PaddleOCR太强了!基于PaddlePaddle镜像的高精度文本识别方案
  • 中文NLP处理神器:PaddlePaddle镜像全面支持BERT、ERNIE等模型
  • 益生菌十大品牌排行榜!益生菌哪个牌子抗幽好?榜首从菌株到活菌数,肠胃健康的安心之选 - 博客万
  • 2025自考必备10个降AI率工具,高效避坑指南
  • 2025年尘埃在线监测系统优质厂家推荐指南,在线式粒子计数器/尘埃粒子计数器在线监测系统/手持式尘埃粒子计数器尘埃在线监测系统厂家排名 - 品牌推荐师
  • 2025AI营销服务商TOP5权威推荐:中鼓数据企业价值观如何、实力怎么样? - 工业品牌热点
  • 钢丝绳市场新宠,回购率高的厂商大起底!钢丝绳/抛缆绳/钢卷吊具/缆绳/钢丝绳索具,钢丝绳企业哪家权威 - 品牌推荐师
  • Open-AutoGLM手机运行指南(仅需4步,实现离线AI推理)
  • 2025年现浇混凝土工程质量排行,这些公司领跑行业,现浇楼板/钢筋混凝土现浇/现浇楼梯/现浇搭建/现浇钢筋混凝土多少钱一平选哪家 - 品牌推荐师
  • 【完结21章】AI大模型算法-从大模型原理剖析到训练(微调)落地实战
  • 北京陪诊机构哪家好?实地探访告诉你,北京守嘉陪诊值得选 - 品牌排行榜单
  • 小红书推广服务选哪家?专业团队与行业推荐全解析 - mypinpai
  • 2025年上海靠谱AI搜索推广公司排行榜,有实力的AI搜索推广企业推荐 - 工业品牌热点
  • 2025年双相不锈钢供应商年度排名:行业靠谱服务商推荐有哪些? - 工业品牌热点
  • 手把手教你部署清华智谱 Open-AutoGLM(附完整代码与避坑指南)
  • 清华智谱 Open-AutoGLM 核心技术揭秘(AutoGLM背后的黑科技)
  • 学术会议合集
  • 【AI开发避坑宝典】:Open-AutoGLM导入失败的7个真实案例与解决方案