1.kafka的学习
1.1 启动kafka与zookeeper
kafka与zookeeper是相关联的- bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码 与- bin/kafka-server-start.sh config/server.properties
复制代码 1.2 创建topic
- bin/kafka-topics.sh --create --topic hello --bootstrap-server 主机名:9092
复制代码 1.3 生产消息
- bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic hello
复制代码 运行后可以发送多条,ctrl+c退出
1.4 消费之前的消息
- bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --from-beginning --topic hello
复制代码 1.5 指定偏移量消费
- bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --partition 0 --offset 1 --topic hello
复制代码 1.6 消费最新的信息
- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
复制代码 2 go操作
2.1 发送消息
- // Kafka 配置
- const (
- KafkaBroker = "u8sMaster:9092" // 替换为你的 Kafka Broker 地址
- KafkaTopic = "k8s-version" // Kafka 主题
- )
- func main() {
- sendMesgKafka()
- }
- func sendMesgKafka() {
- w := kafka.NewWriter(kafka.WriterConfig{
- Brokers: []string{KafkaBroker},
- Topic: KafkaTopic,
- Balancer: &kafka.LeastBytes{},
- })
- err := w.WriteMessages(context.Background(),
- kafka.Message{
- Key: []byte("Key-A"),
- Value: []byte("one!"),
- },
- kafka.Message{
- Key: []byte("Key-B"),
- Value: []byte("two!"),
- },
- kafka.Message{
- Key: []byte("Key-C"),
- Value: []byte("three!"),
- },
- )
- if err != nil {
- log.Fatal("failed to write messages:", err)
- }
- if err := w.Close(); err != nil {
- log.Fatal("failed to close writer:", err)
- }
- fmt.Println("Message sent successfully")
- }
复制代码 2.2 消费消息
- // to consume messages
- topic := "test"
- partition := 0
- conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition)
- if err != nil {
- log.Fatal("failed to dial leader:", err)
- }
- conn.SetReadDeadline(time.Now().Add(10*time.Second))
- batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
- b := make([]byte, 10e3) // 10KB max per message
- for {
- n, err := batch.Read(b)
- if err != nil {
- break
- }
- fmt.Println(string(b[:n]))
- }
- if err := batch.Close(); err != nil {
- log.Fatal("failed to close batch:", err)
- }
- if err := conn.Close(); err != nil {
- log.Fatal("failed to close connection:", err)
- }
复制代码 2.3 列出所有topic
- func main() {
- conn, err := kafka.Dial("tcp", "u8sMaster:9092")
- if err != nil {
- panic(err.Error())
- }
- defer conn.Close()
-
- partitions, err := conn.ReadPartitions()
- if err != nil {
- panic(err.Error())
- }
-
- m := map[string]struct{}{}
-
- for _, p := range partitions {
- m[p.Topic] = struct{}{}
- }
- for k := range m {
- fmt.Println(k)
- }
- }
复制代码 2.4 创建topic
- func main() {
- conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9092", "topic2", 0)
- if err != nil {
- panic(err.Error())
- }
- }
复制代码 精准地创建topic- func main() {
- conn, err := kafka.Dial("tcp", "u8sMaster:9092")
- if err != nil {
- panic(err.Error())
- }
- defer conn.Close()
- controller, err := conn.Controller()
- if err != nil {
- panic(err.Error())
- }
- var connLeader *kafka.Conn
- connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
- if err != nil {
- panic(err.Error())
- }
- defer connLeader.Close()
- }
复制代码 这里省略了kafka集群的配置,未来有机会补充
以上就是一文详解Golang连接kafka的基本操作的详细内容,更多关于go连接kafka的资料请关注脚本之家其它相关文章!
来源:互联网
免责声明:如果侵犯了您的权益,请联系站长(1277306191@qq.com),我们会及时删除侵权内容,谢谢合作! |
|