Spring Boot Learning Series: Integrating Atomikos for Multiple Data Sources and Distributed Transactions (5)


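Atomikos is an open-source distributed transaction manager. It implements the JTA specification and supports the XA protocol. This post integrates Atomikos into a Spring Boot application.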
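
To make the XA idea concrete, here is a minimal sketch of the two-phase commit pattern that an XA transaction manager coordinates. It is an illustration only, not Atomikos code: `Participant`, `RecordingParticipant` and `TwoPhaseCommitSketch` are hypothetical names, and a real coordinator also writes a transaction log and handles recovery, which this sketch ignores.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for an XA-capable resource such as a database. */
interface Participant {
    boolean prepare();  // phase 1: vote yes (true) or no (false)
    void commit();      // phase 2 when every participant voted yes
    void rollback();    // phase 2 when any participant voted no
}

/** In-memory participant that remembers its final state. */
class RecordingParticipant implements Participant {
    private final boolean voteYes;
    String state = "ACTIVE";

    RecordingParticipant(boolean voteYes) {
        this.voteYes = voteYes;
    }

    @Override
    public boolean prepare() {
        if (voteYes) {
            state = "PREPARED";
        }
        return voteYes;
    }

    @Override
    public void commit() {
        state = "COMMITTED";
    }

    @Override
    public void rollback() {
        state = "ROLLED_BACK";
    }
}

class TwoPhaseCommitSketch {
    /** Returns true if all participants committed, false if all were rolled back. */
    static boolean run(List<? extends Participant> participants) {
        // Phase 1: ask every participant to prepare; stop at the first "no" vote.
        for (Participant p : participants) {
            if (!p.prepare()) {
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        // Phase 2: everyone voted yes, so commit everywhere.
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        RecordingParticipant one = new RecordingParticipant(true);
        RecordingParticipant two = new RecordingParticipant(false);
        boolean committed = run(Arrays.asList(one, two));
        // prints "false ROLLED_BACK ROLLED_BACK"
        System.out.println(committed + " " + one.state + " " + two.state);
    }
}
```

The point of the sketch is the all-or-nothing outcome: a single "no" vote in the prepare phase rolls back every participant, including those that had already prepared successfully.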

Atomikos dependency

Add the Atomikos dependencies to pom.xml.

<!-- spring jdbc -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- JTA -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

Add the configuration

Edit application.properties and add the settings for each data source.

# Primary data source
mysql.datasource.one.url = jdbc:mysql://127.0.0.1:3306/wordpress?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
mysql.datasource.one.username =root
mysql.datasource.one.password =123456
mysql.datasource.one.minPoolSize =3
mysql.datasource.one.maxPoolSize =25
mysql.datasource.one.maxLifetime =20000
mysql.datasource.one.borrowConnectionTimeout =30
mysql.datasource.one.loginTimeout =30
mysql.datasource.one.maintenanceInterval =60
mysql.datasource.one.maxIdleTime =60
mysql.datasource.one.testQuery =select 1
# Data source 2
mysql.datasource.two.url =jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
mysql.datasource.two.username =root
mysql.datasource.two.password =123456
mysql.datasource.two.minPoolSize =3
mysql.datasource.two.maxPoolSize =25
mysql.datasource.two.maxLifetime =20000
mysql.datasource.two.borrowConnectionTimeout =30
mysql.datasource.two.loginTimeout =30
mysql.datasource.two.maintenanceInterval =60
mysql.datasource.two.maxIdleTime =60
mysql.datasource.two.testQuery =select 1
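
The keys above are grouped by prefix (`mysql.datasource.one` and `mysql.datasource.two`), and the suffix after the prefix is what later maps onto a field of the same name. The sketch below illustrates that grouping with plain `java.util.Properties`. It is not how Spring Boot binds properties internally; `PrefixedPropertiesSketch`, `withPrefix` and the shortened `SAMPLE` text are assumptions made for the illustration. It also shows that the `key =value` spacing used in the file parses cleanly.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class PrefixedPropertiesSketch {

    /** A shortened copy of the settings above, with the original spacing kept. */
    static final String SAMPLE = String.join("\n",
            "# Primary data source",
            "mysql.datasource.one.url = jdbc:mysql://127.0.0.1:3306/wordpress?useUnicode=true",
            "mysql.datasource.one.username =root",
            "mysql.datasource.one.maxPoolSize =25",
            "mysql.datasource.two.username =root");

    /** Returns the entries under the given prefix, keyed by the part after the prefix. */
    static Map<String, String> withPrefix(String text, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix + ".")) {
                // Strip "prefix." so that only the field-like suffix remains.
                result.put(key.substring(prefix.length() + 1), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(withPrefix(SAMPLE, "mysql.datasource.one"));
    }
}
```

Calling `withPrefix(SAMPLE, "mysql.datasource.one")` yields the three entries `url`, `username` and `maxPoolSize`, which correspond to fields of the properties class defined in the next step.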

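Configuration property classes

Two data sources are defined here.

Define the configuration class for the primary data source; it loads the parameters.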

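/**
 * @ClassName: OneDbProperties
 * @Description: Configuration properties for the primary data source.
 *               The class must be registered as a bean (for example with
 *               @Component or @EnableConfigurationProperties) to be injectable.
 * @author wenqy
 * @date 2017-07-26 21:56:09
 */
@ConfigurationProperties(prefix = "mysql.datasource.one")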
public class OneDbProperties {
    private String url;
    private String username;
    private String password;
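    /** min-pool-size: minimum number of connections in the pool */
    private int minPoolSize;
    /** max-pool-size: maximum number of connections in the pool */
    private int maxPoolSize;
    /** max-lifetime: maximum lifetime of a connection (seconds) */
    private int maxLifetime;
    /** borrow-connection-timeout: maximum time (seconds) to wait for a connection; one that becomes available within this time is returned */
    private int borrowConnectionTimeout;
    /** login-timeout: maximum time (seconds) to wait while connecting to the data source */
    private int loginTimeout;
    /** maintenance-interval: interval (seconds) at which the pool reclaims connections */
    private int maintenanceInterval;
    /** max-idle-time: maximum idle time (seconds); idle connections above the minimum pool size are closed */
    private int maxIdleTime;
    /** test-query: SQL used to test a connection */
    private String testQuery;
    // getters and setters omitted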
}

Define the configuration class for the other data source; it loads the parameters.

/**
 * @ClassName: TwoDbProperties
 * @Description: Configuration properties for the second data source
 * @author wenqy
 * @date 2017-07-26 21:56:09
 */
@ConfigurationProperties(prefix = "mysql.datasource.two")
public class TwoDbProperties {
    private String url;
    private String username;
    private String password;
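    /** min-pool-size: minimum number of connections in the pool */
    private int minPoolSize;
    /** max-pool-size: maximum number of connections in the pool */
    private int maxPoolSize;
    /** max-lifetime: maximum lifetime of a connection (seconds) */
    private int maxLifetime;
    /** borrow-connection-timeout: maximum time (seconds) to wait for a connection; one that becomes available within this time is returned */
    private int borrowConnectionTimeout;
    /** login-timeout: maximum time (seconds) to wait while connecting to the data source */
    private int loginTimeout;
    /** maintenance-interval: interval (seconds) at which the pool reclaims connections */
    private int maintenanceInterval;
    /** max-idle-time: maximum idle time (seconds); idle connections above the minimum pool size are closed */
    private int maxIdleTime;
    /** test-query: SQL used to test a connection */
    private String testQuery;
    // getters and setters omitted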
}

Load the primary data source and register it as an Atomikos data source bean.

@Configuration
@MapperScan(basePackages = "com.wenqy.mapper.one", sqlSessionTemplateRef = "oneSqlSessionTemplate")
public class OneDbConfig {
    /**
     * @Title: oneDataSource
     * @Description: Primary data source
     * @param oneDbProperties properties of the primary data source
     * @return DataSource the data source
     * @throws SQLException
     */
    @Primary
    @Bean(name = "oneDataSource")
    public DataSource oneDataSource(OneDbProperties oneDbProperties) throws SQLException {
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setUrl(oneDbProperties.getUrl());
        mysqlXADataSource.setUser(oneDbProperties.getUsername());
        mysqlXADataSource.setPassword(oneDbProperties.getPassword());
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXADataSource);
        xaDataSource.setUniqueResourceName("oneDataSource");
        xaDataSource.setMinPoolSize(oneDbProperties.getMinPoolSize());
        xaDataSource.setMaxPoolSize(oneDbProperties.getMaxPoolSize());
        xaDataSource.setMaxLifetime(oneDbProperties.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(oneDbProperties.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(oneDbProperties.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(oneDbProperties.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(oneDbProperties.getMaxIdleTime());
        xaDataSource.setTestQuery(oneDbProperties.getTestQuery());
        return xaDataSource;
    }
    @Bean(name = "oneSqlSessionFactory")
    public SqlSessionFactory oneSqlSessionFactory(@Qualifier("oneDataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/config/one/*.xml"));
        return bean.getObject();
    }
    @Bean(name = "oneSqlSessionTemplate")
    public SqlSessionTemplate oneSqlSessionTemplate(
            @Qualifier("oneSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

Load the other data source and register it as an Atomikos data source bean.

@Configuration
@MapperScan(basePackages = "com.wenqy.mapper.two", sqlSessionTemplateRef = "twoSqlSessionTemplate")
public class TwoDbConfig {
    @Bean(name = "twoDataSource")
    public DataSource twoDataSource(TwoDbProperties twoDbProperties) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(twoDbProperties.getUrl());
        mysqlXaDataSource.setPassword(twoDbProperties.getPassword());
        mysqlXaDataSource.setUser(twoDbProperties.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("twoDataSource");
        xaDataSource.setMinPoolSize(twoDbProperties.getMinPoolSize());
        xaDataSource.setMaxPoolSize(twoDbProperties.getMaxPoolSize());
        xaDataSource.setMaxLifetime(twoDbProperties.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(twoDbProperties.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(twoDbProperties.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(twoDbProperties.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(twoDbProperties.getMaxIdleTime());
        xaDataSource.setTestQuery(twoDbProperties.getTestQuery());
        return xaDataSource;
    }
    @Bean(name = "twoSqlSessionFactory")
    public SqlSessionFactory twoSqlSessionFactory(@Qualifier("twoDataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/config/two/*.xml"));
        return bean.getObject();
    }
    @Bean(name = "twoSqlSessionTemplate")
    public SqlSessionTemplate twoSqlSessionTemplate(
            @Qualifier("twoSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

Transaction configuration

Register Atomikos's own transaction manager.

/**
 * @ClassName: TransactionManagerConfig
 * @Description: Transaction management configuration
 * @author
 */
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfig {
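    /**
     * Custom transaction configuration.
     * MyBatis takes part in Spring transaction management automatically, with no extra configuration,
     * as long as the data source referenced by org.mybatis.spring.SqlSessionFactoryBean
     * is the same as the one referenced by DataSourceTransactionManager; otherwise transaction management has no effect.
     */
    @Bean(name = "userTransaction")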
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }
    @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
    public TransactionManager atomikosTransactionManager() throws Throwable {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }
    @Bean(name = "transactionManager")
    @DependsOn({ "userTransaction", "atomikosTransactionManager" })
    public PlatformTransactionManager transactionManager() throws Throwable {
        UserTransaction userTransaction = userTransaction();
        JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());
        return manager;
    }
}
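
Conceptually, a declarative transaction wraps the business method between a begin call and a commit or rollback call on the transaction object. The following is a rough sketch of that control flow under simplifying assumptions: `SimpleTransaction` is a hypothetical, reduced stand-in for a JTA user transaction, and `TransactionalCallSketch.inTransaction` is not Spring's implementation, only the shape of what the transaction manager does around a call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical, reduced stand-in for a JTA user transaction. */
interface SimpleTransaction {
    void begin();
    void commit();
    void rollback();
}

/** Records the calls it receives so the control flow can be inspected. */
class LoggingTransaction implements SimpleTransaction {
    final List<String> calls = new ArrayList<>();

    @Override
    public void begin() {
        calls.add("begin");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void rollback() {
        calls.add("rollback");
    }
}

class TransactionalCallSketch {
    /** Runs the work inside a transaction; rolls back on unchecked exceptions and errors. */
    static <T> T inTransaction(SimpleTransaction tx, Supplier<T> work) {
        tx.begin();
        try {
            T result = work.get();
            tx.commit();
            return result;
        } catch (RuntimeException | Error e) {
            tx.rollback();
            throw e; // the caller still sees the original failure
        }
    }

    public static void main(String[] args) {
        LoggingTransaction tx = new LoggingTransaction();
        String result = inTransaction(tx, () -> "saved");
        // prints "saved [begin, commit]"
        System.out.println(result + " " + tx.calls);
    }
}
```

With JTA, the single commit at the end is where the transaction manager coordinates all enlisted data sources, which is why both mappers in the service below succeed or fail together.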

Unit test

Write a test class to exercise the service.

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TestMutiDataSource {
    @Autowired
    private UserService userService;
    @Test
    public void testInsertData() throws Exception {
        userService.insertData();
    }
}

Service implementation class; the statements that throw the exception are commented out.

    @Transactional(value = "transactionManager")
    public void insertData() throws Exception {
        User user = new User();
        user.setId(new Random().nextInt());
        user.setName("wenqy");
        userMapper.saveUser( user );
//      if (true) {
//          throw new RuntimeException("insert failure");
//      }
        Person person = new Person();
        person.setId(new Random().nextInt());
        person.setEmail("wen.qy@qq.com");
        personMapper.savePerson( person );
//      if (true) {
//          throw new RuntimeException("insert failure");
//      }
    }

Result: data is inserted into both tables.

[Screenshot: multidatasource_atomikos_correct]

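Uncomment the first block so that a runtime exception is thrown, then run the test: neither table receives data, which is correct. Restore the first block and uncomment the second one, then run the test again: neither table receives data, which is also correct. Now replace the exception with a checked exception such as SQLException: both tables receive data and no rollback happens, which is wrong.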

    @Transactional(value = "transactionManager")
    public void insertData() throws Exception {
        User user = new User();
        user.setId(new Random().nextInt());
        user.setName("wenqy");
        userMapper.saveUser( user );
//      if (true) {
//          throw new RuntimeException("insert failure");
//      }
        Person person = new Person();
        person.setId(new Random().nextInt());
        person.setEmail("wen.qy@qq.com");
        personMapper.savePerson( person );
        if (true) {
            throw new SQLException("insert failure");
        }
    }
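
The missing rollback follows from Spring's default rule: unchecked exceptions and errors roll the transaction back, while checked exceptions let it commit unless they are listed in `rollbackFor`. The sketch below models that decision in plain Java. It is a simplified model, not Spring's actual implementation (which also supports `noRollbackFor` and picks the closest matching rule); `RollbackRuleSketch` and `shouldRollback` are hypothetical names.

```java
import java.sql.SQLException;

/** Simplified model of the rollback decision; not Spring's actual implementation. */
class RollbackRuleSketch {

    /**
     * Returns true when the transaction should roll back for the given exception.
     * The rollbackFor argument plays the role of @Transactional(rollbackFor = ...).
     */
    static boolean shouldRollback(Throwable ex, Class<?>... rollbackFor) {
        for (Class<?> type : rollbackFor) {
            if (type.isInstance(ex)) {
                return true; // explicitly listed exception type, or a subclass of it
            }
        }
        // Default rule: unchecked exceptions and errors roll back, checked exceptions commit.
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    public static void main(String[] args) {
        // prints "true": RuntimeException rolls back by default
        System.out.println(shouldRollback(new RuntimeException("insert failure")));
        // prints "false": SQLException is checked, so the transaction commits
        System.out.println(shouldRollback(new SQLException("insert failure")));
        // prints "true": SQLException is an Exception, which is listed in rollbackFor
        System.out.println(shouldRollback(new SQLException("insert failure"), Exception.class));
    }
}
```

The third call mirrors the fix used next: once `Exception.class` is listed, the checked SQLException matches and the transaction rolls back.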

Specify the exception type that should trigger a rollback; here it is Exception.

    @Transactional(value = "transactionManager", rollbackFor = Exception.class)
    public void insertDataRollbackForException() throws Exception {
        User user = new User();
        user.setId(new Random().nextInt());
        user.setName("wenqy");
        userMapper.saveUser( user );
//      if (true) {
//          throw new RuntimeException("insert failure");
//      }
        Person person = new Person();
        person.setId(new Random().nextInt());
        person.setEmail("wen.qy@qq.com");
        personMapper.savePerson( person );
        if (true) {
            throw new SQLException("insert failure");
        }
    }

Run the test again: neither table receives data, which shows that the transaction is rolled back correctly.

[Screenshot: multidatasource_atomikos_correct_err]

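Conclusion

With the default configuration, Spring rolls back a transaction only when an unchecked exception is thrown, that is, a RuntimeException or one of its subclasses (an Error also triggers a rollback). Like Spring's declarative transactions, Atomikos distributed transactions are transparent to business code and non-intrusive, and Atomikos integrates with Spring seamlessly. Spring's default transaction isolation level is the database's own default isolation level. The database must support transactions; with MySQL, for example, the tables cannot use the MyISAM engine.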

References:

Atomikos website: https://www.atomikos.com/