seata源码解析:TM RM 客户端的初始化过程_seata timoutr-程序员宅基地

技术标签: java  后端  分布式事务  开发语言  

请添加图片描述

TM和RM初始化过程

上一篇文章说过,在Spring启动的过程中就会就会初始化TM和RM,建立与TC的长连接。TM,RM,TC都是用netty来处理网络连接的,初始化netty客户端和服务端的过程也非常类似。

本篇文章只分析TM的初始化过程,RM和TM复用了很多方法

// TmNettyRemotingClient
public void init() {
    
    // registry processor
    // 注册消息处理器
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
    
        super.init();
    }
}
// AbstractNettyRemotingClient
public void init() {
    
    // 不断连接seata server
    timerExecutor.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
    
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    // 是否允许批量发送请求
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
    
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    // 移除发送超时的消息
    super.init();
    clientBootstrap.start();
}

clientBootstrap#start是netty启动的模版代码,注册消息处理器和处理消息的套路我在seata server启动的文章分析的比较详细,本篇文章就不深入分析了

建立和TC的连接

TM和RM每隔10s都要TC集群的每个地址建立长连接

// NettyClientChannelManager#reconnect
void reconnect(String transactionServiceGroup) {
    
    List<String> availList = null;
    try {
    
        // 获得事务分组对应的集群中每台机器地址
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
    
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
    
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);

        if (StringUtils.isBlank(clusterName)) {
    
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                    ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                    transactionServiceGroup);
            return;
        }

        if (!(registryService instanceof FileRegistryServiceImpl)) {
    
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        return;
    }
    // 遍历tc服务器地址
    for (String serverAddress : availList) {
    
        try {
    
            // 建立与tc的连接
            acquireChannel(serverAddress);
        } catch (Exception e) {
    
            LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
        }
    }
}
Channel acquireChannel(String serverAddress) {
    
    Channel channelToServer = channels.get(serverAddress);
    // 与当前serverAddress已经存在连接,直接返回
    if (channelToServer != null) {
    
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
    
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
    
        LOGGER.info("will connect to " + serverAddress);
    }
    // 与当前serverAddress不存在连接,新建连接
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) {
    
        return doConnect(serverAddress);
    }
}
private Channel doConnect(String serverAddress) {
    
    Channel channelToServer = channels.get(serverAddress);
    // 当前地址已经存在连接
    if (channelToServer != null && channelToServer.isActive()) {
    
        return channelToServer;
    }
    Channel channelFromPool;
    try {
    
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
    
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
    
        LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}

请添加图片描述
TM和RM客户端在启动的时候会和集群中的的每台seata server建立长连接,但是在后续发送请求的时候,比如开启全局事务,注册分支事务只会和其中的一台机器通讯,TM或RM首先根据事务分组找到集群列表,然后根据负载均衡策略从列表中选出一台机器发起请求。具体代码可参见AbstractNettyRemotingClient#sendSyncRequest方法

参考博客

[1]https://blog.csdn.net/zjj2006/category_10310426.html
[2]https://blog.csdn.net/weixin_38308374/article/details/108944877

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zzti_erlie/article/details/121068865

智能推荐

要读的书单-程序员宅基地

文章浏览阅读40次。原文:https://segmentfault.com/a/1190000004317649飞龙的程序员书单 – 思想、工程、架构、职业发展思想编程珠玑编程珠玑,字字珠玑。这本书并不单纯讲解算法,而是由一系列实际问题切入,引导读者理解这些问题并学会解决方法,使用现有的算法,或者程序设计技巧解决问题。本书的特色是通过一些精心设计的有趣而又颇具指导意义的程序,对实用程序设计技巧及基本设...

linux监听端口和抓包_linux监听端口数据包-程序员宅基地

文章浏览阅读1.2k次。最近在工作中进行网络联调,使用到了几个命令,这里记录一下监听某个端口我使用的命令是这样的while truedonetstat -ano|grep 8089sleep 2done抓包某个端口tcpdump tcp -i eth0 port 8089这样就可以清楚的看到某个端口是不是有数据进来..._linux监听端口数据包

com.alibaba.nacos.api.exception.NacosException: Request nacos server failed:-程序员宅基地

文章浏览阅读1.7w次,点赞9次,收藏11次。com.alibaba.nacos.api.exception.NacosException: Request nacos server failed: at com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy.requestToServer(NamingGrpcClientProxy.java:279) at com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcCl_request nacos server failed

Mysql Truncated incorrect time value-程序员宅基地

文章浏览阅读1.5k次。Mysql Truncated incorrect time value发现一个mysql的问题,当我们做数据定时更新操作的时候,比如说定时作废48小时内未付款的订单的时候,如果用到mysql TIMEDIFF(var1,var2)函数例如:UPDATE justice_user_customer SET STATUS=0 WHERE TIMEDIFF(SYSDATE(),update_t..._incorrect time value

成为项目经理需要哪些条件或证书?_项目经理需要什么证书-程序员宅基地

文章浏览阅读2.2k次。1.人员开发能力1.项目经理应创造一种学习环境,使员工能从他们所从事的工作中,从他们所经历或观察的情景中得到知识。如尽可能给成员分配全面的任务,使他们丰富知识。如一个没用过Excel的人去用Excel处理数据,这就能使他学会使用Excel。或是让一个阅历不足的成员能跟经验丰富的成员一起工作,使新的成员从经验丰富的人那里学到更多的东西。2.让他们参加正式的培训课程。2.领导能力1.项目经理需要采取民主式的领导方式对于项目经理而言,采用这种领导方式比主要依靠职权的独裁式或命令式的管理方式更为_项目经理需要什么证书

洛谷-1764 翻转游戏 (加强版)_kkke-程序员宅基地

文章浏览阅读472次。题目描述kkke在一个nn的棋盘上进行一个翻转游戏。棋盘的每个格子上都放有一个棋子,每个棋子有2个面,一面是黑色的,另一面是白色的。初始的时候,棋盘上的棋子有的黑色向上,有的白色向上。现在kkke想通过最少次数的翻转,使得棋盘上所有的棋子都是同一个颜色向上的(即全是黑色向上的,或全是白色向上的)。每次翻转的时候,kkke可以选择任意一个棋子,将它翻转,同时,与它上下左右分别相邻的4个棋子也必须..._kkke

随便推点

NIO+SocketChannel+Buffer+Selector 多路复用_基于selector的多路复用socketchannel传输文件-程序员宅基地

文章浏览阅读382次。原文 点击打开链接现在使用NIO的场景越来越多,很多网上的技术框架或多或少的使用NIO技术,譬如Tomcat,Jetty。学习和掌握NIO技术已经不是一个JAVA攻城狮的加分技能,而是一个必备技能。再者,现在互联网的面试中上点level的都会涉及一下NIO或者AIO的问题(AIO下次再讲述,本篇主要讲述NIO),掌握好NIO也能帮助你获得一份较好的offer。 驱使博主写这篇文章的关键是网上关于N..._基于selector的多路复用socketchannel传输文件

网易游戏-测试开发工程师-18年暑期实习-一面二面_外企德科网易游戏测试工程师面试题-程序员宅基地

文章浏览阅读6.5k次,点赞4次,收藏50次。网易互娱实习 测试开发岗 两面技术(一面压力技术+二面hr技术)一面面了一个多小时,上来就是写代码(算法),完了之后问C++,数据库,之后一些测试问题。问得很深,毫无招架之力。1、在线编程,n个数里找第k大的数(我先写了二分,没写出来,换sort函数),效率比较低还有吗,(快排,堆排序),还有什么方法,(建堆,然后不停的把最大的拿出去),这个跟前面是一样的,(然后提示了二分..._外企德科网易游戏测试工程师面试题

shiro基于表单的拦截器身份验证、基于 Basic 的拦截器身份验证,普通身份验证的区别_shiro表单身份验证-程序员宅基地

文章浏览阅读1.5k次。目录 普通身份验证与基于表单的拦截器、基于basic的拦截器身份验证的区别?基于表单的拦截器身份验证和基于 Basic 的拦截器身份验证的区别普通身份验证与基于表单的拦截器、基于basic的拦截器身份验证的区别? 普通身份验证的一个缺点就是,永远返回到同一个成功页面(比如首页),在实际项目中比如支付时如果没有登录将跳转到登录页面,登录成功后再跳回到支付页面;对于这种功能大..._shiro表单身份验证

360声明 腾讯要挟用户卸载360 360将保证和QQ同时正常使用-程序员宅基地

文章浏览阅读203次。腾讯要挟用户卸载360 360将保证和QQ同时正常使用   11月3日下午6点,腾讯公司悍然发布公告,宣布将在装有360软件的所有电脑上停止运行QQ软件,这是置6亿QQ用户安危和利益于全然不顾的暴行。  就此,360公司严正声明如下:  一、腾讯的意图,是以不能使用QQ为要挟,强迫QQ用户卸载360系列软件。原因是“360隐私保护器”曝光了QQ涉嫌偷窥用户隐私的行为,而360新..._腾讯要挟用户卸载三六零,三六零将保证和qq同时正常使用

Hive并发情况下报DELETEME表不存在的异常_hive分区表删除数据 报错不存在-程序员宅基地

文章浏览阅读1.9k次。线上脚本在并行某些hive任务后,偶然会报出一个DELETEME表不存在的异常,异常信息大致如下:FAILED: Error in semantic analysis: javax.jdo.JDODataStoreException: Exception thrown obtaining schema column information from da_hive分区表删除数据 报错不存在

PHP+Swoole实现简单HTTP服务器_php http登陆并通讯-程序员宅基地

文章浏览阅读904次。Swooleswoole官方文档 https://wiki.swoole.comSwoole 是一个 PHP 的 协程 高性能 网络通信引擎,使用 C/C++ 语言编写,提供了多种通信协议的网络服务器和客户端模块。可以方便快速的实现 TCP/UDP服务、高性能Web、WebSocket服务、物联网、实时通讯、游戏、微服务等,使 PHP 不再局限于传统的 Web 领域。PHP基于Swool..._php http登陆并通讯

推荐文章

热门文章

相关标签