SpringBoot+JPA+Atomikos实现分布式事务(前提是在SpringBoot项目中利用SpringDataJPA实现对多数据源操作时,为了避免事务报错)-爱代码爱编程
1.新建一个SpringBoot项目,向pom.xml中添加相关依赖:
<!--配置数据库JPA依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--通过maven坐标引入JTA atomikos-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<!--配置dozer依赖-->
<dependency>
<groupId>com.github.dozermapper</groupId>
<artifactId>dozer-spring-boot-starter</artifactId>
<version>6.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
其中项目中用到的比较重要的是:JTA Atomikos依赖和jpa依赖。
2.在全局配置文件(application)中对数据库进行配置:
server:
port: 8888
spring:
datasource:
primary:
url: jdbc:mysql://127.0.0.1:3306/testdb?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
secondary:
url: jdbc:mysql://127.0.0.1:3306/testdb2?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
jpa: #添加SpringDataJPA配置
database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
hibernate:
ddl-auto: update
database: mysql
show-sql: true
jta: #添加分布式JTA配置
atomikos:
datasource:
max-pool-size: 20
borrow-connection-timeout: 60
connectionfactory:
max-pool-size: 20
borrow-connection-timeout: 60
2.1。几点说明:(1)port为端口号;(2)datasource:为所需要操作的两个数据源,分别是primary和secondary;(3)jpa就是添加所使用到的SpringDataJPA配置,其中比较重要的就是ddl-auto它共有四个参数分别是,《1》 create:表示每次加载hibernate的时候,都会删除上一次生成的表,然后再根据你的modle类生成一个表 《2》create-drop:每次加载hibernate的时候,根据你的modle类生成一个表,当sessionFactory关闭的时候,表就自动删除 《3》update:hibernate根据你的modle类创建表结构,如果该表没有,则会创建,如果存在就进行根新表结构,并不会删除表里面以前存在的数据 《4》validate:每次加载hibernate的时候,都会验证表结构是否和modle类的定义一致,如不不一致,就会抛出异常,不会做出修改数据库的动作;我们为了安全,一般就默认使用validate。其他的都是固定搭配;(4)分布式事务jta atomikos中的一些配置:初始化数据库链接池,包括最大的数据库链接数和最大的超时时间。
3.jpa操作多数据源,并实现分布式事务原理,就是给定一个总的管理类,来进行一个统一的管理,这是我的理解。
在理解并写代码时,可以对着上图。
4,在写业务类时,一般及时模板了。
4.1写第一层Dao层,也就是持久层,专注于对数据的持久化操作,例如将数据存入数据库,磁盘等。
(1)Dao层的一个结构:
(2)对应的代码:
《1》Article
package com.example.jpamultipledatasource.Dao.testdb;
import lombok.Builder;
import lombok.Data;
import javax.persistence.*;
import java.util.Date;
/**
* @author ZEShart
* @create 2021-03-03-12:40
*/
@Data
@Builder
@Entity //表示当前类是一个实体类,并表示接受SpringDataJPA的控制管理
//如果不特定指哪个表,就默认 类名对应的表
//反之,用注解@Table指定对应特定的表
@Table(name = "article")
public class Article {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id; //id是唯一的主键并且是自增的类型是IDENTITY,所以加如上两个注解
@Column(nullable = false,length=32)
private String author;
@Column(nullable = false,unique = true,length=32)
private String title;
@Column(length=512)
private String content;
private Date createTime;
}
《2》对应的 ArticleRepository
package com.example.jpamultipledatasource.Dao.testdb;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* @author ZEShart
* @create 2021-03-26-17:05
*/
public interface ArticleRepository extends JpaRepository<Article,Long> {
//<Article,Long> Article表示要操作的数据库表对应实体PO,Long 是id属性实体主键的类型
//注意这个方法的名称,JPA会根据方法名自动生成SQL语句
Article findByAuthor(String author); //可以所很智能了
}
《3》Message
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
/**
* @author ZEShart
* @create 2021-03-27-20:01
*/
@Data
@Entity
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Message {
@Id
@GeneratedValue
private int id;
@Column(nullable = false)
private String name;
@Column(nullable = false)
private String content;
}
《4》对应的 MessageRepository
package com.example.jpamultipledatasource.Dao.testdb2;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* @author ZEShart
* @create 2021-03-27-20:19
*/
public interface MessageRepository extends JpaRepository<Message,Long> {
}
4.2写第二层service层,这样是为了避免,controller层直接操作dao层,不安全。
(1) service层的一个结构:
(2)对应的代码:
《1》ArticleService
package com.example.jpamultipledatasource.service;
import com.example.jpamultipledatasource.modle.ArticleVO;
import java.util.List;
/**
* @author ZEShart
* @create 2021-03-26-23:38
*/
public interface ArticleService {
void saveArticle(ArticleVO articleVO);
void deleteArticle(Long id);
void updateArticle(ArticleVO articleVO);
ArticleVO getArticle(Long id);
List<ArticleVO> getAll();
}
《2》ArticleJPAServiceImpl
package com.example.jpamultipledatasource.service;
import com.example.jpamultipledatasource.Dao.testdb.Article;
import com.example.jpamultipledatasource.Dao.testdb.ArticleRepository;
import com.example.jpamultipledatasource.Dao.testdb2.Message;
import com.example.jpamultipledatasource.Dao.testdb2.MessageRepository;
import com.example.jpamultipledatasource.modle.ArticleVO;
import com.example.jpamultipledatasource.utils.DozerUtils;
import org.dozer.Mapper;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
/**
* @author ZEShart
* @create 2021-03-28-10:48
*/
public class ArticleJPAServiceImpl implements ArticleService{
//将JPA仓库对象注入
@Resource
private ArticleRepository articleRepository;
//将JPA仓库对象注入
@Resource
private MessageRepository messageRepository;
@Resource
private Mapper dozerMapper;
@Transactional
public void saveArticle(ArticleVO articleVO) {
//将articleVO转换为articlePO,使用dozer或者springUtils
Article articlePO=dozerMapper.map(articleVO,Article.class);
articleRepository.save(articlePO);
messageRepository.save(new Message(1,"zimug","爱学习"));
}
@Override
public void deleteArticle(Long id) {
articleRepository.deleteById(id);
}
@Override
public void updateArticle(ArticleVO articleVO) {
//将articleVO转换为articlePO,使用dozer或者springUtils
Article articlePO=dozerMapper.map(articleVO,Article.class);
articleRepository.save(articlePO);//新增和保存使用的是同一个函数方法save()
//articleJDBCDAO.updateById(article,secondaryJdbcTemplate);
}
@Override
public ArticleVO getArticle(Long id) {
Optional<Article> article=articleRepository.findById(id);
return dozerMapper.map(article.get(),ArticleVO.class);
}
@Override
public List<ArticleVO> getAll() {
List<Article> articleLis=articleRepository.findAll();
return DozerUtils.mapList(articleLis,ArticleVO.class);
}
}
5.这里就不写controller层了,写的是配置类config层,这一层一般都是固定搭配,唯一要改的就是你的包路径。
(1)对应的结构
(2)对应的代码:
《1》JPAPrimaryConfig
package com.example.jpamultipledatasource.config;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.HashMap;
/**
* @author ZEShart
* @create 2021-03-27-20:23
*/
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = "com.example.jpamultipledatasource.Dao.testdb", //注意这里
entityManagerFactoryRef = "primaryEntityManager",
transactionManagerRef = "transactionManager")
public class JPAPrimaryConfig {
@Autowired
private JpaVendorAdapter jpaVendorAdapter;
//primary
@Primary
@Bean(name = "primaryDataSourceProperties")
@ConfigurationProperties(prefix = "spring.datasource.primary") //注意这里
public DataSourceProperties primaryDataSourceProperties() {
return new DataSourceProperties();
}
@Primary
@Bean(name = "primaryDataSource", initMethod = "init", destroyMethod = "close")
@ConfigurationProperties(prefix = "spring.datasource.primary")
public DataSource primaryDataSource() throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(primaryDataSourceProperties().getUrl());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(primaryDataSourceProperties().getPassword());
mysqlXaDataSource.setUser(primaryDataSourceProperties().getUsername());
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("primary");
xaDataSource.setBorrowConnectionTimeout(60);
xaDataSource.setMaxPoolSize(20);
return xaDataSource;
}
@Primary
@Bean(name = "primaryEntityManager")
@DependsOn("transactionManager")
public LocalContainerEntityManagerFactoryBean primaryEntityManager() throws Throwable {
HashMap<String, Object> properties = new HashMap<String, Object>();
properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
properties.put("javax.persistence.transactionType", "JTA");
LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
entityManager.setJtaDataSource(primaryDataSource());
entityManager.setJpaVendorAdapter(jpaVendorAdapter);
//这里要修改成主数据源的扫描包
entityManager.setPackagesToScan("com.example.jpamultipledatasource.Dao.testdb");
entityManager.setPersistenceUnitName("primaryPersistenceUnit");
entityManager.setJpaPropertyMap(properties);
return entityManager;
}
}
《2》JPASecondaryConfig
package com.example.jpamultipledatasource.config;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.HashMap;
/**
* @author ZEShart
* @create 2021-03-27-20:34
*/
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = "com.example.jpamultipledatasource.Dao.testdb2", //注意这里
entityManagerFactoryRef = "secondaryEntityManager",
transactionManagerRef = "transactionManager")
public class JPASecondaryConfig {
@Autowired
private JpaVendorAdapter jpaVendorAdapter;
@Bean(name = "secondaryDataSourceProperties")
@ConfigurationProperties(prefix = "spring.datasource.secondary") //注意这里
public DataSourceProperties masterDataSourceProperties() {
return new DataSourceProperties();
}
@Bean(name = "secondaryDataSource", initMethod = "init", destroyMethod = "close")
@ConfigurationProperties(prefix = "spring.datasource.secondary")
public DataSource masterDataSource() throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
mysqlXaDataSource.setUrl(masterDataSourceProperties().getUrl());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(masterDataSourceProperties().getPassword());
mysqlXaDataSource.setUser(masterDataSourceProperties().getUsername());
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("secondary");
xaDataSource.setBorrowConnectionTimeout(60);
xaDataSource.setMaxPoolSize(20);
return xaDataSource;
}
@Bean(name = "secondaryEntityManager")
@DependsOn("transactionManager")
public LocalContainerEntityManagerFactoryBean masterEntityManager() throws Throwable {
HashMap<String, Object> properties = new HashMap<String, Object>();
properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
properties.put("javax.persistence.transactionType", "JTA");
LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
entityManager.setJtaDataSource(masterDataSource());
entityManager.setJpaVendorAdapter(jpaVendorAdapter);
//这里要修改成主数据源的扫描包
entityManager.setPackagesToScan("com.example.jpamultipledatasource.Dao.testdb2");
entityManager.setPersistenceUnitName("secondaryPersistenceUnit");
entityManager.setJpaPropertyMap(properties);
return entityManager;
}
}
《3》JPAAtomikosTransactionConfig
package com.example.jpamultipledatasource.config;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
/**
* @author ZEShart
* @create 2021-03-28-22:02
*/
@Configuration
@ComponentScan
@EnableTransactionManagement
public class JPAAtomikosTransactionConfig {
@Bean
public PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
return new PropertySourcesPlaceholderConfigurer();
}
//设置JPA特性
@Bean
public JpaVendorAdapter jpaVendorAdapter() {
HibernateJpaVendorAdapter hibernateJpaVendorAdapter = new HibernateJpaVendorAdapter();
//显示sql
hibernateJpaVendorAdapter.setShowSql(true);
//自动生成/更新表
hibernateJpaVendorAdapter.setGenerateDdl(true);
//设置数据库类型
hibernateJpaVendorAdapter.setDatabase(Database.MYSQL);
return hibernateJpaVendorAdapter;
}
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
AtomikosJtaPlatform.transactionManager = userTransactionManager;
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({"userTransaction", "atomikosTransactionManager"})
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransaction userTransaction = userTransaction();
AtomikosJtaPlatform.transaction = userTransaction;
TransactionManager atomikosTransactionManager = atomikosTransactionManager();
return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
}
《4》AtomikosJtaPlatform
package com.example.jpamultipledatasource.config;
import org.hibernate.engine.transaction.jta.platform.internal.AbstractJtaPlatform;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
/**
* @author ZEShart
* @create 2021-03-28-11:00
*/
public class AtomikosJtaPlatform extends AbstractJtaPlatform{
private static final long serialVersionUID = 1L;
static TransactionManager transactionManager;
static UserTransaction transaction;
@Override
protected TransactionManager locateTransactionManager() {
return transactionManager;
}
@Override
protected UserTransaction locateUserTransaction() {
return transaction;
}
}
6.写的是一个工具包
(1)结构如下:
(2)DozerUtils
package com.example.jpamultipledatasource.utils;
import org.assertj.core.util.Lists;
import org.dozer.DozerBeanMapperBuilder;
import org.dozer.Mapper;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
/**
* @author ZEShart
* @create 2021-03-26-17:26
*/
public class DozerUtils {
static Mapper mapper= DozerBeanMapperBuilder.buildDefault();
public static<T> List<T> mapList(Collection sourceList,Class<T> destinationClass){
List destinationList=Lists.newArrayList();
for (Iterator i$ = sourceList.iterator(); i$.hasNext();){
Object sourceObject=i$.next();
Object destinationObject=mapper.map(sourceObject,destinationClass);
destinationList.add(destinationObject);
}
return destinationList;
}
}
7.测试可以发现,要失败时,两个对数据库的操作都失败,要成功是都成功,即jpa+jta+atomokis分布式事务测试成功。