分布式事务一直是微服务,分布式应用的难点,还好阿里发布的了不少开源项目,对小羊的云商这样的分布式应用来说帮助非常大。其中dubbo/nacos/seata都是来源于阿里的开源项目,这里要感谢阿里对社会的贡献了。
小羊云商是依赖spring-cloud-alibaba来建设,方便于兼容spring cloud的基础组件,其中最大的差别是在于将内部远程调用方式是采用dubbo的RPC调用,而不是Spring cloud feign client风格的REST调用。
本文涉及软件环境如下:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.4.1</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.1.4</version>
</dependency>
|
目前dubbo最新的版本为2.7.5, 到这边文章写出来为止, 官网还不建议将2.7.5用于生产环境,2.7.5有一个类似spring cloud一样的服务内省的功能还处于BETA版, 目前dubbo向注册中心nacos注册的单位还是service接口级别,性能还稍微差一些, dubbo下个版本将会支持应用级别。
在Idea中看到的依赖包如下:
为了简化流程,我们只需要创建同一张表t_trans_test, 分布存在于不同的服务中,每个服务单独对应一个数据库。然后在web层调用其中一个服务shop-serivice,再由这个服务远程调用其他4个微服务。服务列表如下:
为了简单,这个业务表只是简单包含以下几个字段:
--测试业务表
CREATETABLE`t_trans_test` (
`id`int(11)NOTNULLCOMMENT'主键',
`name`varchar(255)DEFAULTNULLCOMMENT'名字',
`rec_date` datetimeDEFAULTNULLCOMMENT'记录时间',
PRIMARYKEY(`id`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8 COMMENT='seata分布式事务测试表';
--事务回滚表
CREATETABLE`undo_log` (
`id`bigint(20)NOTNULLAUTO_INCREMENT,
`branch_id`bigint(20)NOTNULL,
`xid`varchar(100)NOTNULL,
`context`varchar(128)NOTNULL,
`rollback_info` longblobNOTNULL,
`log_status`int(11)NOTNULL,
`log_created` datetimeNOTNULL,
`log_modified` datetimeNOTNULL,
PRIMARYKEY(`id`),
UNIQUEKEY`ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=131DEFAULTCHARSET=utf8;
|
业务流程解析:
以保存一条记录为例,
用户通过访问web层,web层通过远程RPC调用shop-service的接口,存入一条记录,同时shop-service也会远程调用其他四个微服务, 如果5个微服务都同时成功那就会一起提交到数据库, 如果其中一个服务出现问题,则会出现全部回滚的现象。
seata的安装请参考seata官网或者其他文章,seata跟其他框架的整合请看: https://github.com/seata/seata-samples
由于seata的默认配置是采用文件的方式,在spring boot的环境下无法支持多个环境的配置。 因此需要引入seata-spring-boot-starter包。
seata-spring-boot-starter是使用springboot自动装配来简化seata-all的复杂配置。1.0.0可用于替换seata-all,
GlobalTransactionScanner自动初始化(依赖SpringUtils)若其他途径实现GlobalTransactionScanner初始化,请保证io.seata.spring.boot.autoconfigure.util.SpringUtils先初始化;
seata-spring-boot-starter默认开启数据源自动代理,用户若再手动配置DataSourceProxy将会导致异常。由于legendDao支持分布式的ID生产策略,因为ID每次生成就不能再回滚,因此不能使用默认的数据源开启代理,需要手动设置数据源代理,要将ID库排除在数据源代理之外。
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
|
spring boot的配置如下
server:
port:8181
spring:
application:
name:@artifactId@
cloud:
nacos:
config:
server-addr: ${NACOS-HOST:192.168.0.146}:${NACOS-PORT:8848}
shared-dataids:application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
file-extension:yml
profiles:
active:@profiles.active@
|
通过配置文件可见,application.yml的配置将会配置在nacos配置中心之中,那单独看一下seata在nacos中的配置:
seata:
enabled:true
application-id:${spring.application.name}
#事务群组(可以每个应用独立取名,也可以使用相同的名字)
tx-service-group:my_test_tx_group
client:
rm-report-success-enable:true
rm-table-meta-check-enable:false# 自动刷新缓存中的表结构(默认false)
rm-report-retry-count:5# 一阶段结果上报TC重试次数(默认5)
rm-async-commit-buffer-limit:10000# 异步提交缓存队列长度(默认10000)
rm:
lock:
lock-retry-internal:10# 校验或占用全局锁重试间隔(默认10ms)
lock-retry-times: 30# 校验或占用全局锁重试次数(默认30)
lock-retry-policy-branch-rollback-on-conflict:true# 分支事务与其它全局回滚事务冲突时锁策略
tm-commit-retry-count: 3# 一阶段全局提交结果上报TC重试次数(默认1次,建议大于1)
tm-rollback-retry-count:3# 一阶段全局回滚结果上报TC重试次数(默认1次,建议大于1)
undo:
undo-data-validation:true# 二阶段回滚镜像校验(默认true开启)
undo-log-serialization:jackson# undo序列化方式(默认jackson)
undo-log-table:undo_log # 自定义undo表名(默认undo_log)
log:
exceptionRate:100# 日志异常输出概率(默认100)
support:
spring:
datasource-autoproxy:false #为了照顾idDatasource不用DataSourceProxy,因此需要配置为false
service:
vgroup-mapping:default# TC 集群(必须与seata-server保持一致)
enable-degrade:false# 降级开关
disable-global-transaction:false #禁用全局事务(默认false)
grouplist:${SEATA-HOST:172.18.102.181}:${SEATA-PORT:8091}
transport:
shutdown:
wait:3
thread-factory:
boss-thread-prefix:NettyBoss
worker-thread-prefix:NettyServerNIOWorker
server-executor-thread-prefix:NettyServerBizHandler
share-boss-worker:false
client-selector-thread-prefix:NettyClientSelector
client-selector-thread-size:1
client-worker-thread-prefix:NettyClientWorkerThread
type:TCP
server:NIO
heartbeat:true
serialization:seata
compressor:none
enable-client-batch-send-request:true# 客户端事务消息请求是否批量合并发送(默认true)
registry:
file:
name:file.conf
type:nacos
nacos:
server-addr:${NACOS-HOST:172.18.102.181}:${NACOS-PORT:8848}
namespace:369355a6-eb56-4864-bee9-75f4bd4592be
cluster:default
config:
file:
name:file.conf
type:file
nacos:
namespace:369355a6-eb56-4864-bee9-75f4bd4592be
server-addr:${NACOS-HOST:172.18.102.181}:${NACOS-PORT:8848}
|
NACOS-HOST的地址是由启动命令中传入,在nacos中也指明的默认的地址为172.18.102.181
nacos的namespace是为了将配置分离到不同的命令空间,让他更容易找。registry模式是采用nacos方式, config配置方式可以采用本地文件的方式,本地文件file.conf如下,需要放在类的上下文中。
# seata 本地的配置,不可缺少
transport{
# tcp udt unix-domain-socket
type ="TCP"
#NIO NATIVE
server ="NIO"
#enable heartbeat
heartbeat =true
# the client batch send request enable
enable-client-batch-send-request =true
#thread factory for netty
thread-factory{
boss-thread-prefix ="NettyBoss"
worker-thread-prefix ="NettyServerNIOWorker"
server-executor-thread-prefix ="NettyServerBizHandler"
share-boss-worker =false
client-selector-thread-prefix ="NettyClientSelector"
client-selector-thread-size =1
client-worker-thread-prefix ="NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size =1
#auto default pin or 8
worker-thread-size =8
}
shutdown{
# when destroy server, wait seconds
wait =3
}
serialization ="seata"
compressor ="none"
}
service{
#transaction service group mapping
vgroup_mapping.my_test_tx_group ="default"
#only support when registry.type=file, please don't set multiple addresses
# default.grouplist = "192.168.0.146:8091" 已经在nacos中配置,无需再配置
#degrade, current not support
enableDegrade =false
#disable seata
disableGlobalTransaction =false
}
client{
rm{
async.commit.buffer.limit =10000
lock{
retry.internal =10
retry.times =30
retry.policy.branch-rollback-on-conflict =true
}
report.retry.count =5
table.meta.check.enable =false
report.success.enable =true
}
tm{
commit.retry.count =5
rollback.retry.count =5
}
undo{
data.validation =true
log.serialization ="jackson"
log.table ="undo_log"
}
log{
exceptionRate =100
}
support{
# auto proxy the DataSource bean
spring.datasource.autoproxy =false
}
}
|
Entity实体类如下:
/**
* 测试seata的分布式事务.演示不需要加入@Column的方式
*/
@Entity
@Table(name ="t_trans_test")
@Data
publicclassTransTestimplementsGenericEntity<Long> {
@Id
@GeneratedValue(strategy = GenerationType.TABLE, generator ="generator")
@TableGenerator(name ="generator", pkColumnValue ="TRANS_TEST_SEQ")
Long id;
/** 名称 */
String name;
/** 连接地址 */
Date recDate;
}
|
Controller如下:
/**
* seata分布式事务测试Dao,测试分布式事务,同时调用其他5个服务
*/
@RestController
@Slf4j
@RequestMapping(value="/transTest")
publicclassTransTestController {
@ShopReference
privateShopTransAllTestService shopTransAllTestService;
/**
* 保存
*/
@RequestMapping(value="/saveTransTest/{name}")
publicLong saveTransTest(@PathVariableString name){
TransTest transTest =newTransTest();
transTest.setName(name);
transTest.setRecDate(newDate());
log.info("saveTransTest {}", transTest);
returnshopTransAllTestService.saveTransTest(transTest);
}
}
|
ShopTransAllTestService的实现代码如下:
@GlobalTransactional(timeoutMills =300000)
publicLong saveTransTest(TransTest transTest) {
System.out.println("Task 开始全局事务,XID = "+ RootContext.getXID());
log.info("saveTransTest {}", transTest);
Long result = 0l;
result += shopTransTestService.saveTransTest(transTest);
result += prodTransTestService.saveTransTest(transTest);
result += orderTransTestService.saveTransTest(transTest);
result += logTransTestService.saveTransTest(transTest);
result += basicTransTestService.saveTransTest(transTest);
returnresult;
}
|
GlobalTransactional用于标记启动全局事务。
其中一个服务BasicTransTestService的代码如下:
@Override
publicLong saveTransTest(TransTest transTest) {
System.out.println("Basic 全局事务id :"+ RootContext.getXID());
returntransTestDao.saveTransTest(transTest);
}
|
TransTestDao实现类
@Override
publicLong saveTransTest(TransTest transTest) {
String suffix ="-basic";
if(transTest.getName() !=null&& !transTest.getName().endsWith(suffix)){
transTest.setName(transTest.getName() + suffix);
}
Long result = save(transTest);
if(true){
thrownewBusinessException("some error happened!");
}
returnresult;
}
|
if(true)语句专门构造一个运行时异常。
最后不要忘了做数据源代理
<!-- 数据源定义 -->
<bean id="shopDataSource"parent="publicDataSource">
<property name="url"value="${shop.jdbc.url}"/>
<property name="username"value="${shop.jdbc.username}"/>
<property name="password"value="${shop.jdbc.password}"/>
</bean>
<!-- 注册所有的数据源 -->
<bean id="dataSource"class="com.legendshop.dao.shards.datasource.DataSourceHolder">
<property name="dataSourceMap">
<map>
<!-- 主键数据源 -->
<entry key="idDataSource"value-ref="idDataSource"></entry>
<!-- 默认数据源 -->
<entry key="dataSource"value-ref="shopDataSourceProxy"></entry>
<entry key="shopDataSource"value-ref="shopDataSourceProxy"></entry>
</map>
</property>
</bean>
<bean id="shopDataSourceProxy"class="io.seata.rm.datasource.DataSourceProxy">
<constructor-arg index="0"ref="shopDataSource"/>
</bean>
|
可见ID库并没有做数据源的代理,是不会参与到事务回滚当中, 其余数据源必须要用io.seata.rm.datasource.DataSourceProxy代理,否则无法回滚。