RocketMQ主从同步的实例分析以及HA机制原理


这篇文章主要介绍“RocketMQ主从同步的实例分析以及HA机制原理”,在日常操作中,相信很多人在RocketMQ主从同步的实例分析以及HA机制原理问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ主从同步的实例分析以及HA机制原理”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
HA 的实现逻辑放在了 store 存储模块的ha目录中,其核心实现类如下:HAService:主从同步的核心实现类HAService$AcceptSocketService:主服务器监听从服务器连接实现类HAService$GroupTransferService:主从同步通知类,实现同步复制和异步复制的功能HAService$HAClient:从服务器连接主服务实现类HAConnection:主服务端 HA 连接对象的封装,当主服务器接收到从服务器发过来的消息后,会封装成一个 HAConnection 对象,其中里面又封装了读 Socket 连接实现与 写 Socket 连接实现:HAConnection$ReadSocketService:主服务器读实现类HAConnection$WriteSocketService:主服务器写实现类RocketMQ 主从同步的整体工作机制大致是:从服务器主动建立 TCP 连接主服务器,然后每隔 5s 向主服务器发送 commitLog 文件最大偏移量拉取还未同步的消息;主服务器开启监听端口,监听从服务器发送过来的信息,主服务器收到从服务器发过来的偏移量进行解析,并返回查找出未同步的消息给从服务器;客户端收到主服务器的消息后,将这批消息写入 commitLog 文件中,然后更新 commitLog 拉取偏移量,接着继续向主服务拉取未同步的消息。从 HA 实现逻辑可看出,可大致分为两个过程,分别是从服务器上报偏移量,以及主服务器发送未同步消息到从服务器。从上面的实现类可知,从服务器向主服务器上报偏移量的逻辑在 HAClient 类中,HAClient 类是一个继承了 ServiceThread 类,即它是一个线程服务类,在 Broker 启动后,Broker 启动开一条线程定时执行从服务器上报偏移量到主服务器的任务。org.apache.rocketmq.store.ha.HAService.HAClient#run:以上是 HAClient 线程 run 方法逻辑,主要是做了主动连接主服务器,并上报偏移量到主服务器,以及处理主服务器发送过来的消息,并不断循环执行以上逻辑。org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster:该方法是从服务器连接主服务器的逻辑,拿到主服务器地址并且连接上以后,会获取一个 socketChannel 对象,接着还会记录当前时间戳为上次写入的时间戳,lastWriteTimestamp 的作用时用来计算主从同步时间间隔,这里需要注意一点,如果没有配置主服务器地址,该方法会返回 false,即不会执行主从复制。该方法还会调用 DefaultMessageStore 的 getMaxPhyOffset() 方法获取 commitLog 文件最大偏移量,作为本次上报的偏移量。org.apache.rocketmq.store.ha.HAService.HAClient#reportSlaveMaxOffset:该方法向主服务器上报已拉取偏移量,具体做法是将 ByteBuffer 读取位置 position 值为 0,其实跳用 flip() 方法也可以,然后调用 putLong() 方法将 maxOffset 写入 ByteBuffer,将 limit 设置为 8,跟写入 ByteBuffer 中的 maxOffset(long 型)大小一样,最后采取 for 循环将 maxOffset 写入网络通道中,并调用 hasRemaining() 方法,该方法的逻辑为判断 position 是否小于 limit,即判断 ByteBuffer 中的字节流是否全部写入到通道中。org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run:主服务器收到从服务器的拉取偏移量后,会封装成一个 HAConnection 对象,前面也说过 HAConnection 封装主服务端 HA 连接对象的封装,其中有读实现类和写实现类,start() 方法即开启了读写线程:org.apache.rocketmq.store.ha.HAConnection#start:org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent:从以上源码可看出,主服务器接收到从服务器上报的偏移量后,主要作了两件事:获取从服务器上报的偏移量;唤醒主从同步消费者发送消息同步返回的线程,该方法实现了主从同步-同步复制的功能。org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:读实现类实现逻辑比较长,但主要做了以下几件事情:计算需要拉取的偏移量,如果从服务器第一次拉取,则从最后一个 commitLog 文件的初始偏移量开始同步;传输消息到从服务器;发送心跳包到从服务器,保持长连接。关于第一步,我还需要详细讲解一下,因为之前有想到一个问题:把 brokerA 的从服务器去掉,再启动一台新的从服务器指向brokerA 主服务器,这时的主服务器的消息是否会全量同步到从服务?org.apache.rocketmq.store.MappedFileQueue#getMaxOffset:org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:从以上逻辑可找到答案,如果有新的从服务器同步主服务器消息,则从最后一个 commitLog 文件的初始偏移量开始同步。回到最开始开启 HAClient 线程上报偏移量的方法,我们发现里面还做了一件事:org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent:该方法用于处理主服务器发送回来的消息数据,这里用了 while 循环的处理,不断地从 byteBuffer 读取数据到缓冲区中,最后调用 dispatchReadRequest 方法将消息数据写入 commitLog 文件中,完成主从复制最后一个步骤。到此,关于“RocketMQ主从同步的实例分析以及HA机制原理”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注开发云网站,小 香港云主机编会继续努力为大家带来更多实用的文章!

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

Like (0)
Donate 微信扫一扫 微信扫一扫
Previous 08/07 10:56
Next 08/07 11:04

相关推荐