Go 语言中使用 Kafka 的安装和使用指南
环境准备
系统要求
在 Go 语言中使用 Kafka 需要以下环境:
- 一台运行 Linux、macOS 或 Windows 操作系统的计算机。
- 安装 Go 语言环境,版本为 1.16 或更高。可以通过
go version
命令验证。 - 安装 Kafka,版本为 2.8 或更高。
- 安装 Java 运行环境,版本为 Java 8 或更高。Kafka 依赖于 Java,可以通过
java -version
命令验证。
如果未安装 Java,请先安装 Java 8 或更高版本。
安装前的注意事项
确保已正确安装 Go 语言和 Java 运行环境,以避免后续安装和配置过程中出现问题。
安装 Kafka
下载 Kafka
访问 Kafka 的 官方网站,下载适用于操作系统的 Kafka 二进制文件。
解压 Kafka
下载完成后,解压缩文件。文件名可能因版本不同而有所变化:
tar -zxvf kafka_2.13-2.8.0.tgz
启动 Kafka 服务
进入 Kafka 目录后,首先启动 ZooKeeper 服务:
cd kafka_2.13-2.8.0
bin/zookeeper-server-start.sh config/zookeeper.properties
确保 ZooKeeper 服务已成功启动后,开启 Kafka 服务。建议在不同的终端窗口中分别运行 ZooKeeper 和 Kafka 服务:
bin/kafka-server-start.sh config/server.properties
使用 Go 语言连接 Kafka
安装 Kafka 的 Go 客户端
选择 Sarama 作为 Kafka 的 Go 客户端,使用以下命令进行安装:
go get github.com/Shopify/sarama
编写生产者代码
创建 producer.go
文件,编写以下代码:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Printf("创建生产者失败: %v\n", err)
return
}
defer producer.Close()
message := &sarama.ProducerMessage{
Topic: "sumingcheng",
Value: sarama.StringEncoder("Hello Kafka from Go!"),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
fmt.Printf("发送消息失败: %v\n", err)
return
}
fmt.Printf("消息已存储到分区 %d,偏移量 %d\n", partition, offset)
}
在实际应用中,应替换 fmt.Printf
为适当的日志记录机制,以增强程序的健壮性。
编写消费者代码
创建 consumer.go
文件,编写以下代码:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
fmt.Printf("创建消费者失败: %v\n", err)
return
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition("sumingcheng", 0, sarama.OffsetNewest)
if err != nil {
fmt.Printf("订阅分区失败: %v\n", err)
return
}
defer partitionConsumer.Close()
for message := range partitionConsumer.Messages() {
fmt.Printf("收到消息:%s\n", string(message.Value))
}
}
此示例订阅了主题的分区 0。在实际应用中,可以根据需求订阅不同分区或使用消费者组以实现负载均衡。
运行代码
在终端中运行生产者:
go run producer.go
在另一个终端中运行消费者:
go run consumer.go
测试 Kafka
发送消息后,消费者终端应显示:
收到消息:Hello Kafka from Go!