• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    一文详解Golang连接kafka的基本操作

    发布者: 怀卉1097 | 发布时间: 2025-8-14 09:23| 查看数: 56| 评论数: 0|帖子模式

    1.kafka的学习


    1.1 启动kafka与zookeeper

    kafka与zookeeper是相关联的
    1. bin/zookeeper-server-start.sh config/zookeeper.properties
    复制代码
    1. bin/kafka-server-start.sh config/server.properties
    复制代码
    1.2 创建topic
    1. bin/kafka-topics.sh --create --topic hello --bootstrap-server 主机名:9092
    复制代码
    1.3 生产消息
    1. bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic hello
    复制代码
    运行后可以发送多条,ctrl+c退出

    1.4 消费之前的消息
    1. bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --from-beginning --topic hello
    复制代码
    1.5 指定偏移量消费
    1. bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --partition 0 --offset 1 --topic hello
    复制代码
    1.6 消费最新的信息
    1. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
    复制代码
    2 go操作


    2.1 发送消息
    1. // Kafka 配置
    2. const (
    3.         KafkaBroker = "u8sMaster:9092" // 替换为你的 Kafka Broker 地址
    4.         KafkaTopic  = "k8s-version"          // Kafka 主题
    5. )

    6. func main() {
    7.         sendMesgKafka()
    8. }

    9. func sendMesgKafka() {
    10.         w := kafka.NewWriter(kafka.WriterConfig{
    11.                 Brokers:  []string{KafkaBroker},
    12.                 Topic:    KafkaTopic,
    13.                 Balancer: &kafka.LeastBytes{},
    14.         })

    15.         err := w.WriteMessages(context.Background(),
    16.                 kafka.Message{
    17.                         Key:   []byte("Key-A"),
    18.                         Value: []byte("one!"),
    19.                 },
    20.                 kafka.Message{
    21.                         Key:   []byte("Key-B"),
    22.                         Value: []byte("two!"),
    23.                 },
    24.                 kafka.Message{
    25.                         Key:   []byte("Key-C"),
    26.                         Value: []byte("three!"),
    27.                 },
    28.         )

    29.         if err != nil {
    30.                 log.Fatal("failed to write messages:", err)
    31.         }

    32.         if err := w.Close(); err != nil {
    33.                 log.Fatal("failed to close writer:", err)
    34.         }

    35.         fmt.Println("Message sent successfully")

    36. }
    复制代码
    2.2 消费消息
    1. // to consume messages
    2. topic := "test"
    3. partition := 0

    4. conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition)
    5. if err != nil {
    6.     log.Fatal("failed to dial leader:", err)
    7. }

    8. conn.SetReadDeadline(time.Now().Add(10*time.Second))
    9. batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

    10. b := make([]byte, 10e3) // 10KB max per message
    11. for {
    12.     n, err := batch.Read(b)
    13.     if err != nil {
    14.         break
    15.     }
    16.     fmt.Println(string(b[:n]))
    17. }

    18. if err := batch.Close(); err != nil {
    19.     log.Fatal("failed to close batch:", err)
    20. }

    21. if err := conn.Close(); err != nil {
    22.     log.Fatal("failed to close connection:", err)
    23. }
    复制代码
    2.3 列出所有topic
    1. func main() {
    2.     conn, err := kafka.Dial("tcp", "u8sMaster:9092")
    3.     if err != nil {
    4.         panic(err.Error())
    5.     }
    6.     defer conn.Close()
    7.    
    8.     partitions, err := conn.ReadPartitions()
    9.     if err != nil {
    10.         panic(err.Error())
    11.     }
    12.    
    13.     m := map[string]struct{}{}
    14.    
    15.     for _, p := range partitions {
    16.         m[p.Topic] = struct{}{}
    17.     }
    18.     for k := range m {
    19.         fmt.Println(k)
    20.     }
    21. }
    复制代码
    2.4 创建topic
    1. func main() {
    2.         conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9092", "topic2", 0)
    3.         if err != nil {
    4.             panic(err.Error())
    5.         }
    6. }
    复制代码
    精准地创建topic
    1. func main() {
    2.     conn, err := kafka.Dial("tcp", "u8sMaster:9092")
    3.     if err != nil {
    4.         panic(err.Error())
    5.     }
    6.     defer conn.Close()
    7.     controller, err := conn.Controller()
    8.     if err != nil {
    9.         panic(err.Error())
    10.     }
    11.     var connLeader *kafka.Conn
    12.     connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    13.     if err != nil {
    14.         panic(err.Error())
    15.     }
    16.     defer connLeader.Close()
    17. }
    复制代码
    这里省略了kafka集群的配置,未来有机会补充
    以上就是一文详解Golang连接kafka的基本操作的详细内容,更多关于go连接kafka的资料请关注脚本之家其它相关文章!

    来源:互联网
    免责声明:如果侵犯了您的权益,请联系站长(1277306191@qq.com),我们会及时删除侵权内容,谢谢合作!

    最新评论

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表