技术标签: 分布式事务
上一篇文章我们分析了seata-server端启动流程。本篇文章我们来看seata-server消息处理流程。
seata中有一个全局事务协调器DefaultCoordinator,它主要是处理来自RM和TM的请求来做相应的操作,但是实际的执行者并不是DefaultCoordinator,而是DefaultCore
DefaultCore的继承关系如下图,从继承图中我们可以看到其实Core类的实现类才是一个事务管理器。在seata中有4种事务管理模式,所以每种模式有一个具体的事务管理器。
而DefaultCore则是聚合了4种具体的事务管理器,根据事务的不同类型调用不同的事务管理器。组件的关系如下图
所以事务协调的主要工作就是接受请求然后调用事务管理器进行相应的操作。
之前我们说到,所有的消息都会交给AbstractNettyRemotingServer.ServerHandler来处理,而AbstractNettyRemotingServer.ServerHandler根据消息的不同类型,交给不同的RemotingProcessor来处理
所以我们对那种消息感兴趣只需要看对应的RemotingProcessor实现类即可,我们挑几个常见的消息分析以下,思路都差不多。
tm和rm这部分注册代码看的我有点晕(不重要就没耐心看下去),主要作用就是在tc保存tm和rm的长连接,当tc需要往tm和rm发送消息的时候,就从ChannelManager中找到对应的长连接,然后发送消息
各模式中rm注册的时机如下
xa模式:构建DataSourceProxyXA
at模式:构建DataSourceProxy
tcc模式:GlobalTransactionScanner(Bean初始化后阶段),生成代理对象的时候判定这个方法是tcc的prepare方法
在TC端,全局事务的状态被保存在GlobalSession对象中,分支事务的状态被保存在BranchSession中
ServerOnRequestProcessor处理消息的公共流程为
// DefaultCoordinator
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
消息的接收是通过DefaultCoordinator,然后交给DefaultCore来执行对应的操作,DefaultCore生成xid并返回
// DefaultCore
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 创建一个 GlobalSession
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
// 将 ROOT_SESSION_MANAGER 加入到这个 GlobalSession 的监听器列表中
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 开启 GlobalSession
session.begin();
// 发布事件,如果你对这个事件感兴趣,可以注册这个事件
// transaction start event
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));
// 返回 xid
return session.getXid();
}
可以只返回一个xid,xid由DefaultCore#begin方法生成,xid的生成策略如下
// seata server ip地址 + seata server 端口号 + 雪花算法生成的唯一id
ipAddress + ":" + port + ":" + tranId;
从GlobalSession#begin方法可以看到GlobalSession用到了观察者模式,当GlobalSession的状态发生变更时,会通过给相应的观察者,观察者都是SessionManager,当接收到相应的事件后,将变更的状态进行持久化存储,当使用db模式存储时,这里会在global_table中插入一条记录。
// GlobalSession
public void begin() throws TransactionException {
this.status = GlobalStatus.Begin;
this.beginTime = System.currentTimeMillis();
this.active = true;
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onBegin(this);
}
}
// AbstractCore
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
// 根据 xid 从 SessionManager 中获取到 GlobalSession
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 创建新的分支事务即 branchSession
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
// 对分支事务需要的资源加锁,加锁的逻辑在别的文章详解
branchSessionLock(globalSession, branchSession);
try {
// 将 branchSession 加到 globalSession 的属性中
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
throw new BranchTransactionException(FailedToAddBranch, String
.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()), ex);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
}
return branchSession.getBranchId();
});
}
注意:AT模式下,当分支事务注册的时候,会将修改的数据加锁,如果加锁失败,则抛出异常
// DefaultCore
public GlobalStatus commit(String xid) throws TransactionException {
// 根据xid找到全局事务对象GlobalSession
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
// 已经被commit过了,直接返回成功
return GlobalStatus.Finished;
}
// 添加监听器
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// Highlight: Firstly, close the session, then no more branch can be registered.
// 关闭 GlobalSession 防止再次有新的 BranchSession 注册进来
globalSession.closeAndClean();
if (globalSession.getStatus() == GlobalStatus.Begin) {
// 判断是否可以异步提交
// 目前只有at模式可以异步提交,因为是通过undolog的方式去做的
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return false;
} else {
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
// 同步提交
// XA/TCC只能同步提交
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
// 异步提交
// 只有AT模式能异步提交
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
// 发布事件
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), null, globalSession.getStatus()));
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
// 取出所有的分支事务,然后提交
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
// if not retrying, skip the canBeCommittedAsync branches
if (!retrying && branchSession.canBeCommittedAsync()) {
return CONTINUE;
}
BranchStatus currentStatus = branchSession.getStatus();
// 一阶段失败
if (currentStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
return CONTINUE;
}
try {
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Committed:
globalSession.removeBranch(branchSession);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
if (globalSession.canBeCommittedAsync()) {
LOGGER.error(
"Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
return CONTINUE;
} else {
// 分支事务,不能异步提交,并且还不重试,全局事务执行失败
SessionHelper.endCommitFailed(globalSession);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
default:
// 当前是否正在重试
// retrying=true,说明是从重试队列进来的任务,不用再往重试队列放了
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
branchSession.getBranchId(), branchStatus);
return CONTINUE;
} else {
LOGGER.error(
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
new String[] {
branchSession.toString()});
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
return CONTINUE;
});
// Return if the result is not null
// result 不为null 则为 false
if (result != null) {
return result;
}
//If has branch and not all remaining branches can be committed asynchronously,
//do print log and return false
// 有分支事务,并且不允许异步提交,说明失败了
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
}
//If success and there is no branch, end the global transaction.
// 分支事务全部提交成功了
if (success && globalSession.getBranchSessions().isEmpty()) {
// 全局事务状态改为已提交
SessionHelper.endCommitted(globalSession);
// committed event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
}
return success;
}
可以看到AT模式可以异步提交,因为AT模式全局提交只是删除undoLog,异步提交可以提高执行效率。而其他模式得同步提交,依次向RM发送分支事务提交请求,当所有分支事务都执行成功后,全局事务提交成功。否则,将任务交给管理重试的SessionManager进行重试
全局事务的提交和回滚逻辑差不多,回滚逻辑就不分析了
当我们需要进行全局提交时,需要向各个RM发送对应的请求,注意发送的是同步请求,阻塞获取结果。
实现思路主要是如下一个map
// 消息id -> 消息对应的MessageFuture
ConcurrentMap<Integer, MessageFuture> futures
每个消息有一个消息id,当发送的时候给每条消息创建一个MessageFuture,放在futures中,然后这个MessageFuture(底层其实就是CompletableFuture)阻塞获取结果
而ServerOnResponseProcessor则是用来接收分支提交(请求和响应对应的消息id是一样的),当收到结果后,设置消息对应的MessageFuture为完成,此时阻塞的同步请求就能获取到结果了
public class ServerOnResponseProcessor implements RemotingProcessor {
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
// 根据消息id找到对应的MessageFuture
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
// 没有找到对应的消息发送记录
// 删除部分代码
}
}
}
文章浏览阅读829次。基础概念本文在进行文本相似度分析过程分为以下几个部分进行,文本分词语料库制作算法训练结果预测分析过程主要用两个包来实现jieba,gensimjieba:主要实现分词过程gensim:进行语料库制作和算法训练结巴(jieba)分词在自然语言处理领域中,分词和提取关键词都是对文本处理时通常要进行的步骤。用Python语言对英文文本进行预处理时可选择NLTK库,中文文本预处..._python gensim模块和jieba模块的区别
文章浏览阅读5.8k次。ScrollPic.js对于一些新手来说是一个很好理解运用的图片左右滚动插件,兼容性较好,可以放心大胆的使用。_scrollpic.js
文章浏览阅读275次。第4篇 操作PDF篇 第13章 操作PDF文档 13.1 文档和文档属性 实例380 创建PDF文档 public static void main(String[] args) { try { Document document = n..._java开发实例大全pdf百度云
文章浏览阅读1.6k次。Author:阿冬哥Created:2013-4-17Blog:http://blog.csdn.net/c359719435/Copyright 2013阿冬哥http://blog.csdn.net/c359719435/使用以及转载请注明出处1 设置socket tcp缓冲区大小的疑惑疑惑1:通过setsockopt设置SO_SNDBUF、SO_RCVBUF这连个默认缓冲区的值,再用ge..._java api 调用setsockopt(2)系统调用so_rcvbuf选项来控制它的大小
文章浏览阅读1.2w次。所谓三才即天才、人才、地才,它们分别是天格、人格、地格数字的个位 数。4U,byhN0五行之间的关系是:木、火、土、金、水相临相生,相隔相克。这样,根据数理与五行之间的内在联系,推算出来的配置关系即为三才配置。从中观察三才配置的凶吉,可以判断把握您的综合运势,预测您的事业成功 率以及身体状况. 111 成功顺利伸展,希望圆满达成,基础安定,_113易经数字代表什么
文章浏览阅读1.2w次,点赞5次,收藏9次。半洗白原理半洗白原理:DSM6 以上的系统。群晖Docker中会增加一个叫做DSM的功能。简单来说,就是利用Docker在你的群晖系统中,虚拟一个群晖系统。然而,令我们庆幸的是。这个虚拟出来的小群晖。在Docker中是自带生成序列号(SN)的。只不过这个序列号我们只能用来半洗白而不能全洗白。所以,我们只需要将这个小群晖的序列号和MAC 提取出来。替换到你物理机安装的群晖引导当中,就可以实现半洗白咯..._2020年黑群晖还能洗白吗
文章浏览阅读1w次,点赞9次,收藏34次。1、–os-shell原理使用udf提权获取webshell,也是通过into outfile向服务器写入两个文件,一个是可以直接执行系统命令,一个是进行上传文件。–os-shell的执行条件:dbms为mysql,网站必须是root权限攻击者需要知道网站的绝对路径magic_quotes_gpc = off,php主动转移功能关闭2、环境介绍phpstudy+sqlmap3、探测网站根目录python3 sqlmap.py -u "127.0.0.1/sqli-labs-master_sqlmap --os-shell
文章浏览阅读3.8w次,点赞19次,收藏32次。转载请标明出处: 本文出自:【Android_Jerry的博客】一、首先是Listview的属性设置设置滑动到顶部和底部的背景或颜色:android:overScrollFooter="@android:color/transparent"android:overScrollHeader="@android:color/transparent"设置滑动到边缘时无效果模式:android:ove_recyclerview去掉滚动
文章浏览阅读1w次。在c++编程过程中遇到printf()函数无法输出的问题,但是代码没有问题,使用puts()函数可以正常输出。原因为系统缓冲区问题。有三个解决办法:1.添加换行符printf("XXXXXXX \n");2.输出后手动刷新系统缓冲区fflush(stdout);3.预先设定无缓冲区setvbuf(stdout, NULL, _IONBF, 0);..._vc++2010printf函数无法输出
文章浏览阅读2.2k次,点赞5次,收藏18次。我很庆幸在七年前选择了计算机专业,虽然选专业完全是听从了命运的安排,直接滑档到了第五个志愿,但是我还是很感谢命运给我这样的安排,遇到了我的本科导师还有几个很好的老师,遇到了几个很好的朋友,回想起来,真好。也正因为本科学校有保研资格,我通过不懈努力来了我的研究生学校,选择了我喜欢的方向,做着学术研究,对我自己的领域说不上如数家珍,也可以算得上有了深入了解。有了一定的研究成果以及研究项目,转眼之间到了毕业的时候,毕业之前经历了漫长的找工作之旅,这趟旅程里充满了焦虑不安以及后悔。找工作的时候很迷茫,不知道选择哪_计算机秋招难吗
文章浏览阅读3.8k次,点赞4次,收藏26次。设计模式(DesignPattern)是前辈们对代码开发经验的总结,是解决特定问题的一系列套路。它不是语法规定,而是一套用来提高代码可复用性、可维护性、可读性、稳健性以及安全性的解决方案。本篇为设计模式面试专题,总共收录了35道常见面试题及答案解析,希望能帮到你~1、什么是设计模式?就是经过实践验证的用来解决特定环境下特定问题的解决方案2、设计模式用来干什么?寻找合适的对象决定对象的粒度指定对象的接口描述对象的实现运用复用机制重复使用经过实践验证的正确的,用来解决某一类问题的解决方案_设计模式面试题
文章浏览阅读442次。{pc:get sql="select * from phpcms_category where parentid =17 order by listorder ASC" return="data"} {loop $data $r} <li><a class="" href="{str_replace('www.shandlawyer.cn','m.sh..._phpcms 调用当前栏目子类