5. Seata Distributed Transactions: A Source Code Study

This post studies the Seata source code through the lens of its AT mode.

1. A Recap of Seata's AT Mode

1.1 Seata's AT Mode

[Figures: Seata AT mode diagrams]

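  • Service A starts a global transaction and requests a global transaction ID (xid) from the TC.
  • Service A makes RPC calls, and the participants, services B and C, each register a branch transaction with the TC.
  • Each participant commits its local transaction and, in the same local transaction, writes a record to its undo_log table.
  • Each participant reports the status of its branch transaction.
  • Based on the branch statuses, the TC notifies the participants whether the global transaction is to be committed or rolled back.
  • Each participant follows the TC's instruction: on global commit it simply deletes the corresponding records from its local undo_log table; on rollback it restores the affected data from the before_image stored in undo_log.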
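The phase-two behavior described above (delete the undo records on commit, restore the before image on rollback) can be illustrated with a small in-memory sketch. This is not Seata code: the "table" is a plain Map, and the names AtModeSketch, Branch, UndoRecord, and updateRow are hypothetical. It only shows why keeping the before image is enough to undo a change that was already committed locally.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory illustration of AT mode phase one / phase two (not Seata code).
class AtModeSketch {

    // One undo_log record: which row changed and what it looked like before.
    record UndoRecord(String rowKey, Integer beforeImage) {}

    // A branch transaction owned by one participant (RM).
    static class Branch {
        final Map<String, Integer> table;                 // the participant's local "table"
        final Deque<UndoRecord> undoLog = new ArrayDeque<>();

        Branch(Map<String, Integer> table) {
            this.table = table;
        }

        // Phase one: save the before image, then apply the change (committed locally).
        void updateRow(String rowKey, int newValue) {
            undoLog.push(new UndoRecord(rowKey, table.get(rowKey)));
            table.put(rowKey, newValue);
        }

        // Phase two, global commit: data is already committed, just drop the undo records.
        void commit() {
            undoLog.clear();
        }

        // Phase two, global rollback: restore before images, newest change first.
        void rollback() {
            while (!undoLog.isEmpty()) {
                UndoRecord r = undoLog.pop();
                if (r.beforeImage() == null) {
                    table.remove(r.rowKey());             // the row did not exist before
                } else {
                    table.put(r.rowKey(), r.beforeImage());
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> stock = new HashMap<>(Map.of("sku-1", 10));
        Branch branch = new Branch(stock);
        branch.updateRow("sku-1", 9);
        System.out.println("after phase one: " + stock);   // after phase one: {sku-1=9}
        branch.rollback();
        System.out.println("after rollback:  " + stock);   // after rollback:  {sku-1=10}
    }
}
```

Real AT mode also checks that the current row still matches the after image before restoring and holds a global lock on the rows; the sketch leaves both out.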

1.2 Using Seata's AT Mode in Practice

Using Seata's AT mode in an application takes only a few steps:

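  • ① Add the Seata dependency
  • ② Create the local undo_log table
  • ③ Add the Seata settings to the configuration file
  • ④ Annotate the entry method with @GlobalTransactional to start a distributed transaction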

See the earlier posts in this series for the detailed setup.

2. Source Code Analysis of the Core Steps (AT)

2.1 How the RM and TM Connect to the TC

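Before a distributed transaction can start, the RM and TM must hold connections to the TC so that they can communicate. When are these connections established? Let's first look at the package that contains the @GlobalTransactional annotation: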

[Figure: package structure around the @GlobalTransactional annotation]

The annotation is defined in the org.apache.seata.spring.annotation package, which also contains a class whose name ends in Scanner. Let's open it:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements CachedConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {

The scanner implements InitializingBean, so it must implement afterPropertiesSet, which Spring calls once the bean's properties have been set:

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (CachedConfigurationChangeListener) this);
        return;
    }
    if (initialized.compareAndSet(false, true)) {
        initClient();   // initialize the client, but which client?
    }

    this.findBusinessBeanNamesNeededEnhancement();
}

afterPropertiesSet calls initClient(). Which client does it initialize? Stepping into it shows that it initializes both the TM and the RM:

protected void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
        LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                    "please change your default configuration as soon as possible " +
                    "and we don't recommend you to use default tx-service-group's value provided by seata",
                    DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();

}
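The initialized.compareAndSet(false, true) check in afterPropertiesSet() is what makes initClient() run at most once, even if the method is called repeatedly or from several threads. A minimal sketch of that guard, with a hypothetical OnceInitializer class and a counter standing in for the TMClient/RMClient initialization:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "initialize at most once" guard used in afterPropertiesSet().
class OnceInitializer {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    final AtomicInteger initCount = new AtomicInteger();

    void afterPropertiesSet() {
        // Only the first caller flips false -> true and runs initClient(); everyone else skips it.
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    private void initClient() {
        initCount.incrementAndGet();   // stands in for TMClient.init(...) and RMClient.init(...)
    }

    public static void main(String[] args) throws InterruptedException {
        OnceInitializer scanner = new OnceInitializer();
        int threads = 8;
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    start.await();              // release all threads at once
                    scanner.afterPropertiesSet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        done.await();
        System.out.println("initClient ran " + scanner.initCount.get() + " time(s)");   // 1 time(s)
    }
}
```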

Taking the RM initialization as an example:

public class RMClient {

    /**
     * Init.
     *
     * @param applicationId           the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        rmNettyRemotingClient.init();
    }
}

Stepping into init leads to org.apache.seata.core.rpc.netty.RmNettyRemotingClient#init:

@Override
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();

        // Found one or more resources that were registered before initialization
        if (resourceManager != null
            && !resourceManager.getManagedResources().isEmpty()
            && StringUtils.isNotBlank(transactionServiceGroup)) {
            boolean failFast = ConfigurationFactory.getInstance().getBoolean(
                ConfigurationKeys.ENABLE_RM_CLIENT_CHANNEL_CHECK_FAIL_FAST,
                DefaultValues.DEFAULT_CLIENT_CHANNEL_CHECK_FAIL_FAST);
            getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
        }
    }
}

First look at super.init(), which leads to org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#init:

@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(() -> {
        try {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        } catch (Exception ex) {
            LOGGER.warn("reconnect server failed. {}", ex.getMessage());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}
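The first thing this init() does is schedule clientChannelManager.reconnect(...) at a fixed rate, which is why a client that starts before the TC is reachable keeps trying. A minimal sketch with a plain ScheduledExecutorService; the class name and the delay/interval values are assumptions, not Seata's SCHEDULE_DELAY_MILLS / SCHEDULE_INTERVAL_MILLS:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the periodic reconnect scheduled in AbstractNettyRemotingClient#init().
class ReconnectSchedulerSketch {
    static final long DELAY_MS = 0;       // assumed value
    static final long INTERVAL_MS = 50;   // assumed value

    final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    void init(String txServiceGroup, Runnable reconnect) {
        timer.scheduleAtFixedRate(() -> {
            try {
                reconnect.run();   // stands in for clientChannelManager.reconnect(txServiceGroup)
            } catch (Exception ex) {
                // Log and swallow: if an exception escaped, all later runs would be suppressed.
                System.err.println("reconnect server failed for " + txServiceGroup + ". " + ex.getMessage());
            }
        }, DELAY_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    // Demo: a TC that is never reachable; returns true if the task kept retrying.
    static boolean keepsRetrying() {
        ReconnectSchedulerSketch client = new ReconnectSchedulerSketch();
        CountDownLatch threeAttempts = new CountDownLatch(3);
        client.init("default_tx_group", () -> {
            threeAttempts.countDown();
            throw new IllegalStateException("TC not reachable yet");
        });
        try {
            return threeAttempts.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            client.timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("kept retrying: " + keepsRetrying());
    }
}
```

The try/catch inside the task mirrors the one in Seata's init(). With scheduleAtFixedRate it matters: an exception that escapes one run suppresses all subsequent runs.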

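The scheduled task in AbstractNettyRemotingClient#init calls getTransactionServiceGroup(). We set transactionServiceGroup in the configuration file earlier, and, as mentioned before, Seata uses this service group to find the corresponding Seata server (the TC). To confirm that the TC connection is indeed made through this service group, step into clientChannelManager.reconnect(); after a few overloads we reach doReconnect: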

void doReconnect(String transactionServiceGroup, boolean failFast) {
    List<String> availList;
    try {
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        throwFailFastException(failFast, "Failed to get available servers");
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
        RegistryService registryService = RegistryFactory.getInstance();
        String clusterName = registryService.getServiceGroup(transactionServiceGroup);

        if (StringUtils.isBlank(clusterName)) {
            LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                         ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                         transactionServiceGroup);
            throwFailFastException(failFast, "can not get cluster name in registry config.");
            return;
        }

        if (!(registryService instanceof FileRegistryServiceImpl)) {
            LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
        }
        throwFailFastException(failFast, "no available service found in cluster.");
        return;
    }
    try {
        doReconnect(availList, transactionServiceGroup);
    } catch (Exception e) {
        if (failFast) {
            throw e;
        }
        LOGGER.error("connect server failed. {}", e.getMessage(), e);
    }
}

It does obtain a list of available server addresses, availList, and then passes that list to another doReconnect overload:

void doReconnect(List<String> availList, String transactionServiceGroup) {
    Set<String> channelAddress = new HashSet<>(availList.size());
    Map<String, Exception> failedMap = new HashMap<>();
    try {
        for (String serverAddress : availList) {
            try {
                acquireChannel(serverAddress);
                channelAddress.add(serverAddress);
            } catch (Exception e) {
                failedMap.put(serverAddress, e);
            }
        }
        if (failedMap.size() > 0) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
                             failedMap.keySet(),
                             failedMap.values().stream().map(Throwable::getMessage).collect(Collectors.toSet()));
            } else if (LOGGER.isDebugEnabled()) {
                failedMap.forEach((key, value) -> {
                    LOGGER.error("{} can not connect to {} cause:{} trace information:",
                                 FrameworkErrorCode.NetConnect.getErrCode(), key, value.getMessage(), value);
                });
            }
        }
        if (availList.size() == failedMap.size()) {
            String invalidAddress = StringUtils.join(failedMap.keySet().iterator(), ", ");
            throw new FrameworkException("can not connect to [" + invalidAddress + "]");
        }
    } finally {
        if (CollectionUtils.isNotEmpty(channelAddress)) {
            List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
            for (String address : channelAddress) {
                String[] array = NetUtil.splitIPPortStr(address);
                aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
            }
            RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
        } else {
            RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
        }
    }
}
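The overload above tries every address, remembers each failure, and throws only when every address failed; whatever did connect is reported to the registry through refreshAliveLookup (in Seata this happens in a finally block, so an empty list is reported when everything failed). A stripped-down sketch of that policy; the ReconnectAllSketch class and the Consumer-based connect callback are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the connect-to-all policy in doReconnect(List, String).
class ReconnectAllSketch {

    // `connect` throws a RuntimeException when an address cannot be reached.
    static Set<String> reconnect(List<String> availList, Consumer<String> connect) {
        Set<String> alive = new LinkedHashSet<>();
        Map<String, String> failed = new LinkedHashMap<>();
        for (String address : availList) {
            try {
                connect.accept(address);
                alive.add(address);
            } catch (RuntimeException e) {
                failed.put(address, e.getMessage());   // keep going: one bad node is not fatal
            }
        }
        if (!failed.isEmpty()) {
            System.err.println("can not connect to " + failed.keySet() + " cause:" + failed.values());
        }
        if (failed.size() == availList.size()) {
            throw new IllegalStateException("can not connect to " + failed.keySet());
        }
        return alive;   // Seata reports the reachable set via refreshAliveLookup(...)
    }

    public static void main(String[] args) {
        Set<String> alive = reconnect(List.of("10.0.0.1:8091", "10.0.0.2:8091"), address -> {
            if (address.startsWith("10.0.0.2")) {
                throw new RuntimeException("connection refused");
            }
        });
        System.out.println("alive: " + alive);   // alive: [10.0.0.1:8091]
    }
}
```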

doReconnect iterates over the list and obtains a Channel for each serverAddress via acquireChannel (if there is no live connection yet, doConnect(serverAddress) creates one):

Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null) {
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to {}", serverAddress);
    }
    Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
    synchronized (lockObj) {
        return doConnect(serverAddress);
    }
}
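acquireChannel combines a connection cache with one lock object per server address, so connecting to one TC node never blocks callers that need a different one. A minimal sketch of the same idea; Conn stands in for a Netty Channel, all names are hypothetical, and the re-check inside the lock is this sketch's own addition (the Seata code shown above delegates straight to doConnect, whose body is not shown here):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of acquireChannel(): reuse a live cached connection, otherwise
// connect while holding a lock that is private to that server address.
class ChannelPoolSketch {

    // Stand-in for a Netty Channel.
    static final class Conn {
        final String address;
        volatile boolean active = true;
        Conn(String address) { this.address = address; }
    }

    private final Map<String, Conn> channels = new ConcurrentHashMap<>();
    private final Map<String, Object> channelLocks = new ConcurrentHashMap<>();
    final AtomicInteger connectCount = new AtomicInteger();

    Conn acquireChannel(String serverAddress) {
        Conn existing = channels.get(serverAddress);
        if (existing != null && existing.active) {
            return existing;                               // fast path, no locking
        }
        // One lock object per address: connecting to A never blocks callers of B.
        Object lock = channelLocks.computeIfAbsent(serverAddress, k -> new Object());
        synchronized (lock) {
            // Re-check: another thread may have connected while we waited for the lock.
            existing = channels.get(serverAddress);
            if (existing != null && existing.active) {
                return existing;
            }
            return doConnect(serverAddress);
        }
    }

    private Conn doConnect(String serverAddress) {
        connectCount.incrementAndGet();
        Conn conn = new Conn(serverAddress);   // a real client would open a socket here
        channels.put(serverAddress, conn);
        return conn;
    }

    public static void main(String[] args) {
        ChannelPoolSketch pool = new ChannelPoolSketch();
        Conn a1 = pool.acquireChannel("10.0.0.1:8091");
        Conn a2 = pool.acquireChannel("10.0.0.1:8091");
        System.out.println("reused: " + (a1 == a2));   // reused: true
        a1.active = false;                             // simulate a dropped connection
        Conn a3 = pool.acquireChannel("10.0.0.1:8091");
        System.out.println("reconnected: " + (a3 != a1) + ", connects=" + pool.connectCount.get());
    }
}
```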

The TM connects in a similar way. Overall, the flow looks roughly like this:

[Figure: overall flow of the RM/TM connecting to the TC]

2.2 Transaction Processing

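As we know, a distributed transaction is started by the @GlobalTransactional annotation, so something must recognize this annotation and act on it. If we were building this ourselves, AOP would be the first approach that comes to mind, and indeed, the package structure shown at the start of 2.1 contains an AspectTransactionalInterceptor right next to the GlobalTransactional annotation: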

public class AspectTransactionalInterceptor implements MethodInterceptor {

Since this class implements MethodInterceptor, its invoke method is called whenever an intercepted method runs:

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    InvocationWrapper invocationWrapper = new DefaultInvocationWrapper(null, invocation.getThis(), specificMethod, invocation.getArguments());
    return this.globalTransactionalInterceptorHandler.invoke(invocationWrapper);
}

Stepping into globalTransactionalInterceptorHandler.invoke on the last line leads to org.apache.seata.integration.tx.api.interceptor.handler.AbstractProxyInvocationHandler#invoke:

@Override
public Object invoke(InvocationWrapper invocation) throws Throwable {
    if (CollectionUtils.isNotEmpty(getMethodsToProxy()) && !getMethodsToProxy().contains(invocation.getMethod().getName())) {
        return invocation.proceed();
    }
    if (nextInvocationHandlerChain != null) {
        invocation = new NestInterceptorHandlerWrapper(nextInvocationHandlerChain, invocation);
    }
    return doInvoke(invocation);
}

From there we finally arrive at org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler#doInvoke:


@Override
protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
    Class<?> targetClass = invocation.getTarget().getClass();
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            final AspectTransactional globalTransactionalAnnotation = getAspectTransactional(specificMethod, targetClass);
            final GlobalLockConfig globalLockAnnotation = getGlobalLockConfig(specificMethod, targetClass);
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    transactional = globalTransactionalAnnotation;
                } else {
                    transactional = this.aspectTransactional;
                }
                return handleGlobalTransaction(invocation, transactional);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(invocation, globalLockAnnotation);
            }
        }
    }
    return invocation.proceed();
}

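There are two branches: if either globalTransactionalAnnotation or this.aspectTransactional is non-null, it calls handleGlobalTransaction; otherwise, if a global lock configuration (globalLockAnnotation) is present, it calls handleGlobalLock. If neither applies, or the interceptor is disabled or degraded (localDisable), the method simply proceeds.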
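In other words, the interceptor reads the annotation from the most specific (implementation) method and wraps the call accordingly. A minimal sketch of that dispatch using a JDK dynamic proxy; @GlobalTx, @GlobalLockOnly, OrderService, and the log list are hypothetical stand-ins, and "begin", "commit", and "rollback" are only recorded strings, not calls to a TC:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: annotation-driven dispatch, similar in spirit to doInvoke().
class TxProxySketch {
    static final List<String> log = new ArrayList<>();

    // Unwrap reflection's InvocationTargetException so callers see the business exception.
    private static Object call(Method m, Object target, Object[] args) throws Throwable {
        try {
            return m.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Like ClassUtils.getMostSpecificMethod: read annotations on the implementation method.
            Method specific = target.getClass().getMethod(method.getName(), method.getParameterTypes());
            if (specific.isAnnotationPresent(GlobalTx.class)) {
                log.add("begin");
                try {
                    Object rs = call(method, target, args);
                    log.add("commit");
                    return rs;
                } catch (Throwable ex) {
                    log.add("rollback");
                    throw ex;
                }
            }
            if (specific.isAnnotationPresent(GlobalLockOnly.class)) {
                log.add("global-lock");
            }
            return call(method, target, args);   // like invocation.proceed()
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        OrderService svc = wrap(new OrderServiceImpl(), OrderService.class);
        svc.placeOrder("sku-1");
        svc.queryOrder("42");
        svc.lockedRead("42");
        System.out.println(log);   // [begin, commit, global-lock]
    }
}

// Hypothetical stand-ins for @GlobalTransactional and @GlobalLock.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface GlobalTx {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface GlobalLockOnly {}

interface OrderService {
    String placeOrder(String sku);
    String queryOrder(String id);
    String lockedRead(String id);
}

class OrderServiceImpl implements OrderService {
    @GlobalTx public String placeOrder(String sku) { return "order-for-" + sku; }
    public String queryOrder(String id) { return "order-" + id; }
    @GlobalLockOnly public String lockedRead(String id) { return "locked-" + id; }
}
```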

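Inside org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler#handleGlobalTransaction there is essentially a single line, transactionalTemplate.execute, which is passed an anonymous inner class (a TransactionalExecutor that wraps the business call). Here is the execute method: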

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend(false);
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend(false);
                }
                tx = GlobalTransactionContext.createNew();
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,else create
                tx = GlobalTransactionContext.getCurrentOrCreate();
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                      , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        if (tx.getGlobalTransactionRole() == GlobalTransactionRole.Participant) {
            LOGGER.info("join into a existing global transaction,xid={}", tx.getXid());
        }

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx, txInfo);

            return rs;
        } finally {
            //5. clear
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion(tx);
            cleanUp(tx);
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
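Condensed, the propagation switch in execute() maps the pair (propagation, is there a current transaction?) to one of a handful of actions. The following sketch restates that mapping as a pure function so it can be read at a glance; the Action names are hypothetical, and the real code additionally distinguishes Launcher from Participant when it later decides whether to send begin/commit to the TC:

```java
// Hypothetical, simplified model of the propagation switch in TransactionalTemplate#execute().
class PropagationSketch {

    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    enum Action {
        JOIN_EXISTING,          // run the business inside the current global transaction
        BEGIN_NEW,              // become the Launcher and begin a global transaction
        SUSPEND_AND_BEGIN_NEW,  // suspend the current transaction, then begin a new one
        SUSPEND_AND_RUN_PLAIN,  // suspend the current transaction, run without one
        RUN_PLAIN,              // run without a global transaction
        FAIL                    // throw a TransactionException
    }

    static Action decide(Propagation p, boolean txExists) {
        switch (p) {
            case NOT_SUPPORTED: return txExists ? Action.SUSPEND_AND_RUN_PLAIN : Action.RUN_PLAIN;
            case REQUIRES_NEW:  return txExists ? Action.SUSPEND_AND_BEGIN_NEW : Action.BEGIN_NEW;
            case SUPPORTS:      return txExists ? Action.JOIN_EXISTING : Action.RUN_PLAIN;
            case REQUIRED:      return txExists ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case NEVER:         return txExists ? Action.FAIL : Action.RUN_PLAIN;
            case MANDATORY:     return txExists ? Action.JOIN_EXISTING : Action.FAIL;
            default: throw new IllegalArgumentException("Not Supported Propagation:" + p);
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.printf("%-14s no tx -> %-22s tx -> %s%n", p, decide(p, false), decide(p, true));
        }
    }
}
```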

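Two key points stand out in TransactionalTemplate#execute:

  • Transaction propagation

Seata defines its own set of propagation behaviors, which largely mirror Spring's. For example, with NOT_SUPPORTED the current transaction is suspended and the business logic runs without a transaction; with REQUIRES_NEW the current transaction is suspended and a new one is started for the rest of the transactional and business logic.

Six behaviors are defined; compared with Spring, NESTED is missing.

  • NOT_SUPPORTED: transactions are not supported; if one exists, it is suspended and the method runs non-transactionally
  • REQUIRES_NEW: if a transaction exists, it is suspended and a new transaction is started
  • SUPPORTS: if no transaction exists, run non-transactionally; otherwise run in the current transaction
  • REQUIRED: if no transaction exists, create one; otherwise run in the current transaction
  • NEVER: if a transaction exists, throw an exception; otherwise run non-transactionally
  • MANDATORY: if no transaction exists, throw an exception; otherwise run in the current transaction
  • Transaction handling

In execute(), the calls that stand out are beginTransaction, commitTransaction, and, in the innermost catch block, completeTransactionAfterThrowing.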

1. The beginTransaction method

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore begin: just involved in global transaction [{}]", tx.getXid());
        }
        return;
    }
    try {
        triggerBeforeBegin();
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
                                                           TransactionalExecutor.Code.BeginFailure);

    }
}

This method fires the before-begin hooks, begins the transaction, and then fires the after-begin hooks. The interesting part is tx.begin:

org.apache.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

@Override
public void begin(int timeout, String name) throws TransactionException {
    this.createTime = System.currentTimeMillis();
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNull();
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists," +
                                        " can't begin a new global transaction, currentXid = " + currentXid);
    }
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}

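This method begins a global transaction, obtains an xid, and binds the xid to the context (RootContext). How is the transaction actually begun? It is simply a remote call, org.apache.seata.tm.DefaultTransactionManager#begin: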

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}

That is, syncCall sends the request and receives a response:

public class GlobalBeginResponse extends AbstractTransactionResponse {
    private String xid;
    private String extraData;
}
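The essential steps of DefaultGlobalTransaction#begin are: refuse to begin if an xid is already bound, obtain a new xid from the TC, and bind it to the current context so that later code on the same thread can find it. A minimal sketch, assuming a plain ThreadLocal as the context store and a fake xid generator in place of the GlobalBeginRequest/GlobalBeginResponse round trip; all names here are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of begin(): check, obtain an xid, bind it to the current thread.
class XidContextSketch {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();
    private static final AtomicLong SEQ = new AtomicLong();

    static String getXid() { return XID.get(); }
    static void bind(String xid) { XID.set(xid); }
    static String unbind() {
        String old = XID.get();
        XID.remove();
        return old;
    }

    // Stand-in for transactionManager.begin(...), i.e. the request/response with the TC.
    static String requestXidFromTc(String name, int timeoutMillis) {
        return "127.0.0.1:8091:" + SEQ.incrementAndGet();   // made-up xid
    }

    static String begin(String name, int timeoutMillis) {
        String currentXid = getXid();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists,"
                + " can't begin a new global transaction, currentXid = " + currentXid);
        }
        String xid = requestXidFromTc(name, timeoutMillis);
        bind(xid);
        return xid;
    }

    public static void main(String[] args) throws InterruptedException {
        String xid = begin("demo-tx", 60_000);
        System.out.println("began " + xid + ", bound on main thread: " + getXid());
        Thread other = new Thread(() -> System.out.println("seen on another thread: " + getXid()));
        other.start();
        other.join();   // the other thread prints null: the binding is per thread
        unbind();
    }
}
```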

2. commitTransaction

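This method commits the global transaction. Only the Launcher commits; if the transaction has already timed out, it rolls back instead; otherwise it fires the before-commit hooks, commits, checks the resulting global status, and fires the after-commit hooks.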

private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
    throws TransactionalExecutor.ExecutionException, TransactionException {
    if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore commit: just involved in global transaction [{}]", tx.getXid());
        }
        return;
    }
    if (isTimeout(tx.getCreateTime(), txInfo)) {
        // business execution timeout
        Exception exx = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
                                                   String.format("client detected transaction timeout before commit, so change to rollback, xid = %s", tx.getXid()));
        rollbackTransaction(tx, exx);
        return;
    }

    try {
        triggerBeforeCommit();
        tx.commit();
        GlobalStatus afterCommitStatus = tx.getLocalStatus();
        TransactionalExecutor.Code code = TransactionalExecutor.Code.Unknown;
        switch (afterCommitStatus) {
            case TimeoutRollbacking:
                code = TransactionalExecutor.Code.Rollbacking;
                break;
            case TimeoutRollbacked:
                code = TransactionalExecutor.Code.RollbackDone;
                break;
            case Finished:
                code = TransactionalExecutor.Code.CommitFailure;
                break;
            default:
        }
        Exception statusException = null;
        if (GlobalStatus.isTwoPhaseHeuristic(afterCommitStatus)) {
            statusException = new TmTransactionException(TransactionExceptionCode.CommitHeuristic,
                                                         String.format("Global transaction[%s] not found, may be rollbacked.", tx.getXid()));
        } else if (GlobalStatus.isOnePhaseTimeout(afterCommitStatus)) {
            statusException = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
                                                         String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid()));
        }
        if (null != statusException) {
            throw new TransactionalExecutor.ExecutionException(tx, statusException, code);
        }
        triggerAfterCommit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
                                                           TransactionalExecutor.Code.CommitFailure);
    }
}
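The guards at the top of commitTransaction can be written as a pure decision function. This sketch passes the current time in explicitly so that it is easy to test; the names are hypothetical, and the strict "elapsed > timeout" rule is an assumption, since the body of isTimeout is not shown in this post:

```java
// Hypothetical sketch of the checks at the top of commitTransaction().
class CommitDecisionSketch {
    enum Role { LAUNCHER, PARTICIPANT }
    enum Decision { SKIP, ROLLBACK_TIMEOUT, COMMIT }

    // Assumed rule: timed out once elapsed time exceeds the configured timeout.
    static boolean isTimeout(long createTimeMillis, long nowMillis, int timeoutMillis) {
        return nowMillis - createTimeMillis > timeoutMillis;
    }

    static Decision decide(Role role, long createTimeMillis, long nowMillis, int timeoutMillis) {
        if (role != Role.LAUNCHER) {
            return Decision.SKIP;               // a Participant never commits the global transaction
        }
        if (isTimeout(createTimeMillis, nowMillis, timeoutMillis)) {
            return Decision.ROLLBACK_TIMEOUT;   // Seata calls rollbackTransaction(tx, timeoutException)
        }
        return Decision.COMMIT;                 // before-commit hooks, tx.commit(), after-commit hooks
    }

    public static void main(String[] args) {
        long created = 1_000L;
        System.out.println(decide(Role.LAUNCHER, created, created + 500, 60_000));        // COMMIT
        System.out.println(decide(Role.LAUNCHER, created, created + 61_000, 60_000));     // ROLLBACK_TIMEOUT
        System.out.println(decide(Role.PARTICIPANT, created, created + 61_000, 60_000));  // SKIP
    }
}
```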

3. completeTransactionAfterThrowing

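This method is called from the catch block, that is, when the business logic throws. It rolls the transaction back if txInfo is non-null and the exception matches the configured rollback rules (txInfo.rollbackOn); otherwise it commits anyway: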

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)
    throws TransactionalExecutor.ExecutionException, TransactionException {
    //roll back
    if (txInfo != null && txInfo.rollbackOn(originalException)) {
        rollbackTransaction(tx, originalException);
    } else {
        // not roll back on this exception, so commit
        commitTransaction(tx, txInfo);
    }
}
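txInfo.rollbackOn is where rollbackFor / noRollbackFor style configuration comes in. Its body is not shown in this post, so the following is only a simplified sketch of rule-based matching in which the rule for the closest exception type in the class hierarchy wins. The Rule record is hypothetical, and "no matching rule means roll back" is an assumption of this sketch, not a statement of Seata's exact behavior:

```java
import java.util.List;

// Hypothetical sketch of rule-based rollback decisions (closest matching exception type wins).
class RollbackRuleSketch {

    record Rule(Class<? extends Throwable> type, boolean rollback) {
        // Superclass hops from the exception's class to this rule's type, or -1 if unrelated.
        int depth(Throwable ex) {
            int d = 0;
            for (Class<?> c = ex.getClass(); c != null; c = c.getSuperclass(), d++) {
                if (c == type) {
                    return d;
                }
            }
            return -1;
        }
    }

    static boolean rollbackOn(List<Rule> rules, Throwable ex) {
        Rule winner = null;
        int deepest = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int depth = rule.depth(ex);
            if (depth >= 0 && depth < deepest) {   // a closer match beats a more general one
                deepest = depth;
                winner = rule;
            }
        }
        return winner == null || winner.rollback();   // assumption: no match -> roll back
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
            new Rule(RuntimeException.class, true),
            new Rule(IllegalArgumentException.class, false));
        System.out.println(rollbackOn(rules, new IllegalStateException()));      // true
        System.out.println(rollbackOn(rules, new IllegalArgumentException()));   // false
    }
}
```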

4. business.execute

This runs the business method itself.

2.3 How the Transaction Roles Exchange Requests

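We have now seen how a transaction is broken down and executed. How do the RM and TM actually talk to the TC? Broadly, there are synchronous and asynchronous modes. Take the begin method, org.apache.seata.tm.DefaultTransactionManager#begin, as an example: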

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}

Next, the syncCall method:

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    try {
        return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
    } catch (TimeoutException toe) {
        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
    }
}

Then org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object):

@Override
public Object sendSyncRequest(Object msg) throws TimeoutException {
    String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
    long timeoutMillis = this.getRpcRequestTimeout();
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

    // send batch message
    // put message into basketMap, @see MergedSendRunnable
    if (this.isEnableClientBatchSendRequest()) {

        // send batch message is sync request, needs to create messageFuture and put it in futures.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        // put message into basketMap
        BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                                                                           key -> new LinkedBlockingQueue<>());
        if (!basket.offer(rpcMessage)) {
            LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                         serverAddress, rpcMessage);
            return null;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: {}", rpcMessage.getBody());
        }
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            Object response = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), serverAddress, rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException)exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    } else {
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        return super.sendSync(channel, rpcMessage, timeoutMillis);
    }

}

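sendSyncRequest has two branches. If client-side batch sending is enabled, the if branch runs; otherwise the else branch acquires a channel and sends the request directly, waiting for the response. In the batch branch, Seata wraps the request in an org.apache.seata.core.protocol.RpcMessage, registers a MessageFuture for it in futures, and puts the message into a blocking queue (the "basket") that a background sender thread drains. The baskets are kept in basketMap, keyed by serverAddress, and the sender reads the messages from that map. The caller still blocks on messageFuture.get(timeoutMillis, ...), so from its point of view the request remains synchronous; only the network send is batched.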
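The batch branch is a classic sync-over-async pattern: register a future under the message id, enqueue the message, wake the sender, then block on the future with a timeout until a response handler completes it. A minimal sketch with CompletableFuture; BasketClientSketch, Msg, and the fake in-process sender thread in demo() are hypothetical (Seata's own MessageFuture and response processor play these roles):

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the caller side of sendSyncRequest() when batch sending is enabled.
class BasketClientSketch {
    record Msg(int id, String body) {}

    final Map<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
    final Map<String, BlockingQueue<Msg>> basketMap = new ConcurrentHashMap<>();
    final Object mergeLock = new Object();
    private final AtomicInteger ids = new AtomicInteger();

    String sendSync(String serverAddress, String body, long timeoutMillis) throws TimeoutException {
        Msg msg = new Msg(ids.incrementAndGet(), body);
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(msg.id(), future);   // register first, so a fast response always finds it
        basketMap.computeIfAbsent(serverAddress, k -> new LinkedBlockingQueue<>()).offer(msg);
        synchronized (mergeLock) {
            mergeLock.notifyAll();       // wake the background sender
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);   // the caller blocks here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            futures.remove(msg.id());
        }
    }

    // What a response processor does: find the waiting future by message id and complete it.
    void onResponse(int msgId, String response) {
        CompletableFuture<String> f = futures.get(msgId);
        if (f != null) {
            f.complete(response);
        }
    }

    // Demo: a fake in-process "sender" that waits on mergeLock and answers every queued message.
    static String demo() {
        BasketClientSketch client = new BasketClientSketch();
        Thread sender = new Thread(() -> {
            while (true) {
                synchronized (client.mergeLock) {
                    try {
                        client.mergeLock.wait(10);   // woken by notifyAll or after 10 ms
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                client.basketMap.forEach((address, basket) -> {
                    Msg m;
                    while ((m = basket.poll()) != null) {
                        client.onResponse(m.id(), "ack:" + m.body());
                    }
                });
            }
        });
        sender.setDaemon(true);
        sender.start();
        try {
            return client.sendSync("127.0.0.1:8091", "GlobalBeginRequest", 2000);
        } catch (TimeoutException e) {
            return "timeout";
        } finally {
            sender.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());   // ack:GlobalBeginRequest
    }
}
```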

Where is basketMap consumed?

We already saw org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#init when looking at how the RM and TM connect to the TC:

@Override
public void init() {
    timerExecutor.scheduleAtFixedRate(() -> {
        try {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        } catch (Exception ex) {
            LOGGER.warn("reconnect server failed. {}", ex.getMessage());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

When client batch sending is enabled, init() submits a MergedSendRunnable task to the mergeSendExecutorService thread pool. Its run method looks like this:

@Override
public void run() {
    while (true) {
        synchronized (mergeLock) {
            try {
                mergeLock.wait(MAX_MERGE_SEND_MILLS);
            } catch (InterruptedException e) {
            }
        }
        isSending = true;
        basketMap.forEach((address, basket) -> {
            if (basket.isEmpty()) {
                return;
            }

            MergedWarpMessage mergeMessage = new MergedWarpMessage();
            while (!basket.isEmpty()) {
                RpcMessage msg = basket.poll();
                mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                mergeMessage.msgIds.add(msg.getId());
            }
            if (mergeMessage.msgIds.size() > 1) {
                printMergeMessageLog(mergeMessage);
            }
            Channel sendChannel = null;
            try {
                // send batch message is sync request, but there is no need to get the return value.
                // Since the messageFuture has been created before the message is placed in basketMap,
                // the return value will be obtained in ClientOnResponseProcessor.
                sendChannel = clientChannelManager.acquireChannel(address);
                AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
            } catch (FrameworkException e) {
                if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                    destroyChannel(address, sendChannel);
                }
                // fast fail
                for (Integer msgId : mergeMessage.msgIds) {
                    MessageFuture messageFuture = futures.remove(msgId);
                    Integer parentId = childToParentMap.remove(msgId);
                    if (parentId != null) {
                        mergeMsgMap.remove(parentId);
                    }
                    if (messageFuture != null) {
                        messageFuture.setResultMessage(
                            new RuntimeException(String.format("%s is unreachable", address), e));
                    }
                }
                LOGGER.error("client merge call failed: {}", e.getMessage(), e);
            }
        });
        isSending = false;
    }
}

Inside the try block of run, the merged message is actually sent through AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage):

@Override
public void sendAsyncRequest(Channel channel, Object msg) {
    if (channel == null) {
        LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
        return;
    }
    RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
                                                ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
                                                : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
    Object body = rpcMessage.getBody();
    if (body instanceof MergeMessage) {
        Integer parentId = rpcMessage.getId();
        mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody());
        if (body instanceof MergedWarpMessage) {
            for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
                childToParentMap.put(msgId, parentId);
            }
        }
    }
    super.sendAsync(channel, rpcMessage);
}
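Putting the consumer side together: each wake-up drains every non-empty basket into one merged message, records which child ids belong to which parent (childToParentMap), and, once the merged response comes back, completes each child's future. The sketch below collapses send and receive into one synchronous transport call so that it can be tested; in Seata the response arrives asynchronously and is matched up in ClientOnResponseProcessor, as the comment in run() notes. All names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, single-threaded sketch of one pass of the merge-send loop.
class MergeSendSketch {
    record Msg(int id, String body) {}
    record Merged(int parentId, List<Integer> msgIds, List<String> bodies) {}

    final Map<String, BlockingQueue<Msg>> basketMap = new ConcurrentHashMap<>();
    final Map<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
    final Map<Integer, Integer> childToParentMap = new ConcurrentHashMap<>();
    private final AtomicInteger ids = new AtomicInteger();

    CompletableFuture<String> enqueue(String address, String body) {
        Msg m = new Msg(ids.incrementAndGet(), body);
        CompletableFuture<String> f = new CompletableFuture<>();
        futures.put(m.id(), f);
        basketMap.computeIfAbsent(address, k -> new LinkedBlockingQueue<>()).offer(m);
        return f;
    }

    // One wake-up of the sender. `transport` plays the channel plus the TC and
    // answers a merged request with one response per child, in order.
    void flushOnce(Function<Merged, List<String>> transport) {
        basketMap.forEach((address, basket) -> {
            if (basket.isEmpty()) {
                return;
            }
            List<Integer> msgIds = new ArrayList<>();
            List<String> bodies = new ArrayList<>();
            Msg m;
            while ((m = basket.poll()) != null) {   // drain the whole basket into one batch
                msgIds.add(m.id());
                bodies.add(m.body());
            }
            Merged merged = new Merged(ids.incrementAndGet(), msgIds, bodies);
            msgIds.forEach(id -> childToParentMap.put(id, merged.parentId()));
            List<String> responses = transport.apply(merged);
            for (int i = 0; i < msgIds.size(); i++) {   // fan the merged response back out
                int childId = msgIds.get(i);
                childToParentMap.remove(childId);
                CompletableFuture<String> f = futures.remove(childId);
                if (f != null) {
                    f.complete(responses.get(i));
                }
            }
        });
    }

    public static void main(String[] args) {
        MergeSendSketch s = new MergeSendSketch();
        CompletableFuture<String> a = s.enqueue("10.0.0.1:8091", "BranchRegisterRequest");
        CompletableFuture<String> b = s.enqueue("10.0.0.1:8091", "BranchReportRequest");
        s.flushOnce(merged -> {
            System.out.println("sending parent " + merged.parentId() + " with children " + merged.msgIds());
            List<String> out = new ArrayList<>();
            for (String body : merged.bodies()) {
                out.add("ok:" + body);
            }
            return out;
        });
        System.out.println(a.join() + ", " + b.join());   // ok:BranchRegisterRequest, ok:BranchReportRequest
    }
}
```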


Last updated: 2026/1/17 13:09:12