前言
本次在做分库分表时,遇到了跨库事务的问题。通过在网上搜索了许多资料后找了一些解决该问题的方法与思路。本文将分为两部分分别介绍常用的分布式事务解决方案以及本次分库分表中如何去解决跨库事务的实践。
常用的分布式事务解决方案
2PC(two-phase commit)
2PC基本介绍
2PC即为两阶段提交,是一种在多节点间实现事务原子提交的算法,用来确保所有节点要么全部提交,要么全部中止。我们可以有多种方案来实现该算法,如基于XA协议的实现,阿里也提供了Seata中间件支持2PC算法。
根据图示可以发现我们引入了一个新组件叫做「协调者」(也称事务管理者),他的作用是协调所有参与者的提交与回滚操作。
2PC既然叫做两阶段提交,那必然是分成了两个阶段。
准备阶段: 协调者会在准备阶段给所有参与者都发送准备命令。如果参与者发现准备命令无法执行或者执行失败时,可以返回失败,如果执行完成则保存事务日志并返回成功。针对于数据库的操作,准备阶段会记录redolog以及undolog为后续的提交做准备。需要注意的是在准备阶段时,数据实际是没有被真正保存的。
提交阶段: 协调者在提交阶段会根据准备阶段各个参与者的返回结果,判断是执行事务还是回滚事务并向所有参与者发起提交命令。其中只有当准备阶段的所有节点都返回成功,协调者才会发送执行事务的命令。如果有一个参与者返回失败,那么协调者就会向所有参与者发送回滚事务的请求,即分布式事务执行失败。
2PC的优缺点
优点:
- a. 保证了事务是强一致性的
- b. 实现相对简单
缺陷:
- a. 同步阻塞问题,如果其中一个参与者响应超时,其他所有参与者都需要等待。
- b. 增加死锁风险,由于每个参与者都可能长时间锁定资源,因此死锁的风险大大加大了
- c. 单点故障问题,如果协调者出现问题。其他所有参与者将无法判断应该提交事务或回滚
2PC的简单实现
以下为2PC的一种简单实现方法的伪代码。
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransactional {
/**
* 数据源名称
*/
String[] value() default {};
}
public class TwoPCTransactionAspect {
@Around("@annotation(com.xingren.jooq.multidb.annotation.MultiTransactional)")
public Object startMultiTransaction(ProceedingJoinPoint joinPoint) throws Throwable {
String[] dsNames = ((MethodSignature) joinPoint.getSignature()).getMethod()
.getAnnotation(MultiTransactional.class).value();
Callable callable = () -> joinPoint.proceed();
// 根据传入数据源的顺序构造事务链,在最内层事务调用包含业务逻辑的callable
Callable combined = Stream.of(dsNames).distinct()
.reduce(callable, (c, dsName) -> _startTransaction(c, dsName), (c1, c2) -> c2);
return combined.call();
}
private Callable _startTransaction(Callable callable, String dsName) {
DataSourceTransactionManager transactionManager = getTransactionManager(dsName);
return () -> {
try {
//准备阶段 此时执行的操作都只会记录undolog,redolog
Object result = callable.call();
//提交阶段 如果准备阶段没有参与者有异常,则会执行提交commit
transactionManager.commit(status);
return result;
} catch (Throwable e) {
//提交阶段 如果准备阶段有任何参与者出现异常,都会触发rollback
transactionManager.rollback(status);
throw e;
}
};
}
}
TCC
TCC分别指的是三个步骤,Try - Confirm - Cancel。与2PC类似,它在逻辑也会分成准备和提交两个阶段。但是最大的不同在于TCC需要业务服务自己去实现准备、执行、回滚的代码,因此可以做到非常灵活。
Try: 对资源的预留和锁定
Confirm: 确认操作,是对事务的真正执行。
Cancel 撤销操作,把预留阶段的资源撤销。
从图中其实我们能发现TCC与2PC的操作基本是一致的。只是2PC针对于数据库操作,TCC可以让服务方自己去实现相应操作。
TCC优缺点
优点:
- a. 实现很灵活,几乎可以满足任何分布式事务的场景
缺点:
- a. 与业务代码耦合度高,开发复杂度大大提升
- b. confirm,cancel操作需要考虑幂等性
- c. 其实实现思想还是依据2PC,因此包含了2PC所有缺点
TCC的简单实现
TCC的内部实现还是比较复杂的,因此可以引入一些专门的TCC框架,如ByteTCC,Himly等。
此处的实现仅仅是基于TCC思想的一种伪代码简单实现:
public class TCCService {
public void invoke() {
try {
// try
serviceA.doTry();
serviceB.doTry();
// confirm
serviceA.doConfirm();
serviceB.doConfirm();
} catch (Exception e) {
// cancel
serviceA.doCancel();
serviceA.doCancel();
}
}
}
public class ServiceA implements TCCSampleInterface {
@Override
public void doTry() {
// 服务A的try操作
}
@Override
public void doConfirm() {
// 服务A的confirm操作 注意幂等!!
}
@Override
public void doCancel() {
// 服务A的cancel操作 注意幂等!!
}
}
public class ServiceB implements TCCSampleInterface {
@Override
public void doTry() {
// 服务B的try操作
}
@Override
public void doConfirm() {
// 服务B的confirm操作 注意幂等!!
}
@Override
public void doCancel() {
// 服务B的cancel操作 注意幂等!!
}
}
最大努力通知——本地消息表
最大努力通知顾名思义就是尽可能的保证事务的最终一致性,是一种柔性事务的思想。他有许多种实现方式,本文只讲解其中一种:本地消息表
本地消息表其实就是使用了一张额外的表保存了操作的日志。
根据上图,本地消息表的实现方式会在第一个操作完成之后记录事务日志。需要注意的是记录事务日志与第一个操作请求是在一个事务中的,这样保证了日志记录在操作成功后一定会记下。后续的操作如果更新成功了,则更新日志记录表为成功,如果失败了则更新日志表为失败。后台会有定时任务定期去扫描事务日志表,对失败的操作进行回放,以此来保证事务的最终一致性。
最大努力通知优缺点
优点:
- a. 不会长时间锁住资源,减少死锁风险
- b. 没有长事务,执行效率高,提高吞吐量
缺点:
- a. 事务不是强一致性,而是最终一致性
适用场景
分布式事务解决方式 | 适用场景 |
---|---|
2PC | 数据库层面的分布式事务场景 |
TCC | 跨不同业务系统的分布式事务场景 |
最大努力通知 | 对事务一致性要求不高的场景 |
分库分表中跨库事务的实现
本次分库分表时针对于消息服务做的,接下来将以消息服务中最经典的一个操作,「发送消息」场景进行举例。
发送消息的流程主要分为四步:
- 发起发送消息的请求
- 保存消息至消息表(消息表在db1的数据库)
- 会话表中该会话消息数量+1(会话表在db2的数据库)
- 会话发送成功
发送消息这个场景的特点在于:
- 读写操作频繁,对性能要求高(如发送消息等场景)
- 对事务一致性要求相对不那么高(相对于金融系统的事务要求强一致性,会话的消息总数更新不及时是可以接受的)
因此采用了本地消息表来实现事务的最终一致性。
具体实现流程图如下:
代码实现
为方便理解,代码有所修改。
注解类
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TransactionLog {
/**
* 参数类型
*/
Class[] parameterTypes();
/**
* 泛型参数类型(与参数类型一一对应)
*/
Class[] genericTypes();
}
AOP增强
@Aspect
@Component
@SuppressWarnings({"checkstyle:magicnumber"})
@Slf4j
public class TransactionLogAspect {
@Autowired
private MessageTransactionLogRepository messageTransactionLogRepo;
@Around("@annotation(com.xingren.message.infrastructure.aop.annotation.TransactionLog) && @annotation(transactionLog)")
public void around(ProceedingJoinPoint pjp, TransactionLog transactionLog) throws Throwable {
// 基础数据
String beanName = StringUtil.toFirstWordLowerCase(pjp.getSignature().getDeclaringType().getSimpleName());
String methodName = pjp.getSignature().getName();
Class[] parameterTypes = transactionLog.parameterTypes();
Class[] genericTypes = transactionLog.genericTypes();
Object[] parameterValues = pjp.getArgs();
// 记录操作日志
Long logId = logRepo.insertLog(beanName, methodName, parameterTypes, genericTypes, parameterValues);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
proceed(pjp, logRepo, logId);
}
});
}
private void proceed(ProceedingJoinPoint pjp, IMessageTransactionLogRepository logRepo, Long logId) {
try {
// 执行操作
pjp.proceed();
// 执行成功,删除日志(防止日志过多)
logRepo.deleteLog(logId);
} catch (Throwable e) {
log.error("[TransactionLogAspect] 业务代码出现异常", e);
try {
// 执行失败,更新日志记录表为失败
logRepo.updateLogStatus(logId, TransactionLogStatus.FAIL, Boolean.FALSE);
} catch (Exception ex) {
log.error("[TransactionLogAspect] 更新记录出现异常", ex);
}
}
}
}
实际使用
/**
* 发送消息
**/
@Transcational("db1") //使用db1的事务
@Override
public void sendMessages(Session session) {
// 保存消息
List<Long> sendSuccessMessageIds = messageRepo.batchInsertOrIgnore(MessageCreateConverter.of(session.getMessages()));
// 获取消息数量
List<Message> messages= session.getMessages().stream().filter(message-> sendSuccessMessageIds.contains(message.getMessageId())).collect(toList());
// 更新消息数量
sessionRepo.updateMessageCountWithTransactionLog(session.get SessionId(), messages.size());
}
/**
* 更新会话的消息数量
**/
@TransactionLog(parameterTypes = {Long.class, Integer.class}, genericTypes = {Object.class, Object.class})
public void updateMessageCountWithTransactionLog(Long sessionId, Integer count) {
updateMessageCount(sessionId, count);
}
总结
2PC 是强一致性的事务,适合用在数据库层面。
TCC 也是强一致性的事务,适合大部分场景,但是对业务代码有侵入,开发量也很大。
最大努力通知 是一种柔性事务,用各种方式去维持事务的最终一致性。
本文介绍的分库分表事务一致性实现,使用的就是最大努力通知的本地消息表实现。
分布式事务是分布式系统中非常头疼的一个环节,我们需要在不同的场景使用不同的解决方案。但是也不难发现,无论哪一种方式都有自己的利弊,实现都较为复杂,因此我的建议是:能避免使用分布式事务,就避免。
参考资料
《数据密集型应用系统设计》
Q.E.D.