首页>>后端>>Golang->Golang kafka简述和操作(sarama同步异步和消费组)

Golang kafka简述和操作(sarama同步异步和消费组)

时间:2023-11-29 本站 点击:0

一、Kafka简述

1. 为什么需要用到消息队列

异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;

解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。

缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。

2.为什么选择kafka呢?

这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文

kafka的优点:

1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群

kafka的缺点:

1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

3. Golang 操作kafka

3.1. kafka的环境

网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件

3.2. 第三方库

github.com/Shopify/sarama//kafka主要的库*github.com/bsm/sarama-cluster//kafka消费组

3.3. 消费者

单个消费者
funcconsumer(){varwgsync.WaitGroupconsumer,err:=sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr!=nil{fmt.Println("Failedtostartconsumer:%s",err)return}partitionList,err:=consumer.Partitions("test0")//获得该topic所有的分区iferr!=nil{fmt.Println("Failedtogetthelistofpartition:,",err)return}forpartition:=rangepartitionList{pc,err:=consumer.ConsumePartition("test0",int32(partition),sarama.OffsetNewest)iferr!=nil{fmt.Println("Failedtostartconsumerforpartition%d:%s\n",partition,err)return}wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg:=rangepc.Messages(){//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d,Offset:%d,key:%s,value:%s\n",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))}deferpc.AsyncClose()wg.Done()}(pc)}wg.Wait()}funcmain(){consumer()}
消费组
funcconsumerCluster(){groupID:="group-1"config:=cluster.NewConfig()config.Group.Return.Notifications=trueconfig.Consumer.Offsets.CommitInterval=1*time.Secondconfig.Consumer.Offsets.Initial=sarama.OffsetNewest//初始从最新的offset开始c,err:=cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID,strings.Split("test0",","),config)iferr!=nil{glog.Errorf("Failedopenconsumer:%v",err)return}deferc.Close()gofunc(c*cluster.Consumer){errors:=c.Errors()noti:=c.Notifications()for{select{caseerr:=<-errors:glog.Errorln(err)case<-noti:}}}(c)formsg:=rangec.Messages(){fmt.Printf("Partition:%d,Offset:%d,key:%s,value:%s\n",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))c.MarkOffset(msg,"")//MarkOffset并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生产者

同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){config:=sarama.NewConfig()config.Producer.RequiredAcks=sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner=sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes=truemsg:=&sarama.ProducerMessage{}msg.Topic=`test0`msg.Value=sarama.StringEncoder("HelloWorld!")client,err:=sarama.NewSyncProducer([]string{"172.20.3.13:30901"},config)iferr!=nil{fmt.Println("producercloseerr,",err)return}deferclient.Close()pid,offset,err:=client.SendMessage(msg)iferr!=nil{fmt.Println("sendmessagefailed,",err)return}fmt.Printf("分区ID:%v,offset:%v\n",pid,offset)}
异步生产者
funcasyncProducer(){config:=sarama.NewConfig()config.Producer.Return.Successes=true//必须有这个选项config.Producer.Timeout=5*time.Secondp,err:=sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","),config)deferp.Close()iferr!=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(psarama.AsyncProducer){errors:=p.Errors()success:=p.Successes()for{select{caseerr:=<-errors:iferr!=nil{glog.Errorln(err)}case<-success:}}}(p)for{v:="async:"+strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))fmt.Fprintln(os.Stdout,v)msg:=&sarama.ProducerMessage{Topic:topics,Value:sarama.ByteEncoder(v),}p.Input()<-msgtime.Sleep(time.Second*1)}}funcmain(){goasyncProducer()select{}}

3.5. 结果展示->

同步生产打印:

分区ID:0,offset:90

消费打印:

Partition:0,Offset:90,key:,value:HelloWorld!

异步生产打印:

async:7272async:7616async:998

消费打印:

1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高0

3.6 同步和异步差别

同步模式producer把消息发给kafka之后会等待结果返回。

异步模式producer把消息发给kafka之后不会等待结果返回。

作者:小小小丶叶子


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Golang/261.html