seata源码解析:TM RM 客户端的初始化过程_Java识堂的博客-程序员资料

技术标签: java  后端  分布式事务  开发语言  

请添加图片描述

TM和RM初始化过程

上一篇文章说过,在Spring启动的过程中就会就会初始化TM和RM,建立与TC的长连接。TM,RM,TC都是用netty来处理网络连接的,初始化netty客户端和服务端的过程也非常类似。

本篇文章只分析TM的初始化过程,RM和TM复用了很多方法

// TmNettyRemotingClient
public void init() {
    
    // registry processor
    // 注册消息处理器
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
    
        super.init();
    }
}
// AbstractNettyRemotingClient
public void init() {
    
    // 不断连接seata server
    timerExecutor.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
    
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    // 是否允许批量发送请求
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
    
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    // 移除发送超时的消息
    super.init();
    clientBootstrap.start();
}

clientBootstrap#start是netty启动的模版代码,注册消息处理器和处理消息的套路我在seata server启动的文章分析的比较详细,本篇文章就不深入分析了

建立和TC的连接

TM和RM每隔10s都要TC集群的每个地址建立长连接

// NettyClientChannelManager#reconnect
void reconnect(String transactionServiceGroup) {
    
    List<String> availList = null;
    try {
    
        // 获得事务分组对应的集群中每台机器地址
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
    
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
    
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);

        if (StringUtils.isBlank(clusterName)) {
    
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                    ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                    transactionServiceGroup);
            return;
        }

        if (!(registryService instanceof FileRegistryServiceImpl)) {
    
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        return;
    }
    // 遍历tc服务器地址
    for (String serverAddress : availList) {
    
        try {
    
            // 建立与tc的连接
            acquireChannel(serverAddress);
        } catch (Exception e) {
    
            LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
        }
    }
}
Channel acquireChannel(String serverAddress) {
    
    Channel channelToServer = channels.get(serverAddress);
    // 与当前serverAddress已经存在连接,直接返回
    if (channelToServer != null) {
    
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
    
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
    
        LOGGER.info("will connect to " + serverAddress);
    }
    // 与当前serverAddress不存在连接,新建连接
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) {
    
        return doConnect(serverAddress);
    }
}
private Channel doConnect(String serverAddress) {
    
    Channel channelToServer = channels.get(serverAddress);
    // 当前地址已经存在连接
    if (channelToServer != null && channelToServer.isActive()) {
    
        return channelToServer;
    }
    Channel channelFromPool;
    try {
    
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
    
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
    
        LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}

请添加图片描述
TM和RM客户端在启动的时候会和集群中的的每台seata server建立长连接,但是在后续发送请求的时候,比如开启全局事务,注册分支事务只会和其中的一台机器通讯,TM或RM首先根据事务分组找到集群列表,然后根据负载均衡策略从列表中选出一台机器发起请求。具体代码可参见AbstractNettyRemotingClient#sendSyncRequest方法

参考博客

[1]https://blog.csdn.net/zjj2006/category_10310426.html
[2]https://blog.csdn.net/weixin_38308374/article/details/108944877

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zzti_erlie/article/details/121068865

智能推荐

【转】15款Java程序员必备的开发工具_weixin_30587025的博客-程序员资料

如果你是一名Web开发人员,那么用膝盖想也知道你的职业生涯大部分将使用Java而度过。这是一款商业级的编程语言,我们没有办法不接触它。对于Java,有两种截然不同的观点:一种认为Java是最简单功能最强大的编程语言之一,另一种则表示这种编程语言既难用又复杂。下面这些工具或许功能和作用不同,但是有着一个共同的主旨,那就是——它们都是为了给Java编码和开发提供卓越的支持。1. JDK...

手欠升级 Xcode 12 导致 carthage 构建只包含 RxSwift 的 Cartfile 失败_xcode13.4.1安装carthage后导包swifter失败_胡争辉的博客-程序员资料

手欠升级 Xcode 12 导致 carthage 构建只包含 RxSwift 的 Cartfile 失败2020年9月16日 Apple 发布了 Xcode 12,前一个版本是 11.7,秉承了 Apple 一贯的新版本坑人的传统,果然安装后各种崩,删减后最小重现代码如下。https://github.com/huzhenghui/swift-awesome/blob/master/carthage/RxSwift/justfilehttps://github.com/huzhenghui/swif

thttpd_sfrysh的博客-程序员资料

<br />关键字: thttpd <br />1 引言<br />     随着微处理器技术、计算机网络技术的进步,基于嵌入式 WEB的网络数字视频监控系统逐渐得到了人们的广泛关注。把图像采集、视频压缩和WEB功能集中到一个体积很小的设备内,可以直接连入局域网和Internet,达到即插即用,省掉多种复杂的电缆,安装方便,用户也无须安装任何硬件设备即可观看,这使得由嵌入式 网络视频监控服务器 组成的监控网络组网和扩展都极为灵活方便。2  WEB服务器 所在系统工作原理<br />    如图1所示,系统有

vim编辑器常用的命令模式、插入模式、底行模式_在vim的什么模式下,按键yy会复制一行_胤哥的博客的博客-程序员资料

首先下载vim编辑器yum install -y vim进入vim操作命令模式上下左右键:移动光标yy:复制当前所在行n+yy:复制当前所在行以及以下n行的内容 例:3yyp:在光标所在的下一行进行粘贴dd:删除所在行n+dd:删除当前所在行以及以下n行的内容 例:3ddgg:快速跳转到文件的头部所在行的首字符G:快速跳转到文件的尾部所在行的首字符n+G:快速跳转到文件...

unity动画系统(Animator)之动画倒播等待很长时间才开始播放(完美解决)_unity动画过渡时间太长_苦逼的程序员!!!的博客-程序员资料

何为动画倒播? 动画倒播其实就好比视频倒退一样,让播过的内容在回放回来,那么在unity中如何实现动画片段的倒播呐?见下文!unity-Animator实现动画片段倒播1.为播放动画的游戏物体添加Animator组件并创建好对应的动画控制器(AnimatorController)2.双击打开动画控制器,见下图3.需要在动画片段结束的那一帧添加动画事件,关于如何添...

找出/boot/grub/grub.conf文件中1-255之间的数字_grep匹配1-255的数字_倔强的-小乌龟的博客-程序员资料

grep-E ‘&lt;([1-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])&gt;’/boot/grub/grub.conf解析:^匹配字符串开始,$匹配字符串结尾。&lt;或\b:锚定词首,其后面的任意字符必须作为单词首部出现&gt;或\b:锚定词尾,其前面的任意字符必须作为单词尾部出现注意:需要加()给词首词尾,不认锚定有问题,只会锚...

随便推点

Docker实战 | 第三篇:IDEA集成 Docker 插件实现一键自动打包部署微服务项目_enterpc的博客-程序员资料

一. 前言大家在自己玩微服务项目的时候,动辄十几个服务,每次修改逐一部署繁琐不说也会浪费越来越多时间,所以本篇整理通过一次性配置实现一键部署微服务,实现真正所谓的一劳永逸。二. 配置服务器1. Docker安装服务器需要安装Docker,如未安装参考这篇文章安装即可 Docker实战 | 第一篇:Linux 安装 Docker2. Docker开启远程访问复制代码123nginxvim /usr/lib/systemd/system/docker.service# 在ExecStart=/.

Github—failed to push some refs to 'https://github.com/***/git_project.git'解决办法_专业加纳团队的博客-程序员资料

Github—failed to push some refs to 'https://github.com/***/git_project.git'解决办法报错详情报错详情git version 2.26.2.windows.1因为没有提交内容到master,需要先提交先设置User.email、user.name进入git 目录下命令:git config --global us...

链式栈的进栈、出栈算法_.写出链栈的入栈和出栈算法_shaolanqing的博客-程序员资料

#include&lt;stdio.h&gt;#include&lt;malloc.h&gt;#define len sizeof(stnode)typedef struct node //list为头结点指针,link为结点,node为结点{ int data; struct node *link;}stnode,*stlink;void initialslink(stlink &amp;top) //堆.

vue3 + vite 项目搭建 - 配置环境变量env_Terminal丶句点的博客-程序员资料

在项目根目录新建 .env.development、.env.production、.env.test//开发.env.developmentVITE_MODE_NAME=developmentVITE_APP_ID=123456VITE_AGENT_ID=123456VITE_LOGIN_TEST=trueVITE_RES_URL=https://www.baidu.comVITE_APP_TITLE=风控管理平台VITE_EDITOR=webstorm//生产.env.produ.

deepin-wine-for-ubuntu 安装常用软件 ubuntu_zaf赵的博客-程序员资料

Deepin wine for Ubuntu一、项目介绍Deepin-wine 环境的 Ubuntu 移植版使用deepin原版二进制文件,解决依赖问题仅供个人研究学习使用二、软件架构软件架构说明(1)安装教程在线安装直接使用在线安装脚本,安装最新的Release版本:wget -qO- https://raw.githubuserconten...

推荐文章

热门文章

相关标签