顺藤摸瓜RocketMQ之刷盘机制debug解析_jianjun_fei的博客-程序员秘密

技术标签: java  rocketmq  分布式  RocketMQ  

Rocketmq 刷盘机制

笔者这里分析的是4.8版本,这里发现以前的handleflushdisk 以及handleha这两个方法在4.8中已经不会使用到了,所以这里特地对源码进行深入剖析了,读者可以直接跳到异步刷盘流程阅读。

三个文件

在rocketmq里面存在这样三个文件

  • indexfile
  • consumequeue
  • commitlog

其中indexfile和consumequeue可以理解为索引文件,
commitlog才是真正的存放消息的文件

因为rocketmq要保证性能,发送消息落盘的速度,所以,在落盘上选择了顺序写,这样消息在文件上的顺序就是乱序的,所以就需要维护一个indexfile索引文件,以及消费队列的一个索引

这里先介绍三个文件的api,然后接着分析刷盘流程:所以读者可以先看下面的刷盘流程,然后在回头看三个文件的实现原理,文章篇幅较大

indexFile

indexfile文件存储在store目录下的index文件里面,里面存放的是消息的hashcode和index内容,
在这里插入图片描述

我们来看看indexfile里面都存储的什么东西:

文件由一个文件头组成:长40字节
500w个hashslot,每个4字节
2000w个index条目,每个20字节

所以这里我们可以估算每个indexfile的大小为:40+500w4+2000w20个字节,大约400M左右
在这里插入图片描述

indexfileheader由如下组成:

在这里插入图片描述

那消息的索引是怎么put到这个文件里面的呢?以及是如何获取的呢?

  • 这边我们启动一下环境,然后发送一下消息,就可以进入到该方法

来到putkey方法,this.indexHeader.getIndexCount()是indeshead里面最后4个字节(上图),指index的数量,如果数量小于200w说明没有超过最大的。

然后计算出hash,
slotPos是要存放的hash槽的位置,
absSlotPos是要存放hash槽的具体偏移量(绝对位置)
在这里插入图片描述

接着获取到对应absSlotPos绝对位置的int值,
timeDiff是当前时间与BeginTimestamp的时间差,所以这里4个字节肯定够了

在这里插入图片描述
计算出index的绝对位置,也就是获取index最大,然后顺序往后写
在这里插入图片描述

然后就开始往mappedByteBuffer里面写数据了,主要用到了putInt,putLong方法,写完之后,更新indexHead的信息,

可以看到this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());这段是将index的位置存在了hash槽里面,也就是说,查找一个索引的话是先找到hash槽里面的index位置,然后计算出index的具体偏移量来查找的

如果没有hash冲突,将hashslot++
将indexcount++

这边笔者提出一个问题?为什么要存timeDIff时间?存时间为什么不存绝对时间?

在这里插入图片描述
indexfile怎么解决hash冲突?

如下两段代码:

如果拿到的slotvalue不是0的话,说明存在hash冲突,
然后接下来的this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue)会将该值存入到index里面,也就是index的最后四个字节,也就是说,如果我们拿到index的值的话,发现index的最后四位不是0,那么就继续取出来,也就是说,这里index中相同hash是相连的,所以每个index就有存储hash的必要了,如果hash冲突了,根据最后四位拿出来,还是需要比对一下hash的。
在这里插入图片描述
在这里插入图片描述

接着我们看,是如何从indexfile中查找到消息的offset的?

这边启动执行org.apache.rocketmq.test.client.producer.querymsg.QueryMsgByKeyIT#testQueryMsg这个测试方法,就可以进入debug

来到org.apache.rocketmq.store.index.IndexFile#selectPhyOffset方法:

先将mappedfile锁住,然后根据key计算hash,
拿到slotpos,
计算出slot绝对位置offset
在这里插入图片描述

这边贴出整个方法,以及执行到最后每个变量的值:在右边一目了然

L214:将nextIndexToRead赋值,nextIndexToRead是下一次读取的index的位置
L215:校验
L219:计算出index的绝对位置absIndexPos
L223-L227:拿到index中的值
L239:比较hash值,然后放入phyOffsets中
L242:判断了如果prevIndexRead值合法且不是当前位置,就说明发生了hash冲突,将nextIndexToRead赋值,接着继续循环

所以这个方法执行完成之后phyOffsets会存放所以hash值和预期相同的一些消息地址

所以indexfile是如何解决hash冲突的?这个问题也就迎刃而解

在这里插入图片描述
以上是indexfile的常用连个api

consumeQueue

consumequeue存放在store文件里面,里面的consumequeue文件里面按照topic排放,然后每个topic默认4个队列,里面存放的consumequeue文件

在这里插入图片描述

consumequeue文件存储的单元如下,固定20个字节,单个consumequeue文件默认包含30w个条目,所以单个文件大小大概6M左右,
在这里插入图片描述

这边我们发送消息进入到org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper 的debug:

进行了一些简单的判断之后来到:org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
在这里插入图片描述

将消息的信息put到byteBufferIndex里面,

先是8个字节的offset,然后4个字节的size,最后8个字节tag,当然这里的最后8个字节也有可能存放延迟消息的执行时间戳这里不展开

在这里插入图片描述

而执行到最后就是将byteBufferIndex追加到该队列里面,也就是追加到consumequeue里面

在这里插入图片描述

appendMessage实现逻辑:

在这里插入图片描述

commitlog

commitlog文件存储再store目录下的commitlog目录下:
每个默认1G大小,
在这里插入图片描述

如下是commitlog的中每个msg的形状,这是计算msg的长度代码里面的一段可以瞥见,可以很明白的看到了一个msg的结构是什么样的想要具体点可以看看丁威老师的博客里面的这里我把图片抄过来:
在这里插入图片描述

对比上面和下面的图片就很清晰的看到每个消息长什么样:
在这里插入图片描述

接下来我们分析一下org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)这个核心方法,

同样我们是发送一个消息来进入到debug:

先是获取到wroteOffset也就是文件初始位置加上bytebuffer写入位置,这里初始位置是第一个文件所以是0,
然后判断系统ip是不是ipv6地址,决定长度
然后调用createMessageId生成msgid,这里不展开,有兴趣的读者可以点进方法看下,

在这里插入图片描述

接着计算一些数据的长度:
将topic转化为byte数组,供存储
计算msg长度
校验msg的长度是否比最大长度长,以及最后commitlog最后需要8个字节的留白

在这里插入图片描述

接着将各个数据都放到bytebuffer里面,可以看到msg的格式和上面图片是一样的,

在这里插入图片描述

然后,就直接new了一个result里面放的是PUT_OK返回出去了,这里可以看到这里把msg放到bytebuffer里面就直接返回了,也就是说,只是放到内存映射里面,然后下面刷写到磁盘上,就又broker自己去操作了,那么同步刷盘是怎么实现呢?

  • 同步刷盘,在调用完这个方法之后会同步调用一下force方法

在这里插入图片描述

异步刷盘

Rocketmq的存储是与读写是机遇NIIO的内存映射机制的,就如上面分析的一样,数据是先都put到一个bytebuffer里面,然后根据配置的刷盘策略在不同时间进行刷盘的,

运行一下namesrv和broker,然后发送消息:进入到debug

消息发送之后在之前分析的一个顺藤摸瓜RocketMQ之消息发送debug解析,里面提到了往remoting模块发送了一个sendmsg的一个code的request,而这个code对应的processer在broker里面是SendMessageProcessor这个类,我们跟进这个类:

因为这边默认的是一步刷盘,所以这里走的是asyncProcessRequest这个方法,
在这里插入图片描述

跟进去判断了一下是否是批量发送消息,然后调用了asyncSendMessage方法:

在这里插入图片描述

继续跟进去发现这边校验了一下code,然后将body序列化了,拿到queueid,设置了一些topic,queue信息到msginner里面,这个msginner对象就是消息对象,最终是要store里面存储的。

在这里插入图片描述

记下来设置了一些时间信息,以及主机信息,判断了是否是事务消息,然后调用了this.brokerController.getMessageStore().asyncPutMessage(msgInner),也就是拿到MessageStore对象然后用该对象里面的commitlog对象来进行存储落盘

在这里插入图片描述

继续跟进,发现msg的一些信息都已经设置好了,接着调用commitlog进行落盘,而

在这里插入图片描述
来到org.apache.rocketmq.store.CommitLog#asyncPutMessage这个方法:

设置了存储时间和crc,对事务进行了判断

在这里插入图片描述

接着拿到锁,有设置了存储时间,最后调用了mappedFile.appendMessage方法,

在这里插入图片描述

一直来到这个方法org.apache.rocketmq.store.MappedFile#appendMessagesInner

在该方法里面校验了写位置是否大于最大长度

然后调用了cb.doAppend,而这个方法我们在上面说过的org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)这个方法,然后进行返回,

到这里我们还没有发现其他两个文件是什么时候更新的呢?comsumequeue和indexfile

在这里插入图片描述

接下来就将两个异步任务给返回出去:

一个刷盘任务,一个ha任务

在这里插入图片描述

因为是异步刷盘,所以这里的submitFlushRequest方法里面走下面这个分支:

直接唤醒刷盘线程,然后返回一个completed的future,并且里面的状态是put_ok,然后就回到我们一开始的发送成功的页面了。

在这里插入图片描述

consumerqueue和indexfile文件是什么时候更新的

在我们的broker启动的时候在这个方法里面org.apache.rocketmq.broker.BrokerController#start
在这里插入图片描述

跟进到方法里面,启动了this.reputMessageService.start();这个线程,

在这里插入图片描述

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run我们直接来到这个线程的run方法:

一个死循环在执行doreput方法:

在这里插入图片描述

在方法里面:执行了DefaultMessageStore.this.doDispatch(dispatchRequest)这个方法 ,继续更近
在这里插入图片描述

是从一个集合里面拿到了CommitLogDispatcher的实现类,然后挨个执行一下:

在这里插入图片描述

然后我们发现:
他的实现类就包含一个构建consumerqueue的和一个构建index的,那我们继续发送消息进入到debug:

在这里插入图片描述

果然这里面存放的三个dispatcher就是其中两个就是我们需要处理的文件的实现类:

在这里插入图片描述

我们来到consumerqueue实现类,发现这里执行了DefaultMessageStore.this.putMessagePositionInfo(request),而再跟进方法里面发现就是调用了putMessagePositionInfoWrapper来将consumerqueue文件写入到磁盘

在这里插入图片描述

同样我们debug到另一个index的实现类

在这里插入图片描述

跟进方法:

执行了putkey方法

在这里插入图片描述

而这个方法正是调用的是idexfile中的putkey方法:这个我们上面已经分析过:

在这里插入图片描述

所以在broker启动的时候就是会执行一个死循环的线程来将consumerqueue和indexfile进行刷盘。

同步刷盘

同步刷盘本质上是对异步的一种等待:

上面异步刷盘提到了submitFlushRequest这个方法,
可以看到在这个方法里面构建了一个groupcommitrequeest,提交给GroupCommitService执行:

然后在GroupCommitService里面是有个轮训看判断该request中的offset是否比commitlog的最大值小,如果小就将这个任务完成。

这里的思想和consumer的消费逻辑类似,都是发送一个request给service去轮询看是否完成。

那么这里并没有调用get进行阻塞,所以这里也就直接将future返回出去了,会不会有问题呢?

在这里插入图片描述

这里其实还是没有问题的,这里的同步刷盘,确实是将异步任务返回给了sendmessageprocesser,但是我们在回到org.apache.rocketmq.remoting.netty.NettyRemotingAbstract也就是调用的源头,

在第225行里面,这里是调用的sendmessageprocesser中的异步处理,我们明明配置的是同步刷盘,这里还是调用的异步处理,不会出问题吗?

原因是这里将callback这个回调,也加入到了这个异步回调里面,而最终返回给客户端的是在这个callback里面执行的,所以只要保证callback这个逻辑在整个 CompletableFuture之后就行了,

而asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor())这段逻辑正是将callback也加入到整个异步处理的最后:

所以即使这里的调用看上去是异步的,但是只要保证整个顺序就可以了,

而同步异步具体体现在commitlog里面的submitFlushRequest(result, msg)这个方法里面。

在这里插入图片描述

  1. 顺藤摸瓜RocketMQ之整体架构以及执行流程
  2. 顺藤摸瓜RocketMQ之注册中心debug解析
  3. 顺藤摸瓜RocketMQ之消息发送debug解析
  4. 顺藤摸瓜RocketMQ之消息消费debug解析
  5. 顺藤摸瓜RocketMQ之刷盘机制debug解析
  6. 顺藤摸瓜RocketMQ之主从同步(HA)解析
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/jianjun_fei/article/details/117448644

智能推荐

linux传数据到prometheus,Prometheus 进阶_Lekecui的博客-程序员秘密

在「Prometheus入门」一文中我们对Prometheus基本知识点做了讲解,并演示了如何监控一个Linux服务器。这篇文章我们将讲解如何对几个常见的应用进行监控。监控MySQL服务器Prometheus通过安装在远程机器上的exporter来收集监控数据,这里要用到的是mysqld_exporter。部署的架构图安装mysqld_exporter123$ wget https://githu...

CPP-基础:c++读取ini文件_dixiongbi7761的博客-程序员秘密

配置文件格式是[JP]K=2EC156673E 2F4240 5595F6char str[50];GetPrivateProfileString("JP", "K",NULL, str, sizeof(str),".\\keydog.ini");得到str后想将其分成三个字符串str1=2EC156673Estr2=2F4240str3=5595F6第一种方法用MFC 得有这句...

前端自动化构建工具gulp使用_weixin_34255793的博客-程序员秘密

1. 全局安装 gulp:$ npm install --global gulp2. 作为项目的开发依赖(devDependencies)安装:$ npm install --save-dev gulp3. 在项目根目录下创建一个名为 package.json的文件:附上本人项目基本配置{"devDependencies": {"concat": "0.0...

线程Thread基础学习(1)_ddufftimz66781829的博客-程序员秘密

学习过操作系统的人员对于线程一词并不陌生,或多或少或深或浅都有了解,但对于程序员来说,只有了解是不行的,在应聘工作的面试中或多或少总有面试官提到这些问题,此问题涉及领域并不宽,但作用着实不小,特别是在系统性能方面。在多核处理器盛行的今天,多线程成为面试官比较喜欢的话题。方式多为并发的理解,多线程的同步等等。 为了能在工作有能有立足之地,程序员必须每天学习...

原生JavaScript实现贪吃蛇的游戏_风或许自媒体的博客-程序员秘密

贪吃蛇游戏的思考步骤:1.默认初始值(3个)默认向右走2.封装函数move,去移动蛇3.去监听键盘的事件,看用户的怎么移动蛇4.去判断碰到墙就死亡,结束游戏5.去完成投放食物的封装函数6.去实现蛇的吃食物的函数其实我们触发键盘事件,我们是这么处理的,这是一种比较简单的想法。一个div是40px,当横着三个div数组,往右。数组0是蛇头,那么当移动时是最后一个div 的left和top值...

vue开源项目库汇总_ChrisCheng0821的博客-程序员秘密

UI组件 element ★12468 - 饿了么出品的Vue2的web UI工具套件 Vux ★7759 - 基于Vue和WeUI的组件库 iview ★6121 - 基于 Vuejs 的开源 UI 组件库 mint-ui ★5827 - Vue 2的移动UI元素 vue-material ★2997 - 通过Vue Material和Vue 2建立精美的app应用 muse-ui ★

随便推点

开源图算法库简介绍_wsh6759的博客-程序员秘密

开源图算法库简介绍                                         -----by  wangsh              喜欢网络、图算法的童鞋注意了,这里简单分享相关资源:    参考:Cusp   http://code.google.com/r/mjgarland-graphs/python-graph  h

mysql 基本命令(3)-数据类型和运算符_weixin_33883178的博客-程序员秘密

一、数值类型1、数据类型有:数值类型、日i期类型、字符串类型。2、int(20),int 指整数的取值范围,里面的参数20,只是表示数据显示的宽度。显示宽度和数据类型的取值范围是无关的。显示宽度只是指明mysql最大可能显示的宽度,如果插入的数据大于显示宽度的值,只要该值不超过 该数据类型的取值范围,数值依然可以插入,而且还能全部显示出来。例如:规定 id int(4),插入...

51单片机c语言仿真,51单片机C语言实例(350例)Proteus仿真和代码 (51 single chip computer C language example (350 cases) prot..._鋾鎁的博客-程序员秘密

压缩包 : da3fb2c3f918bd8d49e3a513f89.zip 列表51单片机C语言实例(350例)Proteus仿真和代码/51单片机C语言实例(350例)Proteus仿真和代码/400例/51单片机C语言实例(350例)Proteus仿真和代码/400例/单片机C语言实例(400例)/51单片机C语言实例(350例)Proteus仿真和代码/400例/单片机C语言实例(400例)...

oracle 授权 debug,Oracle debug 命令_马斯克·贾的博客-程序员秘密

Oradebug命令SQL>oradebughelpHELP[command]DescribeoneorallcommandsSETMYPIDDebugcurrentprocessSETOSPIDSetOSpidofprocesst...

拜望德古拉伯爵——Whitby游记_jiden的博客-程序员秘密

在Whitby的一整天,天气都极好,从出门到日落,一直不见丝毫阴霾。于是在初抵那个滨海小镇时,只觉得此处平静明媚,街道上总能飘来Fish&Chips的香味,糖果店的灯光温暖且明亮,糖果和巧克力好像都在闪闪发光,临水的房子初看上去都是典型的英国风,但仔细再看,又总能找到几栋明显受过欧陆风格影响的,然后顺着河道往海的方向看去,两座灯塔立在坝上,指引着船只的方向。这似乎是个再典型不过的港口小镇。但是倘若

推荐文章

热门文章

相关标签