Atomikos是开源的分布式事务管理器,是JTA规范的实现,支持XA协议。现在要将Atomikos整合进springboot。
Atomikos依赖
在pom.xml添加atomikos依赖。
<!-- spring jdbc -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- JTA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
添加配置
修改application.properties添加多个数据源配置
# 主数据源
mysql.datasource.one.url = jdbc:mysql://127.0.0.1:3306/wordpress?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
mysql.datasource.one.username =root
mysql.datasource.one.password =123456
mysql.datasource.one.minPoolSize =3
mysql.datasource.one.maxPoolSize =25
mysql.datasource.one.maxLifetime =20000
mysql.datasource.one.borrowConnectionTimeout =30
mysql.datasource.one.loginTimeout =30
mysql.datasource.one.maintenanceInterval =60
mysql.datasource.one.maxIdleTime =60
mysql.datasource.one.testQuery =select 1
# 数据源 2
mysql.datasource.two.url =jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
mysql.datasource.two.username =root
mysql.datasource.two.password =123456
mysql.datasource.two.minPoolSize =3
mysql.datasource.two.maxPoolSize =25
mysql.datasource.two.maxLifetime =20000
mysql.datasource.two.borrowConnectionTimeout =30
mysql.datasource.two.loginTimeout =30
mysql.datasource.two.maintenanceInterval =60
mysql.datasource.two.maxIdleTime =60
mysql.datasource.two.testQuery =select 1
加载配置类
这里定义了两个数据源
定义主数据源配置类,加载参数
/**
*
* @ClassName: OneDbProperties
* @Description: 主数据源配置
* @author wenqy
* @date 2017年7月26日 下午9:56:09
*
*/
@ConfigurationProperties(prefix = “mysql.datasource.one”)
public class OneDbProperties {
private String url;
private String username;
private String password;
/** min-pool-size 最小连接数 **/
private int minPoolSize;
/** max-pool-size 最大连接数 **/
private int maxPoolSize;
/** max-lifetime 连接最大存活时间 **/
private int maxLifetime;
/** borrow-connection-timeout 获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 **/
private int borrowConnectionTimeout;
/** login-timeout java数据库连接池,最大可等待获取datasouce的时间 **/
private int loginTimeout;
/** maintenance-interval 连接回收时间 **/
private int maintenanceInterval;
/** max-idle-time 最大闲置时间,超过最小连接池连接的连接将将关闭 **/
private int maxIdleTime;
/** test-query 测试SQL **/
private String testQuery;
// 省略getter 和 setter 方法
}
定义其他数据源配置类,加载参数
/**
*
* @ClassName: OneDbProperties
* @Description: 数据源配置
* @author wenqy
* @date 2017年7月26日 下午9:56:09
*
*/
@ConfigurationProperties(prefix = “mysql.datasource.two”)
public class TwoDbProperties {
private String url;
private String username;
private String password;
/** min-pool-size 最小连接数 **/
private int minPoolSize;
/** max-pool-size 最大连接数 **/
private int maxPoolSize;
/** max-lifetime 连接最大存活时间 **/
private int maxLifetime;
/** borrow-connection-timeout 获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 **/
private int borrowConnectionTimeout;
/** login-timeout java数据库连接池,最大可等待获取datasouce的时间 **/
private int loginTimeout;
/** maintenance-interval 连接回收时间 **/
private int maintenanceInterval;
/** max-idle-time 最大闲置时间,超过最小连接池连接的连接将将关闭 **/
private int maxIdleTime;
/** test-query 测试SQL **/
private String testQuery;
// 省略getter 和 setter 方法
}
主数据源加载, 注入Atomikos的数据源Bean
@Configuration
@MapperScan(basePackages = “com.wenqy.mapper.one”, sqlSessionTemplateRef = “oneSqlSessionTemplate”)
public class OneDbConfig {
/**
* @throws SQLException
* @Title: oneDataSource
* @Description: 主数据源
* @param @param oneDbProperties
* @return DataSource 数据源
* @throws
*/
@Primary
@Bean(name = “oneDataSource”)
public DataSource oneDataSource(OneDbProperties oneDbProperties) throws SQLException {
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setUrl(oneDbProperties.getUrl());
mysqlXADataSource.setUser(oneDbProperties.getUsername());
mysqlXADataSource.setPassword(oneDbProperties.getPassword());
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXADataSource);
xaDataSource.setUniqueResourceName(“oneDataSource”);
xaDataSource.setMinPoolSize(oneDbProperties.getMinPoolSize());
xaDataSource.setMaxPoolSize(oneDbProperties.getMaxPoolSize());
xaDataSource.setMaxLifetime(oneDbProperties.getMaxLifetime());
xaDataSource.setBorrowConnectionTimeout(oneDbProperties.getBorrowConnectionTimeout());
xaDataSource.setLoginTimeout(oneDbProperties.getLoginTimeout());
xaDataSource.setMaintenanceInterval(oneDbProperties.getMaintenanceInterval());
xaDataSource.setMaxIdleTime(oneDbProperties.getMaxIdleTime());
xaDataSource.setTestQuery(oneDbProperties.getTestQuery());
return xaDataSource;
}
@Bean(name = “oneSqlSessionFactory”)
public SqlSessionFactory oneSqlSessionFactory(@Qualifier(“oneDataSource”) DataSource dataSource)
throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources(“classpath:/mapper/config/one/*.xml”));
return bean.getObject();
}
@Bean(name = “oneSqlSessionTemplate”)
public SqlSessionTemplate oneSqlSessionTemplate(
@Qualifier(“oneSqlSessionFactory”) SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
其他数据源加载,注入Atomikos的数据源Bean
@Configuration
@MapperScan(basePackages = “com.wenqy.mapper.two”, sqlSessionTemplateRef = “twoSqlSessionTemplate”)
public class TwoDbConfig {
@Bean(name = “twoDataSource”)
public DataSource twoDataSource(TwoDbProperties twoDbProperties) throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(twoDbProperties.getUrl());
mysqlXaDataSource.setPassword(twoDbProperties.getPassword());
mysqlXaDataSource.setUser(twoDbProperties.getUsername());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName(“twoDataSource”);
xaDataSource.setMinPoolSize(twoDbProperties.getMinPoolSize());
xaDataSource.setMaxPoolSize(twoDbProperties.getMaxPoolSize());
xaDataSource.setMaxLifetime(twoDbProperties.getMaxLifetime());
xaDataSource.setBorrowConnectionTimeout(twoDbProperties.getBorrowConnectionTimeout());
xaDataSource.setLoginTimeout(twoDbProperties.getLoginTimeout());
xaDataSource.setMaintenanceInterval(twoDbProperties.getMaintenanceInterval());
xaDataSource.setMaxIdleTime(twoDbProperties.getMaxIdleTime());
xaDataSource.setTestQuery(twoDbProperties.getTestQuery());
return xaDataSource;
}
@Bean(name = “twoSqlSessionFactory”)
public SqlSessionFactory twoSqlSessionFactory(@Qualifier(“twoDataSource”) DataSource dataSource)
throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(
new PathMatchingResourcePatternResolver().getResources(“classpath:/mapper/config/two/*.xml”));
return bean.getObject();
}
@Bean(name = “twoSqlSessionTemplate”)
public SqlSessionTemplate twoSqlSessionTemplate(
@Qualifier(“twoSqlSessionFactory”) SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
事务配置
注入Atomikos自己的事务管理器
/**
*
* @ClassName: TransactionManagerConfig
* @Description: 事务管理配置
* @author
*
*/
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfig {
/**
* 自定义事务
* MyBatis自动参与到spring事务管理中,无需额外配置,
* 只要org.mybatis.spring.SqlSessionFactoryBean
* 引用的数据源与DataSourceTransactionManager引用的数据源一致即可,否则事务管理会不起作用。
*/
@Bean(name = “userTransaction”)
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = “atomikosTransactionManager”, initMethod = “init”, destroyMethod = “close”)
public TransactionManager atomikosTransactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
@Bean(name = “transactionManager”)
@DependsOn({ “userTransaction”, “atomikosTransactionManager” })
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransaction userTransaction = userTransaction();
JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());
return manager;
}
}
单元测试
编写测试类,测试服务。
/**
 * Spring Boot test that loads the application context and calls
 * {@code UserService#insertData()}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TestMutiDataSource {

    /** Service under test, injected from the application context. */
    @Autowired
    private UserService service;

    /**
     * Calls {@code insertData()}; the test fails only if an exception propagates.
     *
     * @throws Exception whatever the service call throws
     */
    @Test
    public void testInsertData() throws Exception {
        this.service.insertData();
    }
}
Service实现类,注释了要抛出异常错误
@Transactional(value=“transactionManager”)
public void insertData() throws Exception {
User user = new User();
user.setId(new Random().nextInt());
user.setName(“wenqy”);
userMapper.saveUser( user );
// if (true) {
// throw new RuntimeException(“insert failure”);
// }
Person person = new Person();
person.setId(new Random().nextInt());
person.setEmail(“wen.qy@qq.com”);
personMapper.savePerson( person );
// if (true) {
// throw new RuntimeException(“insert failure”);
// }
}
结果两表都插入数据
取消第一个注释块的注释,抛出运行时异常,执行测试,两表都没有插入数据,正确。恢复第一个注释块,取消第二个注释块的注释,抛出运行时异常,执行测试,两表都没有插入数据,正确。把异常更换为检查异常(如SQLException)后,发现两表都插入了数据,没有发生回滚,错误。
@Transactional(value=“transactionManager”)
public void insertData() throws Exception {
User user = new User();
user.setId(new Random().nextInt());
user.setName(“wenqy”);
userMapper.saveUser( user );
// if (true) {
// throw new RuntimeException(“insert failure”);
// }
Person person = new Person();
person.setId(new Random().nextInt());
person.setEmail(“wen.qy@qq.com”);
personMapper.savePerson( person );
if (true) {
throw new SQLException(“insert failure”);
}
}
指定事务需要回滚所抛出的异常,这里是Exception
@Transactional(value=“transactionManager”,rollbackFor=Exception.class)
public void insertDataRollbackForException() throws Exception {
User user = new User();
user.setId(new Random().nextInt());
user.setName(“wenqy”);
userMapper.saveUser( user );
// if (true) {
// throw new RuntimeException(“insert failure”);
// }
Person person = new Person();
person.setId(new Random().nextInt());
person.setEmail(“wen.qy@qq.com”);
personMapper.savePerson( person );
if (true) {
throw new SQLException(“insert failure”);
}
}
然后继续测试,执行测试,两表没插入数据,说明事务回滚正确
结论
默认配置下,Spring只有在抛出运行时异常(RuntimeException及其子类)或Error时才回滚事务;检查异常需要通过rollbackFor显式指定才会回滚。Atomikos分布式事务和Spring声明式事务一样,对业务透明,是非侵入式的,并且支持与Spring无缝衔接。Spring默认的事务隔离级别是数据库的默认隔离级别。另外,需要数据库本身支持事务,例如MySQL不能使用MyISAM引擎。
参考:
atomikos官网:https://www.atomikos.com/
本文由 wenqy 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Nov 8,2020