Contents

Atomikos 简介

1. 简介

AtomikosJava 应用程序的事务库。在本教程中,我们将了解为什么以及如何使用 Atomikos。

在此过程中,我们还将了解交易的基础知识以及我们需要它们的原因。

然后,我们将使用来自 Atomikos 的不同 API 创建一个简单的事务处理应用程序。

2. 了解基础

在我们讨论 Atomikos 之前,让我们了解一下交易到底是什么以及与它们相关的一些概念。简而言之,事务是一个逻辑工作单元,其效果在事务之外完全可见或根本不可见

让我们举一个例子来更好地理解这一点。典型的零售应用程序保留库存,然后下订单:

/uploads/java_atomikos/1.png

在这里,我们希望这两个操作要么一起发生,要么根本不发生。我们可以通过将这些操作包装到单个事务中来实现这一点。

2.1. 本地与分布式事务

一个事务可以涉及多个独立的操作。这些操作可以在相同资源或不同资源上执行。我们在这里将事务中的参与组件(如数据库)称为资源。

单个资源中的事务称为本地事务,而跨多个资源产生的事务称为分布式事务:

/uploads/java_atomikos/3.png

在这里,库存和订单可以是同一个数据库中的两个表,也可以是两个不同的数据库——可能完全运行在不同的机器上。

2.2. XA 规范和 Java 事务 API

XA 指的是eXtended Architecture ,它是分布式事务处理的规范。XA的目标是在涉及异构组件的全局事务中提供原子性

XA 规范通过称为两阶段提交的协议提供完整性。两阶段提交是一种广泛使用的分布式算法,用于促进决定提交或回滚分布式事务。

Java Transaction API (JTA) 是在 Java Community Process 下开发的 Java Enterprise Edition API。它使 Java 应用程序和应用程序服务器能够跨 XA 资源执行分布式事务

JTA 以 XA 架构为模型,利用两阶段提交。JTA 指定事务管理器和分布式事务中的其他方之间的标准 Java 接口。

3. Atomikos 简介

现在我们已经了解了交易的基础知识,我们已经准备好学习 Atomikos。在本节中,我们将了解 Atomikos 到底是什么以及它与 XA 和 JTA 等概念的关系。我们还将了解 Atomikos 的架构并了解其产品。

3.1. 什么是 Atomikos

正如我们所见,JTA 在 Java 中提供了用于构建具有分布式事务的应用程序的接口。现在,JTA 只是一个规范,不提供任何实现。为了让我们运行一个利用 JTA 的应用程序,我们需要一个 JTA 的实现。这样的实现称为事务管理器。

通常,应用程序服务器提供事务管理器的默认实现。例如,在 Enterprise Java Beans (EJB) 的情况下,EJB 容器管理事务行为而无需应用程序开发人员的任何显式干预。但是,在许多情况下,这可能并不理想,我们可能需要独立于应用程序服务器直接控制事务。

**Atomikos 是用于 Java 的轻量级事务管理器,**它使使用分布式事务的应用程序能够自包含。本质上,我们的应用程序不需要依赖像应用程序服务器这样的重量级组件来进行事务处理。这使分布式事务的概念更接近云原生架构。

3.2. Atomikos

Atomikos 主要构建为 JTA 事务管理器,因此,使用两阶段提交协议实现 XA 架构。让我们看一下 Atomikos 的高级架构:

/uploads/java_atomikos/5.png 在这里,Atomikos 正在促进跨数据库和消息队列的基于两阶段提交的事务。

3.3. Atomikos 产品供应

Atomikos 是一个分布式事务管理器,它提供了比 JTA/XA 要求的更多的功能。它有一个开源产品和更全面的商业产品:

  • TransactionsEssentials:Atomikos 的开源产品,为使用数据库和消息队列的 Java 应用程序提供 JTA/XA 事务管理器。这主要用于测试和评估目的。
  • ExtremeTransactions:**Atomikos 的商业产品,它提供跨复合应用程序的分布式事务,**包括除数据库和消息队列之外的 REST 服务。这对于构建执行极限事务处理 (XTP) 的应用程序很有用。

在本教程中,我们将使用 TransactionsEssentials 库来构建和演示 Atomikos 的功能。

4. 设置 Atomikos

正如我们之前看到的,Atomikos 的一大亮点是它是一个嵌入式事务服务。这意味着我们可以在与我们的应用程序相同的 JVM 中运行它。因此,设置 Atomikos 非常简单。

4.1. 依赖项

首先,我们需要设置依赖项。在这里,我们所要做的就是在我们的 Maven pom.xml文件中声明依赖项:

<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>5.0.6</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jms</artifactId>
    <version>5.0.6</version>
</dependency>

在这种情况下,我们对JDBCJMS 使用 Atomikos 依赖项,但在 Maven Central 上可以使用类似的依赖项来获取其他 XA 投诉资源。

4.2. 配置

Atomikos 提供了几个配置参数,每个参数都有合理的默认值。覆盖这些参数的最简单方法是在类路径中提供一个transactions.properties文件。我们可以为事务服务的初始化和运行添加几个参数。让我们看一个简单的配置来覆盖创建日志文件的目录:

com.atomikos.icatch.file=path_to_your_file

同样,我们可以使用其他参数来控制事务的超时、为我们的应用程序设置唯一名称或定义关闭行为。

4.3. 数据库

在我们的教程中,我们将构建一个简单的零售应用程序,就像我们之前描述的那样,它保留库存然后下订单。为简单起见,我们将使用关系数据库。此外,我们将使用多个数据库来演示分布式事务。但是,这可以很好地扩展到其他 XA 投诉资源,例如消息队列和主题

我们的库存数据库将有一个简单的表格来托管产品库存:

CREATE TABLE INVENTORY (
    productId VARCHAR PRIMARY KEY,
    balance INT
);

而且,我们的订单数据库将有一个简单的表格来托管已下订单:

CREATE TABLE ORDERS (
    orderId VARCHAR PRIMARY KEY,
    productId VARCHAR,
    amount INT NOT NULL CHECK (amount <= 5)
);

这是一个非常基本的数据库模式,仅用于演示。但是,重要的是要注意,我们的模式约束不允许产品数量超过五个的订单。

5. 与 Atomikos 合作

现在,我们已准备好使用 Atomikos 库之一来构建我们的分布式事务应用程序。在接下来的小节中,我们将使用内置的 Atomikos 资源适配器来连接我们的后端数据库系统。这是开始使用 Atomikos 的最快和最简单的方法

5.1. 实例化UserTransaction

我们将利用 JTA UserTransaction来划分事务边界。与交易服务相关的所有其他步骤将被自动处理。这包括使用事务服务征募和除名资源。 首先,我们需要从 Atomikos实例化一个UserTransaction

UserTransactionImp utx = new UserTransactionImp();

5.2. 实例化DataSource

然后,我们需要从 Atomikos实例化一个DataSourceAtomikos提供了两个版本的DataSource。*

第一个AtomikosDataSourceBean知道底层XADataSource

AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();

虽然AtomikosNonXADataSourceBean 使用任何常规 JDBC 驱动程序类:

AtomikosNonXADataSourceBean dataSource = new AtomikosNonXADataSourceBean();

顾名思义,AtomikosNonXADataSource不符合 XA。因此,使用这种数据源执行的事务不能保证是原子的。那么我们为什么要使用它呢?我们可能有一些不支持 XA 规范的数据库。Atomikos 不禁止我们使用这样的数据源,并且如果事务中有单个这样的数据源,仍然会尝试提供原子性。这种技术类似于 Last Resource Gambit,它是两阶段提交过程的一种变体。

此外,我们需要根据数据库和驱动程序适当地配置DataSource

5.3. 执行数据库操作

配置完成后,在我们的应用程序的事务上下文中使用DataSource相当容易:

public void placeOrder(String productId, int amount) throws Exception {
    String orderId = UUID.randomUUID().toString();
    boolean rollback = false;
    try {
        utx.begin();
        Connection inventoryConnection = inventoryDataSource.getConnection();
        Connection orderConnection = orderDataSource.getConnection();
        
        Statement s1 = inventoryConnection.createStatement();
        String q1 = "update Inventory set balance = balance - " + amount + " where productId ='" +
          productId + "'";
        s1.executeUpdate(q1);
        s1.close();
        
        Statement s2 = orderConnection.createStatement();
        String q2 = "insert into Orders values ( '" + orderId + "', '" + productId + "', " + amount + " )";
        s2.executeUpdate(q2);
        s2.close();
        
        inventoryConnection.close();
        orderConnection.close();
    } catch (Exception e) {
        rollback = true;
    } finally {
        if (!rollback)
            utx.commit();
        else
            utx.rollback();
    }
}

在这里,我们正在为事务边界内的库存和订单更新数据库表。这会自动提供这些操作以原子方式发生的好处。

5.4. 测试交易行为

最后,我们必须能够使用简单的单元测试来测试我们的应用程序,以验证事务行为是否符合预期:

@Test
public void testPlaceOrderSuccess() throws Exception {
    int amount = 1;
    long initialBalance = getBalance(inventoryDataSource, productId);
    Application application = new Application(inventoryDataSource, orderDataSource);
    
    application.placeOrder(productId, amount);
    
    long finalBalance = getBalance(inventoryDataSource, productId);
    assertEquals(initialBalance - amount, finalBalance);
}
  
@Test
public void testPlaceOrderFailure() throws Exception {
    int amount = 10;
    long initialBalance = getBalance(inventoryDataSource, productId);
    Application application = new Application(inventoryDataSource, orderDataSource);
    
    application.placeOrder(productId, amount);
    
    long finalBalance = getBalance(inventoryDataSource, productId);
    assertEquals(initialBalance, finalBalance);
}

在这里,我们期望一个有效的订单减少库存,而我们期望一个无效的订单保持库存不变。请注意,根据我们的数据库限制,任何数量超过五件产品的订单都被视为无效订单。

5.5. 高级 Atomikos 用法

上面的示例是使用 Atomikos 的最简单方法,并且可能足以满足大多数要求。但是,我们可以通过其他方式使用 Atomikos 来构建我们的应用程序。虽然其中一些选项使 Atomikos 易于使用,但其他选项提供了更大的灵活性。选择取决于我们的要求。

当然,不必总是为 JDBC/JMS 使用 Atomikos 适配器在直接使用XAResource时,我们可以选择使用 Atomikos 事务管理器。但是,在这种情况下,我们必须明确地使用事务服务来登记和取消XAResource实例。

Atomikos 还可以通过专有接口UserTransactionService使用更高级的功能。使用此接口,我们可以显式注册资源以进行恢复。这使我们能够细粒度地控制应该恢复哪些资源、应该如何恢复它们以及应该何时进行恢复。

6. 集成 Atomikos

虽然 Atomikos 为分布式事务提供了出色的支持,但使用这种低级 API 并不总是很方便。为了专注于业务领域并避免样板代码的混乱,我们经常需要不同框架和库的支持。Atomikos 支持大多数与后端集成相关的流行 Java 框架。我们将在这里探讨其中的几个。

6.1. 使用 Spring 和DataSource的 Atomikos

Spring 是 Java 中流行的框架之一,它提供了一个控制反转 (IoC) 容器。值得注意的是,它对交易也有极好的支持。它使用面向方面的编程 (AOP) 技术提供声明式事务管理。

Spring 支持多种事务 API,包括用于分布式事务的 JTA。我们可以毫不费力地将Atomikos 用作 Spring 中的 JTA 事务管理器。最重要的是,感谢 Spring,我们的应用程序对 Atomikos 几乎是不可知的。

让我们看看如何解决我们之前的问题,这次利用 Spring。我们将从重写Application类开始:

public class Application {
    private DataSource inventoryDataSource;
    private DataSource orderDataSource;
    
    public Application(DataSource inventoryDataSource, DataSource orderDataSource) {
        this.inventoryDataSource = inventoryDataSource;
        this.orderDataSource = orderDataSource;
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void placeOrder(String productId, int amount) throws Exception {
        String orderId = UUID.randomUUID().toString();
        Connection inventoryConnection = inventoryDataSource.getConnection();
        Connection orderConnection = orderDataSource.getConnection();
        
        Statement s1 = inventoryConnection.createStatement();
        String q1 = "update Inventory set balance = balance - " + amount + " where productId ='" + 
          productId + "'";
        s1.executeUpdate(q1);
        s1.close();
        
        Statement s2 = orderConnection.createStatement();
        String q2 = "insert into Orders values ( '" + orderId + "', '" + productId + "', " + amount + " )";
        s2.executeUpdate(q2);
        s2.close();
        
        inventoryConnection.close();
        orderConnection.close();
    }
}

正如我们在此处看到的,大多数与事务相关的样板代码已被方法级别的单个注释所取代。此外,Spring 负责实例化和注入我们的应用程序所依赖的DataSource

当然,我们还要给Spring提供相关的配置。我们可以使用一个简单的 Java 类来配置这些元素:

@Configuration
@EnableTransactionManagement
public class Config {
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean inventoryDataSource() {
        AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
        // Configure database holding order data
        return dataSource;
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean orderDataSource() {
        AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
        // Configure database holding order data
        return dataSource;
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionManager userTransactionManager() throws SystemException {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setTransactionTimeout(300);
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }
    
    @Bean
    public JtaTransactionManager jtaTransactionManager() throws SystemException {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setTransactionManager(userTransactionManager());
        jtaTransactionManager.setUserTransaction(userTransactionManager());
        return jtaTransactionManager;
    }
    
    @Bean
    public Application application() {
        return new Application(inventoryDataSource(), orderDataSource());
    }
}

在这里,我们为保存库存和订单数据的两个不同数据库配置AtomikosDataSourceBean。此外,我们还为 JTA 事务管理器提供了必要的配置。

现在,我们可以像以前一样测试我们的应用程序的事务行为。同样,我们应该验证一个有效的订单减少了我们的库存余额,而一个无效的订单保持不变。

6.2. 使用 Spring、JPA 和 Hibernate 的 Atomikos

虽然 Spring 在一定程度上帮助我们减少了样板代码,但它仍然非常冗长。一些工具可以使在 Java 中使用关系数据库变得更加容易。Java Persistence API (JPA) 是描述 Java 应用程序中关系数据管理的规范。这在很大程度上简化了数据访问和操作代码。

Hibernate 是 JPA 规范最流行的实现之一。Atomikos 对包括 Hibernate 在内**的多种 JPA 实现有很好的支持。**和以前一样,由于 Spring 和 JPA,我们的应用程序对 Atomikos 和 Hibernate 仍然是不可知的!

让我们看看Spring、JPA 和 Hibernate 如何使我们的应用程序更加简洁,同时通过 Atomikos 提供分布式事务的好处。和以前一样,我们将从重写Application类开始:

public class Application {
    @Autowired
    private InventoryRepository inventoryRepository;
    @Autowired
    private OrderRepository orderRepository;
    
    @Transactional(rollbackFor = Exception.class)
    public void placeOrder(String productId, int amount) throws SQLException { 
        String orderId = UUID.randomUUID().toString();
        Inventory inventory = inventoryRepository.findOne(productId);
        inventory.setBalance(inventory.getBalance() - amount);
        inventoryRepository.save(inventory);
        Order order = new Order();
        order.setOrderId(orderId);
        order.setProductId(productId);
        order.setAmount(new Long(amount));
        orderRepository.save(order);
    }
}

正如我们所见,我们现在不处理任何低级数据库 API。然而,要让这个魔法发挥作用,我们确实需要配置 Spring Data JPA 类和配置。我们将从定义域实体开始:

@Entity
@Table(name = "INVENTORY")
public class Inventory {
    @Id
    private String productId;
    private Long balance;
    // Getters and Setters
}
@Entity
@Table(name = "ORDERS")
public class Order {
    @Id
    private String orderId;
    private String productId;
    @Max(5)
    private Long amount;
    // Getters and Setters
}

接下来,我们需要为这些实体提供存储库:

@Repository
public interface InventoryRepository extends JpaRepository<Inventory, String> {
}
  
@Repository
public interface OrderRepository extends JpaRepository<Order, String> {
}

这些是非常简单的接口,Spring Data 负责使用实际代码对这些接口进行详细说明,以处理数据库实体。 最后,我们需要为库存和订单数据库以及事务管理器的数据源提供相关配置:

@Configuration
@EnableJpaRepositories(basePackages = "com.blogdemo.atomikos.spring.jpa.inventory",
  entityManagerFactoryRef = "inventoryEntityManager", transactionManagerRef = "transactionManager")
public class InventoryConfig {
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean inventoryDataSource() {
        AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
        // Configure the data source
        return dataSource;
    }
    
    @Bean
    public EntityManagerFactory inventoryEntityManager() {
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setJpaVendorAdapter(vendorAdapter);
        // Configure the entity manager factory
        return factory.getObject();
    }
}
@Configuration
@EnableJpaRepositories(basePackages = "com.blogdemo.atomikos.spring.jpa.order", 
  entityManagerFactoryRef = "orderEntityManager", transactionManagerRef = "transactionManager")
public class OrderConfig {
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean orderDataSource() {
        AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
        // Configure the data source
        return dataSource;
    }
    
    @Bean
    public EntityManagerFactory orderEntityManager() {
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setJpaVendorAdapter(vendorAdapter);
        // Configure the entity manager factory
        return factory.getObject();
    }
}
@Configuration
@EnableTransactionManagement
public class Config {
    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionManager userTransactionManager() throws SystemException {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setTransactionTimeout(300);
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }
    
    @Bean
    public JtaTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setTransactionManager(userTransactionManager());
        jtaTransactionManager.setUserTransaction(userTransactionManager());
        return jtaTransactionManager;
    }
    
    @Bean
    public Application application() {
        return new Application();
    }
}

这仍然是我们必须做的很多配置。这部分是因为我们正在为两个独立的数据库配置 Spring JPA。此外,我们可以通过Spring Boot 进一步减少这些配置,但这超出了本教程的范围。

和以前一样,我们可以测试我们的应用程序的相同事务行为。这次没有什么新鲜事,除了我们现在使用 Spring Data JPA 和 Hibernate。

7. 超越 JTA 的原子

虽然 JTA 为分布式系统提供了出色的事务支持,但这些系统必须像大多数关系数据库或消息队列一样符合 XA 标准。但是,如果其中一个系统不支持两阶段提交协议的 XA 规范,那么 JTA 就没有用了。有几个资源属于这一类,尤其是在微服务架构中。

几种替代协议支持分布式事务。其中之一是利用补偿的两阶段提交协议的变体。此类事务具有宽松的隔离保证,称为基于补偿的事务。参与者在第一阶段本身提交事务的各个部分,为第二阶段可能的回滚提供补偿处理程序。

有几种设计模式和算法可以实现基于补偿的事务。例如,Sagas 就是这样一种流行的设计模式。但是,它们通常实施起来很复杂并且容易出错。

Atomikos 提供了一种基于补偿的交易,称为 Try-Confirm/Cancel (TCC)。TCC 为事务下的实体提供更好的业务语义。但是,这只有在参与者提供高级架构支持的情况下才有可能,而且 TCC 仅在 Atomikos 商业产品 ExtremeTransactions 下可用。

8. Atomikos 的替代品

我们已经经历了足够多的 Atomikos 来欣赏它所提供的东西。此外,Atomikos 还提供具有更强大功能的商业产品。但是,在选择 JTA 事务管理器时,Atomikos 并不是唯一的选择。还有其他一些可靠的选项可供选择。让我们看看他们对 Atomikos 的表现如何。

8.1. Narayana

Narayana 可能是最古老的开源分布式事务管理器之一,目前由 Red Hat 管理。它已在整个行业中广泛使用,并通过社区支持发展并影响了多个规范和标准。

Narayana 支持广泛的事务协议,例如 JTA、JTS、Web 服务和 REST,仅举几例。此外,Narayana 可以嵌入到各种容器中。

与 Atomikos 相比,Narayana 提供了分布式事务管理器的几乎所有功能。在许多情况下,Narayana 在应用程序中的集成和使用更加灵活。例如,Narayana 具有针对 C/C++ 和 Java 的语言绑定。然而,这是以增加复杂性为代价的,而且 Atomikos 相对更容易配置和使用。

8.2. Bitronix

Bitronix一个功能齐全的 XA 事务管理器,可提供 JTA API 所需的所有服务。重要的是,Bitronix 是一个可嵌入的事务库,它提供了广泛而有用的错误报告和日志记录。对于分布式事务,这使得调查故障变得更加容易。此外,它对 Spring 的事务功能具有出色的支持,并且可以以最少的配置工作。

与 Atomikos 相比,Bitronix 是一个开源项目,没有提供产品支持的商业产品。作为 Atomikos 商业产品的一部分,但 Bitronix 缺乏的关键特性包括对微服务的支持和声明式弹性扩展能力。