RocketMQ事务消息怎样实现


这篇文章主要介绍RocketMQ事务消息怎样实现,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息 香港云主机的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。
官方版本未发布之前,从apache
rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT、ROLLBACK、PREPARED,在事务消息未开源之前网上对于事务消息的“声音”基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的:TRANSACTION_PREPARED_TYPETRANSACTION_COMMIT_TYPETRANSACTION_ROLLBACK_TYPE消息发送者首先发送TRANSACTION_PREPARED_TYPE类型的消息,然后根据事务状态来决定是提交或回滚事务发送commit请求或rollback请求,如果commit/rollback请求丢失后,rocketmq会在指定超时时间后回查事务状态来决定提交或回滚事务。让我们各自带着自己的理解和猜测,从阅读RocketMQ官方提供的Demo程序入手,试图窥探一些大体的信息。Demo示例程序位于:/rocketmq-example/src/main/java/org/apache/rocketmq/example/transaction包中。该包中未放置消息消费者,为了验证事务的消息消费情况,我们可以从其他包copy一个消费者,从而先运行生产者,然后运行消费者,判断事务消息的预发放、提交、回滚等效果,二话不说,先运行一下,看下效果再说:
消息发送端运行结果:消息消费端效果:综上所述,服务端发送了10条消息,而消费端只收到3条消息,应该是由于事务回滚,造成只提交了3条消息,为了更加严谨,可以安装一个rocketmq-consonse,更加直观的观察shangshagn’s上述结果:
cdn.com/97db9b9f7bc18478739f472e134cf48eb3af8cad.png”>接下来对示例代码进行解读:1、生产者端代码解读:代码@1:创建TransactionListener 实例,字面理解为事务消息事件监听器,下文详细对其进行展开。
代码@2:ExecutorService

executorService,创建一个线程池,其线程的名称前缀”client-transaction-msg-check-thread“,从字面理解为客户端事务消息状态检测线程,我们可以大胆的猜测一下是不是这个线程池调用TransactionListener方法,完成对事务消息的检测呢?【这里只是作者的猜测,大家不能当真,在作者后续文章发布后,如果该观点错误,会加以修复,这里写出来,主要是想分享一下我读源码的方法】。
代码@3:为事务消息发送者设置线程池。
代码@4:为事务消息发送者设置事务监听器。
代码@5:发送10条消息。2、TransactionListener代码解读executeLocalTransaction方法:记录本地事务的事务状态,这里其实现就是循环设置事务消息的状态为0,1,2,demo中是把消息的状态数据存放在一个Map中。实际应用时通常会持久化消息的事务状态,例如数据库或缓存。checkLocalTransaction方法,事务回查业务实现,查本地事务表,判断事务的状态如为0:UNKNOW,1:COMMIT_MESSAGE;ROLLBACK_MESSAGE。这里就能解释,生产者连续发10条消息,因为只有3条消息的事务状态为COMMIT_MESSAGE,故消息消费者只能消费3条。以上是“RocketMQ事务消息怎样实现”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注开发云行业资讯频道!

相关推荐: 如何理解spring框架

如何理解spring框架,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。spring:分层的JavaSE/EE应用full-stack轻量级开源框架,以Ioc(反转控制)和AOP(面…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 10/03 16:44
下一篇 10/03 16:44

相关推荐