Contents

Apache RocketMQ 和 Spring Boot

1.简介

在本教程中,我们将使用 Spring Boot 和开源分布式消息传递和流数据平台 Apache RocketMQ 创建消息生产者和消费者。

2. 依赖

对于 Maven 项目,我们需要添加RocketMQ Spring Boot Starter 依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

3. 产生消息

对于我们的示例,我们将创建一个基本的消息生产者,每当用户在购物车中添加或删除商品时,它都会发送事件。

首先,让我们在application.properties中设置我们的服务器位置和组名:

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=cart-producer-group

请注意,如果我们有多个名称服务器,我们可以将它们列出为 host:port;host:port

现在,为了简单起见,我们将创建一个CommandLineRunner应用程序并在应用程序启动期间生成一些事件:

@SpringBootApplication
public class CartEventProducer implements CommandLineRunner {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public static void main(String[] args) {
        SpringApplication.run(CartEventProducer.class, args);
    }
    public void run(String... args) throws Exception {
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1));
        rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2));
        rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1));
    }
}

CartItemEvent仅包含两个属性——商品的 id 和 数量:

class CartItemEvent {
    private String itemId;
    private int quantity;
    // constructor, getters and setters
}

在上面的例子中,我们使用了convertAndSend()方法,一个由AbstractMessageSendingTemplate抽象类定义的通用方法,来发送我们的购物车事件。它有两个参数:一个目的地,在我们的例子中是一个主题名称,以及一个消息负载。

4.消息消费者

使用 RocketMQ 消息就像创建一个带有*@RocketMQMessageListener注解的 Spring 组件并实现RocketMQListener*接口一样简单:

@SpringBootApplication
public class CartEventConsumer {
    public static void main(String[] args) {
        SpringApplication.run(CartEventConsumer.class, args);
    }
    @Service
    @RocketMQMessageListener(
      topic = "cart-item-add-topic",
      consumerGroup = "cart-consumer_cart-item-add-topic"
    )
    public class CardItemAddConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent addItemEvent) {
            log.info("Adding item: {}", addItemEvent);
            // additional logic
        }
    }
    @Service
    @RocketMQMessageListener(
      topic = "cart-item-removed-topic",
      consumerGroup = "cart-consumer_cart-item-removed-topic"
    )
    public class CardItemRemoveConsumer implements RocketMQListener<CartItemEvent> {
        public void onMessage(CartItemEvent removeItemEvent) {
            log.info("Removing item: {}", removeItemEvent);
            // additional logic
        }
    }
}

我们需要为我们正在监听的每个消息主题创建一个单独的组件。在每一个监听器中,我们通过@RocketMQMessageListener 注解定义主题名称和consumerGroup名称。

5. 同步和异步传输

在前面的示例中,我们使用了convertAndSend方法来发送我们的消息。不过,我们还有其他一些选择。

例如,我们可以调用与 convertAndSend不同的syncSend ,因为它返回SendResult对象。

例如,它可以用来验证我们的消息是否成功发送或获取它的 id:

public void run(String... args) throws Exception { 
    SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("bike", 1)); 
    SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", 
      new CartItemEvent("computer", 2)); 
    SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", 
      new CartItemEvent("bike", 1)); 
}

与*convertAndSend 一样,*此方法仅在发送过程完成时返回。

在需要高可靠性的情况下,比如重要的通知消息或短信通知,我们应该使用同步传输。

另一方面,我们可能希望异步发送消息并在发送完成时收到通知。

我们可以使用 asyncSend来做到这一点,它接受一个 SendCallback作为参数并立即返回:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.error("Successfully sent cart item");
    }
    @Override
    public void onException(Throwable throwable) {
        log.error("Exception during cart item sending", throwable);
    }
});

我们在需要高吞吐量的情况下使用异步传输。

最后,对于吞吐量要求非常高的场景,我们可以使用sendOneWay代替asyncSend。 sendOneWay 与asyncSend的不同之处在于它不保证消息会被发送。

单向传输也可以用于普通的可靠性情况,例如收集日志。

6. 在事务中发送消息

RocketMQ 为我们提供了在事务中发送消息的能力。我们可以使用*sendInTransaction()*方法来做到这一点:

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build();
rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

另外,我们必须实现一个RocketMQLocalTransactionListener接口:

@RocketMQTransactionListener(txProducerGroup="test-transaction")
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.UNKNOWN;
      }
      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN
          return RocketMQLocalTransactionState.COMMIT;
      }
}

sendMessageInTransaction()中,第一个参数是事务名称。它必须与@RocketMQTransactionListener的成员字段txProducerGroup 相同。

7. 消息生产者配置

我们还可以配置消息生产者本身的各个方面:

  • Rocketmq.producer.send-message-timeout:消息发送超时时间,以毫秒为单位——默认值为 3000
  • RocketMQ.producer.compress-message-body-threshold:超过该阈值,RocketMQ 将压缩消息 - 默认值为 1024。
  • Rocketmq.producer.max-message-size:最大消息大小(以字节为单位)——默认值为 4096。
  • Rocketmq.producer.retry-times-when-send-async-failed:发送失败前在异步模式下内部执行的最大重试次数——默认值为 2。
  • Rocketmq.producer.retry-next-server:指示是否在内部发送失败时重试另一个代理 - 默认值为false
  • Rocketmq.producer.retry-times-when-send-failed:发送失败前在异步模式下内部执行的最大重试次数——默认值为 2。