RocketMQ本地环境搭建

网友投稿 725 2022-05-29

RocketMQ在mac+docker环境的本地搭建,以及用go语言实现一个简单的生产和消费案例

1.创建NameServer服务

先用命令docker search rocketmq搜索rocketmq相关镜像

这一步先拉取rocketmqinc/rocketmq镜像,docker pull rocketmqinc/rocketmq

然后在本地创建数据存储路径,因为是在本地电脑搭建环境,所以要用绝对路径

mkdir -p /Users/mymac/docker/rocketmq/data/namesrv/logs /Users/mymac/docker/rocketmq/data/namesrv/store

然后构建namesrv容器,-v也是用刚刚创建的绝对路径

docker run -d \ --restart=always \ --name rmqnamesrv \ -p 9876:9876 \ -v /Users/mymac/docker/rocketmq/data/namesrv/logs:/root/logs \ -v /Users/mymac/docker/rocketmq/data/namesrv/store:/root/store \ -e "MAX_POSSIBLE_HEAP=100000000" \ rocketmqinc/rocketmq \ sh mqnamesrv

2.创建broker结点

先在本地创建数据存储路径

mkdir -p /Users/mymac/docker/rocketmq/data/broker/logs /Users/mymac/docker/rocketmq/data/broker/store /Users/mymac/docker/rocketmq/conf

然后在conf文件夹里面创建一个配置文件夹broker.conf,编辑里面的内容如下

#集群名称 brokerClusterName = DefaultCluster #broker名称,master和slave名称相同 brokerName = broker-a #0表示master,大于0表示各个slave brokerId = 0 #默认凌晨4点消息删除 deleteWhen = 04 #消息在磁盘保留时长,单位小时 fileReservedTime = 48 #broker角色复制方式:SYNC_MASTER,ASYNC_MASTER,SLAVE;即 Master同步复制、Master异步Master、Slave之间同步数据 brokerRole = ASYNC_MASTER #刷盘策略:ASYNC_FLUSH,SYNC_FLUSH;表示同步刷盘和异步刷盘 flushDiskType = ASYNC_FLUSH #nameserver地址,其中10.0.54.77是我本机的ip地址(因为我在本机测试),通过ifconfig的en0可以查出 namesrvAddr = 10.0.54.77:9876 #broker结点所在服务器ip地址,因为我在本机测试,所以填写本机ip 10.0.54.77 brokerIP1 = 10.0.54.77 # 监听端口,默认是10911 listenPort = 10911

然后可以创建broker容器了,-v用刚刚创建的绝对路径

docker run -d \ --name rmqbroker \ --link rmqnamesrv:namesrv \ -p 10911:10911 \ -p 10909:10909 \ -v /Users/mymac/docker/rocketmq/data/broker/logs:/root/logs \ -v /Users/mymac/docker/rocketmq/data/broker/store:/root/store \ -v /Users/mymac/docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \ -e "NAMESRV_ADDR=namesrv:9876" \ -e "MAX_POSSIBLE_HEAP=200000000" \ rocketmqinc/rocketmq \ sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

3.创建rockermq-console服务

先拉取styletang/rocketmq-console-ng镜像,docker pull styletang/rocketmq-console-ng

然后创建容器,其中9999是在本地访问的端口

docker run -d \ --name rmqconsole \ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.0.54.77:9876 \ -Dcom.rocketmq.sendMessageWithVIPChannel=false" \ -p 9999:8080 \ styletang/rocketmq-console-ng

此时三个容器都创建好了,可以看到如下图

此时在浏览器输入127.0.0.1:9999可以看到如下场景

4.创建生产者

先在rockermq-console浏览器里面创建一个叫kevintest的topic,然后运行如下代码

func main() { topic := "kevintest" p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.0.54.89:9876"})), producer.WithRetry(2), ) err := p.Start() if err != nil { log.Printf("start producer error: %s \n", err.Error()) os.Exit(1) } for i := 0; i < 10; i++ { msg := &primitive.Message{ Topic: topic, Body: []byte("啦啦啦啦啦啦啦啦啦" + strconv.Itoa(i)), } res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }

运行之后可以看到如下信息

INFO[0000] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":8,\"writeQueueNums\":8,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"10.0.54.89:10911\"}}]}" changedFrom="" topic=kevintest send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00001, offsetMsgId=0A00365900002A9F0000000000000D48, queueOffset=4, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00002, offsetMsgId=0A00365900002A9F0000000000000DF2, queueOffset=4, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00003, offsetMsgId=0A00365900002A9F0000000000000E9C, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00004, offsetMsgId=0A00365900002A9F0000000000000F46, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00005, offsetMsgId=0A00365900002A9F0000000000000FF0, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00006, offsetMsgId=0A00365900002A9F000000000000109A, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00007, offsetMsgId=0A00365900002A9F0000000000001144, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00008, offsetMsgId=0A00365900002A9F00000000000011EE, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00009, offsetMsgId=0A00365900002A9F0000000000001298, queueOffset=5, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]] send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c0000a, offsetMsgId=0A00365900002A9F0000000000001342, queueOffset=5, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]] INFO[0000] will remove client from clientMap clientID=10.0.54.89@63825 进程 已完成,退出代码为 0

在rockermq-console浏览器的Message也可以看到消息发送成功

5.创建消费者

现在来编写消费者的代码

RocketMQ本地环境搭建

func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.0.54.89:9876"})), ) err := c.Subscribe("kevintest", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } //不能马上退出,要等到收到消息 time.Sleep(time.Millisecond*30000) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } }

运行上面代码之后会到的如下结果

WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=%RETRY%DEFAULT_CONSUMER, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=%RETRY%DEFAULT_CONSUMER, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER offset=0 INFO[0000] the MessageQueue changed, version also updated changeTo=1643080672878951000 changedFrom=0 INFO[0000] The PullThresholdForTopic is changed changeTo=102400 changedFrom=102400 INFO[0000] The PullThresholdSizeForTopic is changed changeTo=51200 changedFrom=51200 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦3, Flag=0, properties=map[CONSUME_START_TIME:1643080672908 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880004], TransactionId=], MsgId=0A00364DF14300000000799d60880004, Offse00364D00002A9F00000000000008A2,QueueId=4, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757033, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757044, StoreHost=10.0.54.77:10911, CommitLogOffset=2210, BodyCRC=2146591400, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦3, Flag=0, properties=map[CONSUME_START_TIME:1643080672908 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00004], TransactionId=], MsgId=0A003659F951000000007e00b6c00004, Offse00365900002A9F0000000000000F46,QueueId=4, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376546, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376569, StoreHost=10.0.54.89:10911, CommitLogOffset=3910, BodyCRC=2146591400, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦4, Flag=0, properties=map[CONSUME_START_TIME:1643080672910 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880005], TransactionId=], MsgId=0A00364DF14300000000799d60880005, Offse00364D00002A9F000000000000094C,QueueId=5, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757036, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757047, StoreHost=10.0.54.77:10911, CommitLogOffset=2380, BodyCRC=1637283595, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦4, Flag=0, properties=map[CONSUME_START_TIME:1643080672910 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00005], TransactionId=], MsgId=0A003659F951000000007e00b6c00005, Offse00365900002A9F0000000000000FF0,QueueId=5, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376550, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376573, StoreHost=10.0.54.89:10911, CommitLogOffset=4080, BodyCRC=1637283595, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]" consumerGroup=DEFAULT_CONSUMER offset=1 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦5, Flag=0, properties=map[CONSUME_START_TIME:1643080672914 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880006], TransactionId=], MsgId=0A00364DF14300000000799d60880006, Offse00364D00002A9F00000000000009F6,QueueId=6, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757039, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757051, StoreHost=10.0.54.77:10911, CommitLogOffset=2550, BodyCRC=378652573, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦5, Flag=0, properties=map[CONSUME_START_TIME:1643080672914 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00006], TransactionId=], MsgId=0A003659F951000000007e00b6c00006, Offse00365900002A9F000000000000109A,QueueId=6, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376554, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376577, StoreHost=10.0.54.89:10911, CommitLogOffset=4250, BodyCRC=378652573, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER offset=1 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]" consumerGroup=DEFAULT_CONSUMER WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]" consumerGroup=DEFAULT_CONSUMER offset=2 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦6, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880007], TransactionId=], MsgId=0A00364DF14300000000799d60880007, Offse00364D00002A9F0000000000000AA0,QueueId=7, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757043, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757054, StoreHost=10.0.54.77:10911, CommitLogOffset=2720, BodyCRC=261658151, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦6, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00007], TransactionId=], MsgId=0A003659F951000000007e00b6c00007, Offse00365900002A9F0000000000001144,QueueId=7, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376558, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376581, StoreHost=10.0.54.89:10911, CommitLogOffset=4420, BodyCRC=261658151, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦7, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880008], TransactionId=], MsgId=0A00364DF14300000000799d60880008, Offse00364D00002A9F0000000000000B4A,QueueId=0, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757046, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757057, StoreHost=10.0.54.77:10911, CommitLogOffset=2890, BodyCRC=2023728817, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦7, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00008], TransactionId=], MsgId=0A003659F951000000007e00b6c00008, Offse00365900002A9F00000000000011EE,QueueId=0, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376562, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376585, StoreHost=10.0.54.89:10911, CommitLogOffset=4590, BodyCRC=2023728817, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]" consumerGroup=DEFAULT_CONSUMER offset=2 WARN[0000] delete mq from offset table MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]" consumerGroup=DEFAULT_CONSUMER subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦0, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880001], TransactionId=], MsgId=0A00364DF14300000000799d60880001, Offse00364D00002A9F00000000000006A4,QueueId=1, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643006757016, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757031, StoreHost=10.0.54.77:10911, CommitLogOffset=1700, BodyCRC=1727738642, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦8, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880009], TransactionId=], MsgId=0A00364DF14300000000799d60880009, Offse00364D00002A9F0000000000000BF4,QueueId=1, StoreSize=170, QueueOffset=3, SysFlag=0, BornTimestamp=1643006757048, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757059, StoreHost=10.0.54.77:10911, CommitLogOffset=3060, BodyCRC=1746975520, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦0, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00001], TransactionId=], MsgId=0A003659F951000000007e00b6c00001, Offse00365900002A9F0000000000000D48,QueueId=1, StoreSize=170, QueueOffset=4, SysFlag=0, BornTimestamp=1643080376514, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376550, StoreHost=10.0.54.89:10911, CommitLogOffset=3400, BodyCRC=1727738642, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦8, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00009], TransactionId=], MsgId=0A003659F951000000007e00b6c00009, Offse00365900002A9F0000000000001298,QueueId=1, StoreSize=170, QueueOffset=5, SysFlag=0, BornTimestamp=1643080376566, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376589, StoreHost=10.0.54.89:10911, CommitLogOffset=4760, BodyCRC=1746975520, ReconsumeTimes=0, PreparedTransactionOffset=0] WARN[0000] fecth offset of mq from broker success MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]" consumerGroup=DEFAULT_CONSUMER offset=1 INFO[0000] the MessageQueue changed, version also updated changeTo=1643080672937325000 changedFrom=0 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦1, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880002], TransactionId=], MsgId=0A00364DF14300000000799d60880002, Offse00364D00002A9F000000000000074E,QueueId=2, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643006757023, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757037, StoreHost=10.0.54.77:10911, CommitLogOffset=1870, BodyCRC=301728644, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0000] The PullThresholdForTopic is changed changeTo=11377 changedFrom=102400 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦9, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d6088000a], TransactionId=], MsgId=0A00364DF14300000000799d6088000a, Offse00364D00002A9F0000000000000C9E,QueueId=2, StoreSize=170, QueueOffset=3, SysFlag=0, BornTimestamp=1643006757051, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757062, StoreHost=10.0.54.77:10911, CommitLogOffset=3230, BodyCRC=522685366, ReconsumeTimes=0, PreparedTransactionOffset=0] INFO[0000] The PullThresholdSizeForTopic is changed changeTo=5688 changedFrom=51200 subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦1, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00002], TransactionId=], MsgId=0A003659F951000000007e00b6c00002, Offse00365900002A9F0000000000000DF2,QueueId=2, StoreSize=170, QueueOffset=4, SysFlag=0, BornTimestamp=1643080376538, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376561, StoreHost=10.0.54.89:10911, CommitLogOffset=3570, BodyCRC=301728644, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦9, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c0000a], TransactionId=], MsgId=0A003659F951000000007e00b6c0000a, Offse00365900002A9F0000000000001342,QueueId=2, StoreSize=170, QueueOffset=5, SysFlag=0, BornTimestamp=1643080376570, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376593, StoreHost=10.0.54.89:10911, CommitLogOffset=4930, BodyCRC=522685366, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦2, Flag=0, properties=map[CONSUME_START_TIME:1643080672941 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880003], TransactionId=], MsgId=0A00364DF14300000000799d60880003, Offse00364D00002A9F00000000000007F8,QueueId=3, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757029, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757040, StoreHost=10.0.54.77:10911, CommitLogOffset=2040, BodyCRC=150295102, ReconsumeTimes=0, PreparedTransactionOffset=0] subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦2, Flag=0, properties=map[CONSUME_START_TIME:1643080672941 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00003], TransactionId=], MsgId=0A003659F951000000007e00b6c00003, Offse00365900002A9F0000000000000E9C,QueueId=3, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376542, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376566, StoreHost=10.0.54.89:10911, CommitLogOffset=3740, BodyCRC=150295102, ReconsumeTimes=0, PreparedTransactionOffset=0] 进程 已完成,退出代码为 0

可见消息消费成功

Docker

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:GaussDB(DWS)数据同步状态查看方法
下一篇:史上最全的Linux常用命令汇总(超全面!超详细!)收藏这一篇就够了!
相关文章