seata源码解析:TM RM 客户端的初始化过程_Java识堂的博客-程序员秘密

技术标签: java  后端  分布式事务  开发语言  

请添加图片描述

TM和RM初始化过程

上一篇文章说过,在Spring启动的过程中就会就会初始化TM和RM,建立与TC的长连接。TM,RM,TC都是用netty来处理网络连接的,初始化netty客户端和服务端的过程也非常类似。

本篇文章只分析TM的初始化过程,RM和TM复用了很多方法

// TmNettyRemotingClient
public void init() {
    
    // registry processor
    // 注册消息处理器
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
    
        super.init();
    }
}
// AbstractNettyRemotingClient
public void init() {
    
    // 不断连接seata server
    timerExecutor.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
    
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    // 是否允许批量发送请求
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
    
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    // 移除发送超时的消息
    super.init();
    clientBootstrap.start();
}

clientBootstrap#start是netty启动的模版代码,注册消息处理器和处理消息的套路我在seata server启动的文章分析的比较详细,本篇文章就不深入分析了

建立和TC的连接

TM和RM每隔10s都要TC集群的每个地址建立长连接

// NettyClientChannelManager#reconnect
void reconnect(String transactionServiceGroup) {
    
    List<String> availList = null;
    try {
    
        // 获得事务分组对应的集群中每台机器地址
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
    
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
    
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);

        if (StringUtils.isBlank(clusterName)) {
    
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                    ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                    transactionServiceGroup);
            return;
        }

        if (!(registryService instanceof FileRegistryServiceImpl)) {
    
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        return;
    }
    // 遍历tc服务器地址
    for (String serverAddress : availList) {
    
        try {
    
            // 建立与tc的连接
            acquireChannel(serverAddress);
        } catch (Exception e) {
    
            LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
        }
    }
}
Channel acquireChannel(String serverAddress) {
    
    Channel channelToServer = channels.get(serverAddress);
    // 与当前serverAddress已经存在连接,直接返回
    if (channelToServer != null) {
    
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
    
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
    
        LOGGER.info("will connect to " + serverAddress);
    }
    // 与当前serverAddress不存在连接,新建连接
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) {
    
        return doConnect(serverAddress);
    }
}
private Channel doConnect(String serverAddress) {
    
    Channel channelToServer = channels.get(serverAddress);
    // 当前地址已经存在连接
    if (channelToServer != null && channelToServer.isActive()) {
    
        return channelToServer;
    }
    Channel channelFromPool;
    try {
    
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
    
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
    
        LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}

请添加图片描述
TM和RM客户端在启动的时候会和集群中的的每台seata server建立长连接,但是在后续发送请求的时候,比如开启全局事务,注册分支事务只会和其中的一台机器通讯,TM或RM首先根据事务分组找到集群列表,然后根据负载均衡策略从列表中选出一台机器发起请求。具体代码可参见AbstractNettyRemotingClient#sendSyncRequest方法

参考博客

[1]https://blog.csdn.net/zjj2006/category_10310426.html
[2]https://blog.csdn.net/weixin_38308374/article/details/108944877

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zzti_erlie/article/details/121068865

智能推荐

Python项目:外星人入侵(一)_海超人and大洋游侠的博客-程序员秘密_python 外星人入侵

1、安装Pygame使用pip模块下载并安装Python包。要安装Pygame,在终端提示符下执行如下命令。下载完成2、创建Pygame窗口及响应用户输入新建文件夹,将其保存为alien_invasion.py,在其中输入以下代码。import sysimport pygameclass AlienInvasion: """管理游戏资源和行为的类""" def __init__(self): """初始化游戏并创建游戏资源""" pygame.init()

js+express+apidoc书写+Postman测试接口 入门(搭建结构)第一部分_laker Zhang的博客-程序员秘密_apidoc postman

1 准备1 项目结构执行如下命令$ express -e test warning: option `--ejs' has been renamed to `--view=ejs' create : test\ create : test\public\ create : test\public\javascripts\ create : test\public\images\ create : test\public\stylesheets\ create

Linux下使用inotify-tools工具监控文件_程序猿编码的博客-程序员秘密

如果想在Linux上监控文件系统的变化,如访问属性、读写属性、权限属性、删除创建、移动等操作。可以考虑使用inotify-tools 工具,inotify-tools 是一个C库和一组命令行的工作提供Linux下inotify的简单接口。下面对inotify-tools讲解。inotify-toolsinotify是一个API,需要通过开发应用程序进行调用,对于大多数用户来讲这有着许多不便,inotify-tools的出现弥补了这一不足。inotify-tools是一套组件,它包括一个C库和几个命令行工

【小概念】格拉姆矩阵(gram matrix)_Sun7_She的博客-程序员秘密

gram矩阵是计算每个通道I的feature map与每个通道j的feature map的内积。gram matrix的每个值可以说是代表i通道的feature map与j通道的feature map的互相关程度。

开发资源收集:128个ajax/javascript框架javascript_chaoxidawang的博客-程序员秘密

ajax框架能够帮助我们快速开发能够通过javascript调用webservice(server page)的网页,而不必要提交整个页面。近来的web应用都使用ajax来提高网页的可交互性和实现更好的功能。目前互联网上有上百个ajax/javascript框架可供我们使用,这里所整理的是比较常用的128个,因为太多了,实在不好分类,请大家将就将就:)1.dojo"dojo"由一个

Maven依赖详细理解_思影影思的博客-程序员秘密_maven 依赖

Maven依赖理解1 简介2 依赖的配置3 依赖的范围4 传递性依赖4.1 传递性依赖和依赖调解5 依赖调解6 可选依赖7 总结1 简介 在Maven项目中,有一个核心文件pom.xml。POM项目对象模型定义了项目的基本信息,用于描述心目如何构建,声明项目依赖。 没有任何的实际Java代码,我们就能顶一个Maven项目的POM,这体现了Maven的一大优点,它能让项目对象模型最大程度地与实...

随便推点

打印机:操作无法完成(错误0x00000709)。再次检查打印机名称,并确保打印机已连接到..._海云之家的博客-程序员秘密_操作无法完成错误0x0000709再次检查打印机名称

操作无法完成.键入的打印机名不正确,或者指定的打印机没有连接到服务器上.有关详细信息,请单帮助然后网上查了查资料,说法倒有N多,说什么看看打印机有没有共享,修改共享名,重新安装打印机驱动,开始 Guest帐号,关闭防火墙。。。然而发现,如果不通过IP来访问网络打印机,而是通过机器名来访问,到可以连上,但是通过机器名并不是那么容易访问的,我的机器上可以访问到,但到 同事的机器上又访问

自动化测试 - uiautomator2框架应用 - 自动打卡_leon.liao的博客-程序员秘密_emobile7上怎么模拟定位打卡

说明最近公司更改了打卡方式,改为使用手机APP定位打卡,类似于阿里的钉钉打卡。为了学有所用,特编程实现自动打卡。框架说明UIAutomator是Android提供的自动化测试框架,支持大部分Android系统操作,是用来做UI黑盒测试的,也就是模拟人工的手工测试,例如:点击控件元素看看显示是否符合预期,特点如下:黑盒测试,模拟人的操作,不需要了解实现细节。不依赖于某个APP,能跨App运行。不需要root权限。缺点是只支持SDK 16(Android 4.1)及以上,不支持Hybi

Android开发自定义Listview的Adapter基类以及通用ViewHolder的写法_梦幻雨夜的博客-程序员秘密

简单的写一个Adapter基类,不用每次写adapter都调用一堆方法。import android.widget.BaseAdapter;import java.util.ArrayList;import java.util.List;public abstract class ListBaseAdapter extends BaseAdapter { protected L...

论文《自适应双目条纹测距法》学习————标定_Orange Wu的博客-程序员秘密

《Adaptive Binocular Fringe Dvnamic ProiectionMethod for High Dvnamic Range Measurement》 (SCI 2019 二区)摘要: 利用条纹投影传感器进行三维测量是目前研究的热点。然而,大多数边缘投影传感器的测量精度和效率仍然受到图像饱和和投影仪非线性效应的严重影响。为了解决这一难题,结合立体视觉技术和边缘投影技术的优...

【C#】winform多语言方案_JimCarter的博客-程序员秘密_winform 多语言

1.CultureInfo的获取和设置CultureInfo通常由两位小写的LanguageCode+两位大写的Country/RegionCode组成,如:zh-CN,zh-TW,jr-JP,en-US,zh-HK。部分地区由languageCode+sripttag+country/regioncode,如zh-Hans-HK(香港简体中文)。 几个有用的属性:CultureInf...

cocos2d 查看pvr图片的详细格式_iteye_8877的博客-程序员秘密

发现一个问题,伙计交过来的pvr图片,我没法确切的知道他到底使用了哪种像素格式。想到一个办法,可以在程序加载的时候打印出一些信息来得到这些信息。主要修改 CCTexturePVR.m 这个文件,先看看pvr支持那些像素格式:#import &amp;lt;Availability.h&amp;gt;#import &amp;lt;zlib.h&amp;gt;#import &quot;CCTexturePVR.h&quot;#...

推荐文章

热门文章

相关标签