seata源码解析:seata server各种消息处理流程_seata关闭channel-程序员宅基地

技术标签: 分布式事务  

请添加图片描述

seata-server消息处理流程

上一篇文章我们分析了seata-server端启动流程。本篇文章我们来看seata-server消息处理流程。
请添加图片描述
seata中有一个全局事务协调器DefaultCoordinator,它主要是处理来自RM和TM的请求来做相应的操作,但是实际的执行者并不是DefaultCoordinator,而是DefaultCore

DefaultCore的继承关系如下图,从继承图中我们可以看到其实Core类的实现类才是一个事务管理器。在seata中有4种事务管理模式,所以每种模式有一个具体的事务管理器。

请添加图片描述
而DefaultCore则是聚合了4种具体的事务管理器,根据事务的不同类型调用不同的事务管理器。组件的关系如下图
请添加图片描述
所以事务协调的主要工作就是接受请求然后调用事务管理器进行相应的操作

事务协调器接收请求

之前我们说到,所有的消息都会交给AbstractNettyRemotingServer.ServerHandler来处理,而AbstractNettyRemotingServer.ServerHandler根据消息的不同类型,交给不同的RemotingProcessor来处理

在这里插入图片描述
所以我们对那种消息感兴趣只需要看对应的RemotingProcessor实现类即可,我们挑几个常见的消息分析以下,思路都差不多。

事务管理器执行操作

RegRmProcessor和RegTmProcessor

tm和rm这部分注册代码看的我有点晕(不重要就没耐心看下去),主要作用就是在tc保存tm和rm的长连接,当tc需要往tm和rm发送消息的时候,就从ChannelManager中找到对应的长连接,然后发送消息

各模式中rm注册的时机如下

xa模式:构建DataSourceProxyXA
at模式:构建DataSourceProxy
tcc模式:GlobalTransactionScanner(Bean初始化后阶段),生成代理对象的时候判定这个方法是tcc的prepare方法

ServerOnRequestProcessor

在TC端,全局事务的状态被保存在GlobalSession对象中,分支事务的状态被保存在BranchSession中

ServerOnRequestProcessor处理消息的公共流程为

  1. 对应的channel是否注册过,没注册过直接关闭连接,否则到第二步
  2. 针对不同的消息交给DefaultCoordinator类的不同方法来处理,并返回结果
开启全局事务
// DefaultCoordinator
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
    if (LOGGER.isInfoEnabled()) {
    
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
            rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}

消息的接收是通过DefaultCoordinator,然后交给DefaultCore来执行对应的操作,DefaultCore生成xid并返回

// DefaultCore
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    
    // 创建一个 GlobalSession
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    // 将 ROOT_SESSION_MANAGER 加入到这个 GlobalSession 的监听器列表中
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
 
    // 开启 GlobalSession
    session.begin();

    // 发布事件,如果你对这个事件感兴趣,可以注册这个事件
    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    // 返回 xid
    return session.getXid();
}

可以只返回一个xid,xid由DefaultCore#begin方法生成,xid的生成策略如下

// seata server ip地址 + seata server 端口号 + 雪花算法生成的唯一id
ipAddress + ":" + port + ":" + tranId;

从GlobalSession#begin方法可以看到GlobalSession用到了观察者模式,当GlobalSession的状态发生变更时,会通过给相应的观察者,观察者都是SessionManager,当接收到相应的事件后,将变更的状态进行持久化存储,当使用db模式存储时,这里会在global_table中插入一条记录。

// GlobalSession
public void begin() throws TransactionException {
    
    this.status = GlobalStatus.Begin;
    this.beginTime = System.currentTimeMillis();
    this.active = true;
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
    
        lifecycleListener.onBegin(this);
    }
}
注册分支事务
// AbstractCore
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    
    // 根据 xid 从 SessionManager 中获取到 GlobalSession
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
    
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // 创建新的分支事务即 branchSession
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // 对分支事务需要的资源加锁,加锁的逻辑在别的文章详解
        branchSessionLock(globalSession, branchSession);
        try {
    
            // 将 branchSession 加到 globalSession 的属性中
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
    
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        if (LOGGER.isInfoEnabled()) {
    
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
        }
        return branchSession.getBranchId();
    });
}
  1. 根据 xid 从 SessionManager 中获取到 GlobalSession
  2. 创建新的分支事务即 BranchSession
  3. 将 branchSession 加到 globalSession 的属性中,此时GlobalSession会发布分支事务注册事件,SessionManager 收到事件后会在 branch_table 中插入一条记录

注意:AT模式下,当分支事务注册的时候,会将修改的数据加锁,如果加锁失败,则抛出异常

提交全局事务
// DefaultCore
public GlobalStatus commit(String xid) throws TransactionException {
    
    // 根据xid找到全局事务对象GlobalSession
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
    
        // 已经被commit过了,直接返回成功
        return GlobalStatus.Finished;
    }
    // 添加监听器
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus

    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
    
        // Highlight: Firstly, close the session, then no more branch can be registered.
        // 关闭 GlobalSession 防止再次有新的 BranchSession 注册进来
        globalSession.closeAndClean();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
    
            // 判断是否可以异步提交
            // 目前只有at模式可以异步提交,因为是通过undolog的方式去做的
            if (globalSession.canBeCommittedAsync()) {
    
                globalSession.asyncCommit();
                return false;
            } else {
    
                globalSession.changeStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    // 同步提交
    // XA/TCC只能同步提交
    if (shouldCommit) {
    
        boolean success = doGlobalCommit(globalSession, false);
        //If successful and all remaining branches can be committed asynchronously, do async commit.
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
    
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
    
            return globalSession.getStatus();
        }
    } else {
    
        // 异步提交
        // 只有AT模式能异步提交
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    
    boolean success = true;
    // start committing event
    // 发布事件
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
        globalSession.getBeginTime(), null, globalSession.getStatus()));

    if (globalSession.isSaga()) {
    
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
    
        // 取出所有的分支事务,然后提交
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
    
            // if not retrying, skip the canBeCommittedAsync branches
            if (!retrying && branchSession.canBeCommittedAsync()) {
    
                return CONTINUE;
            }

            BranchStatus currentStatus = branchSession.getStatus();
            // 一阶段失败
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
    
                globalSession.removeBranch(branchSession);
                return CONTINUE;
            }
            try {
    
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                switch (branchStatus) {
    
                    case PhaseTwo_Committed:
                        globalSession.removeBranch(branchSession);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        if (globalSession.canBeCommittedAsync()) {
    
                            LOGGER.error(
                                "Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
                            return CONTINUE;
                        } else {
    
                            // 分支事务,不能异步提交,并且还不重试,全局事务执行失败
                            SessionHelper.endCommitFailed(globalSession);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                    default:
                        // 当前是否正在重试
                        // retrying=true,说明是从重试队列进来的任务,不用再往重试队列放了
                        if (!retrying) {
    
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
    
                            LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                branchSession.getBranchId(), branchStatus);
                            return CONTINUE;
                        } else {
    
                            LOGGER.error(
                                "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                }
            } catch (Exception ex) {
    
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {
    branchSession.toString()});
                if (!retrying) {
    
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            return CONTINUE;
        });
        // Return if the result is not null
        // result 不为null 则为 false
        if (result != null) {
    
            return result;
        }
        //If has branch and not all remaining branches can be committed asynchronously,
        //do print log and return false
        // 有分支事务,并且不允许异步提交,说明失败了
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
    
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    //If success and there is no branch, end the global transaction.
    // 分支事务全部提交成功了
    if (success && globalSession.getBranchSessions().isEmpty()) {
    
        // 全局事务状态改为已提交
        SessionHelper.endCommitted(globalSession);

        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));

        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}

可以看到AT模式可以异步提交,因为AT模式全局提交只是删除undoLog,异步提交可以提高执行效率。而其他模式得同步提交,依次向RM发送分支事务提交请求,当所有分支事务都执行成功后,全局事务提交成功。否则,将任务交给管理重试的SessionManager进行重试

全局事务的提交和回滚逻辑差不多,回滚逻辑就不分析了

ServerOnResponseProcessor

当我们需要进行全局提交时,需要向各个RM发送对应的请求,注意发送的是同步请求,阻塞获取结果。

实现思路主要是如下一个map

// 消息id -> 消息对应的MessageFuture
ConcurrentMap<Integer, MessageFuture> futures

每个消息有一个消息id,当发送的时候给每条消息创建一个MessageFuture,放在futures中,然后这个MessageFuture(底层其实就是CompletableFuture)阻塞获取结果

而ServerOnResponseProcessor则是用来接收分支提交(请求和响应对应的消息id是一样的),当收到结果后,设置消息对应的MessageFuture为完成,此时阻塞的同步请求就能获取到结果了

请添加图片描述

public class ServerOnResponseProcessor implements RemotingProcessor {
    

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    
        // 根据消息id找到对应的MessageFuture
        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
        if (messageFuture != null) {
    
            messageFuture.setResultMessage(rpcMessage.getBody());
        } else {
    
            // 没有找到对应的消息发送记录
            // 删除部分代码
        }
    }
}

参考博客

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zzti_erlie/article/details/120894915

智能推荐

文本相似度分析(基于jieba和gensim)-程序员宅基地

文章浏览阅读829次。基础概念本文在进行文本相似度分析过程分为以下几个部分进行,文本分词语料库制作算法训练结果预测分析过程主要用两个包来实现jieba,gensimjieba:主要实现分词过程gensim:进行语料库制作和算法训练结巴(jieba)分词在自然语言处理领域中,分词和提取关键词都是对文本处理时通常要进行的步骤。用Python语言对英文文本进行预处理时可选择NLTK库,中文文本预处..._python gensim模块和jieba模块的区别

ScrollPic.js—简单易用的图片左右滚动插件-程序员宅基地

文章浏览阅读5.8k次。ScrollPic.js对于一些新手来说是一个很好理解运用的图片左右滚动插件,兼容性较好,可以放心大胆的使用。_scrollpic.js

Java开发实例大全提高篇——操作PDF篇-程序员宅基地

文章浏览阅读275次。第4篇 操作PDF篇 第13章 操作PDF文档 13.1 文档和文档属性 实例380 创建PDF文档 public static void main(String[] args) { try { Document document = n..._java开发实例大全pdf百度云

java socket缓冲区大小_socket tcp缓冲区大小的默认值、最大值-程序员宅基地

文章浏览阅读1.6k次。Author:阿冬哥Created:2013-4-17Blog:http://blog.csdn.net/c359719435/Copyright 2013阿冬哥http://blog.csdn.net/c359719435/使用以及转载请注明出处1 设置socket tcp缓冲区大小的疑惑疑惑1:通过setsockopt设置SO_SNDBUF、SO_RCVBUF这连个默认缓冲区的值,再用ge..._java api 调用setsockopt(2)系统调用so_rcvbuf选项来控制它的大小

迷信一把:三才数理吉凶_113易经数字代表什么-程序员宅基地

文章浏览阅读1.2w次。所谓三才即天才、人才、地才,它们分别是天格、人格、地格数字的个位 数。4U,byhN0五行之间的关系是:木、火、土、金、水相临相生,相隔相克。这样,根据数理与五行之间的内在联系,推算出来的配置关系即为三才配置。从中观察三才配置的凶吉,可以判断把握您的综合运势,预测您的事业成功 率以及身体状况. 111 成功顺利伸展,希望圆满达成,基础安定,_113易经数字代表什么

群晖 半洗白_黑群晖利用DDSM半洗白教程-程序员宅基地

文章浏览阅读1.2w次,点赞5次,收藏9次。半洗白原理半洗白原理:DSM6 以上的系统。群晖Docker中会增加一个叫做DSM的功能。简单来说,就是利用Docker在你的群晖系统中,虚拟一个群晖系统。然而,令我们庆幸的是。这个虚拟出来的小群晖。在Docker中是自带生成序列号(SN)的。只不过这个序列号我们只能用来半洗白而不能全洗白。所以,我们只需要将这个小群晖的序列号和MAC 提取出来。替换到你物理机安装的群晖引导当中,就可以实现半洗白咯..._2020年黑群晖还能洗白吗

随便推点

sqlmap之--os-shell_sqlmap --os-shell-程序员宅基地

文章浏览阅读1w次,点赞9次,收藏34次。1、–os-shell原理使用udf提权获取webshell,也是通过into outfile向服务器写入两个文件,一个是可以直接执行系统命令,一个是进行上传文件。–os-shell的执行条件:dbms为mysql,网站必须是root权限攻击者需要知道网站的绝对路径magic_quotes_gpc = off,php主动转移功能关闭2、环境介绍phpstudy+sqlmap3、探测网站根目录python3 sqlmap.py -u "127.0.0.1/sqli-labs-master_sqlmap --os-shell

完美解决隐藏Listview和RecyclerView去掉滚动条和滑动到边界阴影的方案-程序员宅基地

文章浏览阅读3.8w次,点赞19次,收藏32次。转载请标明出处: 本文出自:【Android_Jerry的博客】一、首先是Listview的属性设置设置滑动到顶部和底部的背景或颜色:android:overScrollFooter="@android:color/transparent"android:overScrollHeader="@android:color/transparent"设置滑动到边缘时无效果模式:android:ove_recyclerview去掉滚动

C 中printf无法输出问题_vc++2010printf函数无法输出-程序员宅基地

文章浏览阅读1w次。在c++编程过程中遇到printf()函数无法输出的问题,但是代码没有问题,使用puts()函数可以正常输出。原因为系统缓冲区问题。有三个解决办法:1.添加换行符printf("XXXXXXX \n");2.输出后手动刷新系统缓冲区fflush(stdout);3.预先设定无缓冲区setvbuf(stdout, NULL, _IONBF, 0);..._vc++2010printf函数无法输出

秋招春招总结,经验分享(计算机专业)_计算机秋招难吗-程序员宅基地

文章浏览阅读2.2k次,点赞5次,收藏18次。我很庆幸在七年前选择了计算机专业,虽然选专业完全是听从了命运的安排,直接滑档到了第五个志愿,但是我还是很感谢命运给我这样的安排,遇到了我的本科导师还有几个很好的老师,遇到了几个很好的朋友,回想起来,真好。也正因为本科学校有保研资格,我通过不懈努力来了我的研究生学校,选择了我喜欢的方向,做着学术研究,对我自己的领域说不上如数家珍,也可以算得上有了深入了解。有了一定的研究成果以及研究项目,转眼之间到了毕业的时候,毕业之前经历了漫长的找工作之旅,这趟旅程里充满了焦虑不安以及后悔。找工作的时候很迷茫,不知道选择哪_计算机秋招难吗

【Java架构师面试题】设计模式面试专题(共35题含答案)_设计模式面试题-程序员宅基地

文章浏览阅读3.8k次,点赞4次,收藏26次。设计模式(DesignPattern)是前辈们对代码开发经验的总结,是解决特定问题的一系列套路。它不是语法规定,而是一套用来提高代码可复用性、可维护性、可读性、稳健性以及安全性的解决方案。本篇为设计模式面试专题,总共收录了35道常见面试题及答案解析,希望能帮到你~1、什么是设计模式?就是经过实践验证的用来解决特定环境下特定问题的解决方案2、设计模式用来干什么?寻找合适的对象决定对象的粒度指定对象的接口描述对象的实现运用复用机制重复使用经过实践验证的正确的,用来解决某一类问题的解决方案_设计模式面试题

phpcms调用指定父栏目下的子栏目列表,非文章列表_phpcms 调用当前栏目子类-程序员宅基地

文章浏览阅读442次。{pc:get sql="select * from phpcms_category where parentid =17 order by listorder ASC" return="data"} {loop $data $r} <li><a class="" href="{str_replace('www.shandlawyer.cn','m.sh..._phpcms 调用当前栏目子类

推荐文章

热门文章

相关标签