上一篇文章说过,在Spring启动的过程中就会就会初始化TM和RM,建立与TC的长连接。TM,RM,TC都是用netty来处理网络连接的,初始化netty客户端和服务端的过程也非常类似。
本篇文章只分析TM的初始化过程,RM和TM复用了很多方法
// TmNettyRemotingClient
public void init() {
// registry processor
// 注册消息处理器
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}
// AbstractNettyRemotingClient
public void init() {
// 不断连接seata server
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
// 是否允许批量发送请求
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
// 移除发送超时的消息
super.init();
clientBootstrap.start();
}
clientBootstrap#start是netty启动的模版代码,注册消息处理器和处理消息的套路我在seata server启动的文章分析的比较详细,本篇文章就不深入分析了
TM和RM每隔10s都要TC集群的每个地址建立长连接
// NettyClientChannelManager#reconnect
void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
// 获得事务分组对应的集群中每台机器地址
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
if (CollectionUtils.isEmpty(availList)) {
RegistryService registryService = RegistryFactory.getInstance();
String clusterName = registryService.getServiceGroup(transactionServiceGroup);
if (StringUtils.isBlank(clusterName)) {
LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
transactionServiceGroup);
return;
}
if (!(registryService instanceof FileRegistryServiceImpl)) {
LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
}
return;
}
// 遍历tc服务器地址
for (String serverAddress : availList) {
try {
// 建立与tc的连接
acquireChannel(serverAddress);
} catch (Exception e) {
LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
}
}
}
Channel acquireChannel(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
// 与当前serverAddress已经存在连接,直接返回
if (channelToServer != null) {
channelToServer = getExistAliveChannel(channelToServer, serverAddress);
if (channelToServer != null) {
return channelToServer;
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("will connect to " + serverAddress);
}
// 与当前serverAddress不存在连接,新建连接
Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
synchronized (lockObj) {
return doConnect(serverAddress);
}
}
private Channel doConnect(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
// 当前地址已经存在连接
if (channelToServer != null && channelToServer.isActive()) {
return channelToServer;
}
Channel channelFromPool;
try {
NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
}
channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
channels.put(serverAddress, channelFromPool);
} catch (Exception exx) {
LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
throw new FrameworkException("can not register RM,err:" + exx.getMessage());
}
return channelFromPool;
}
TM和RM客户端在启动的时候会和集群中的的每台seata server建立长连接,但是在后续发送请求的时候,比如开启全局事务,注册分支事务只会和其中的一台机器通讯,TM或RM首先根据事务分组找到集群列表,然后根据负载均衡策略从列表中选出一台机器发起请求。具体代码可参见AbstractNettyRemotingClient#sendSyncRequest方法
[1]https://blog.csdn.net/zjj2006/category_10310426.html
[2]https://blog.csdn.net/weixin_38308374/article/details/108944877
如果你是一名Web开发人员,那么用膝盖想也知道你的职业生涯大部分将使用Java而度过。这是一款商业级的编程语言,我们没有办法不接触它。对于Java,有两种截然不同的观点:一种认为Java是最简单功能最强大的编程语言之一,另一种则表示这种编程语言既难用又复杂。下面这些工具或许功能和作用不同,但是有着一个共同的主旨,那就是——它们都是为了给Java编码和开发提供卓越的支持。1. JDK...
手欠升级 Xcode 12 导致 carthage 构建只包含 RxSwift 的 Cartfile 失败2020年9月16日 Apple 发布了 Xcode 12,前一个版本是 11.7,秉承了 Apple 一贯的新版本坑人的传统,果然安装后各种崩,删减后最小重现代码如下。https://github.com/huzhenghui/swift-awesome/blob/master/carthage/RxSwift/justfilehttps://github.com/huzhenghui/swif
<br />关键字: thttpd <br />1 引言<br /> 随着微处理器技术、计算机网络技术的进步,基于嵌入式 WEB的网络数字视频监控系统逐渐得到了人们的广泛关注。把图像采集、视频压缩和WEB功能集中到一个体积很小的设备内,可以直接连入局域网和Internet,达到即插即用,省掉多种复杂的电缆,安装方便,用户也无须安装任何硬件设备即可观看,这使得由嵌入式 网络视频监控服务器 组成的监控网络组网和扩展都极为灵活方便。2 WEB服务器 所在系统工作原理<br /> 如图1所示,系统有
首先下载vim编辑器yum install -y vim进入vim操作命令模式上下左右键:移动光标yy:复制当前所在行n+yy:复制当前所在行以及以下n行的内容 例:3yyp:在光标所在的下一行进行粘贴dd:删除所在行n+dd:删除当前所在行以及以下n行的内容 例:3ddgg:快速跳转到文件的头部所在行的首字符G:快速跳转到文件的尾部所在行的首字符n+G:快速跳转到文件...
何为动画倒播? 动画倒播其实就好比视频倒退一样,让播过的内容在回放回来,那么在unity中如何实现动画片段的倒播呐?见下文!unity-Animator实现动画片段倒播1.为播放动画的游戏物体添加Animator组件并创建好对应的动画控制器(AnimatorController)2.双击打开动画控制器,见下图3.需要在动画片段结束的那一帧添加动画事件,关于如何添...
grep-E ‘<([1-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])>’/boot/grub/grub.conf解析:^匹配字符串开始,$匹配字符串结尾。<或\b:锚定词首,其后面的任意字符必须作为单词首部出现>或\b:锚定词尾,其前面的任意字符必须作为单词尾部出现注意:需要加()给词首词尾,不认锚定有问题,只会锚...
一. 前言大家在自己玩微服务项目的时候,动辄十几个服务,每次修改逐一部署繁琐不说也会浪费越来越多时间,所以本篇整理通过一次性配置实现一键部署微服务,实现真正所谓的一劳永逸。二. 配置服务器1. Docker安装服务器需要安装Docker,如未安装参考这篇文章安装即可 Docker实战 | 第一篇:Linux 安装 Docker2. Docker开启远程访问复制代码123nginxvim /usr/lib/systemd/system/docker.service# 在ExecStart=/.
Github—failed to push some refs to 'https://github.com/***/git_project.git'解决办法报错详情报错详情git version 2.26.2.windows.1因为没有提交内容到master,需要先提交先设置User.email、user.name进入git 目录下命令:git config --global us...
https://www.jb51.net/article/202488.htm
#include<stdio.h>#include<malloc.h>#define len sizeof(stnode)typedef struct node //list为头结点指针,link为结点,node为结点{ int data; struct node *link;}stnode,*stlink;void initialslink(stlink &top) //堆.
在项目根目录新建 .env.development、.env.production、.env.test//开发.env.developmentVITE_MODE_NAME=developmentVITE_APP_ID=123456VITE_AGENT_ID=123456VITE_LOGIN_TEST=trueVITE_RES_URL=https://www.baidu.comVITE_APP_TITLE=风控管理平台VITE_EDITOR=webstorm//生产.env.produ.
Deepin wine for Ubuntu一、项目介绍Deepin-wine 环境的 Ubuntu 移植版使用deepin原版二进制文件,解决依赖问题仅供个人研究学习使用二、软件架构软件架构说明(1)安装教程在线安装直接使用在线安装脚本,安装最新的Release版本:wget -qO- https://raw.githubuserconten...