跳到主要内容

Go 语言中使用 Kafka 的安装和使用指南

环境准备

系统要求

在 Go 语言中使用 Kafka 需要以下环境:

  • 一台运行 Linux、macOS 或 Windows 操作系统的计算机。
  • 安装 Go 语言环境,版本为 1.16 或更高。可以通过 go version 命令验证。
  • 安装 Kafka,版本为 2.8 或更高。
  • 安装 Java 运行环境,版本为 Java 8 或更高。Kafka 依赖于 Java,可以通过 java -version 命令验证。

如果未安装 Java,请先安装 Java 8 或更高版本。

安装前的注意事项

确保已正确安装 Go 语言和 Java 运行环境,以避免后续安装和配置过程中出现问题。

安装 Kafka

下载 Kafka

访问 Kafka 的 官方网站,下载适用于操作系统的 Kafka 二进制文件。

解压 Kafka

下载完成后,解压缩文件。文件名可能因版本不同而有所变化:

tar -zxvf kafka_2.13-2.8.0.tgz

启动 Kafka 服务

进入 Kafka 目录后,首先启动 ZooKeeper 服务:

cd kafka_2.13-2.8.0
bin/zookeeper-server-start.sh config/zookeeper.properties

确保 ZooKeeper 服务已成功启动后,开启 Kafka 服务。建议在不同的终端窗口中分别运行 ZooKeeper 和 Kafka 服务:

bin/kafka-server-start.sh config/server.properties

使用 Go 语言连接 Kafka

安装 Kafka 的 Go 客户端

选择 Sarama 作为 Kafka 的 Go 客户端,使用以下命令进行安装:

go get github.com/Shopify/sarama

编写生产者代码

创建 producer.go 文件,编写以下代码:

package main

import (
"fmt"
"github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Printf("创建生产者失败: %v\n", err)
return
}
defer producer.Close()

message := &sarama.ProducerMessage{
Topic: "sumingcheng",
Value: sarama.StringEncoder("Hello Kafka from Go!"),
}

partition, offset, err := producer.SendMessage(message)
if err != nil {
fmt.Printf("发送消息失败: %v\n", err)
return
}

fmt.Printf("消息已存储到分区 %d,偏移量 %d\n", partition, offset)
}

在实际应用中,应替换 fmt.Printf 为适当的日志记录机制,以增强程序的健壮性。

编写消费者代码

创建 consumer.go 文件,编写以下代码:

package main

import (
"fmt"
"github.com/Shopify/sarama"
)

func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
fmt.Printf("创建消费者失败: %v\n", err)
return
}
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("sumingcheng", 0, sarama.OffsetNewest)
if err != nil {
fmt.Printf("订阅分区失败: %v\n", err)
return
}
defer partitionConsumer.Close()

for message := range partitionConsumer.Messages() {
fmt.Printf("收到消息:%s\n", string(message.Value))
}
}

此示例订阅了主题的分区 0。在实际应用中,可以根据需求订阅不同分区或使用消费者组以实现负载均衡。

运行代码

在终端中运行生产者:

go run producer.go

在另一个终端中运行消费者:

go run consumer.go

测试 Kafka

发送消息后,消费者终端应显示:

收到消息:Hello Kafka from Go!