Contents

使用 AWS Java 工具包管理 SQS 队列

1. 概述

在本教程中,我们将探索如何通过 Java SDK使用 Amazon 的SQS (简单队列服务) 。

2.先决条件

使用适用于 SQS 的 Amazon AWS 开发工具包 所需的 Maven 依赖项、AWS 账户设置和客户端连接与本文中的相同。

假设我们已经创建了一个 AWSCredentials实例,如上一篇文章中所述,我们可以继续创建我们的 SQS 客户端:

AmazonSQS sqs = AmazonSQSClientBuilder.standard()
  .withCredentials(new AWSStaticCredentialsProvider(credentials))
  .withRegion(Regions.US_EAST_1)
  .build();

3. 创建队列

一旦我们设置了 SQS 客户端,创建队列就相当简单了。

3.1. 创建标准队列

让我们看看如何创建标准队列。为此,我们需要创建一个 CreateQueueRequest 实例:

CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("blogdemo-queue");
String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl();

3.2. 创建 FIFO 队列

创建 FIFO 类似于创建标准队列。我们仍将使用 CreateQueueRequest的实例,就像我们之前所做的那样。只有这一次,我们必须传入队列属性,并将FifoQueue属性设置为true

Map<String, String> queueAttributes = new HashMap<>();
queueAttributes.put("FifoQueue", "true");
queueAttributes.put("ContentBasedDeduplication", "true");
CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest(
  "blogdemo-queue.fifo").withAttributes(queueAttributes);
String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest)
  .getQueueUrl();

4. 将消息发布到队列

设置好队列后,我们就可以开始发送消息了。

4.1. 将消息发布到标准队列

要将消息发送到标准队列,我们必须创建一个 SendMessageRequest 实例。

然后我们将消息属性映射附加到此请求:

Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("AttributeOne", new MessageAttributeValue()
  .withStringValue("This is an attribute")
  .withDataType("String"));  
    
SendMessageRequest sendMessageStandardQueue = new SendMessageRequest()
  .withQueueUrl(standardQueueUrl)
  .withMessageBody("A simple message.")
  .withDelaySeconds(30)
  .withMessageAttributes(messageAttributes);
sqs.sendMessage(sendMessageStandardQueue);

*withDelaySeconds()*指定消息应该在多长时间后到达队列。

4.2. 将消息发布到 FIFO 队列

在这种情况下,唯一的区别是我们必须指定 消息所属的group

SendMessageRequest sendMessageFifoQueue = new SendMessageRequest()
  .withQueueUrl(fifoQueueUrl)
  .withMessageBody("Another simple message.")
  .withMessageGroupId("blogdemo-group-1")
  .withMessageAttributes(messageAttributes);

如您在上面的代码示例中所见,我们使用 withMessageGroupId() 指定组。

4.3. 将多条消息发布到队列

我们还可以**使用单个请求将多条消息发布到队列中。**我们将创建一个 SendMessageBatchRequestEntry列表,我们将使用 SendMessageBatchRequest的实例发送该列表:

List <SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
messageEntries.add(new SendMessageBatchRequestEntry()
  .withId("id-1")
  .withMessageBody("batch-1")
  .withMessageGroupId("blogdemo-group-1"));
messageEntries.add(new SendMessageBatchRequestEntry()
  .withId("id-2")
  .withMessageBody("batch-2")
  .withMessageGroupId("blogdemo-group-1"));
SendMessageBatchRequest sendMessageBatchRequest
 = new SendMessageBatchRequest(fifoQueueUrl, messageEntries);
sqs.sendMessageBatch(sendMessageBatchRequest);

5. 从队列中读取消息

我们可以通过在ReceiveMessageRequest实例上调用 receiveMessage() 方法从队列接收消息:

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl)
  .withWaitTimeSeconds(10)
  .withMaxNumberOfMessages(10);
List<Message> sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages();

使用withMaxNumberOfMessages(),我们指定从队列中获取多少消息——尽管应该注意最大值是10

withWaitTimeSeconds()方法启用长轮询 长轮询是一种限制我们发送到 SQS 的接收消息请求数量的方法。

简而言之,这意味着我们将等待指定的秒数来检索消息。如果在这段时间内队列中没有消息,则请求将返回空。如果在这段时间内有消息到达队列,它将被返回。

我们可以获取给定消息的属性和正文:

sqsMessages.get(0).getAttributes();
sqsMessages.get(0).getBody();

6. 从队列中删除消息

要删除消息,我们将使用 DeleteMessageRequest

sqs.deleteMessage(new DeleteMessageRequest()
  .withQueueUrl(fifoQueueUrl)
  .withReceiptHandle(sqsMessages.get(0).getReceiptHandle()));

7. 死信队列

死信队列 必须与其基本队列的 类型相同——如果基本队列是 FIFO,它必须是 FIFO,如果基本队列是标准的,它必须是标准的。对于此示例,我们将使用标准队列。 我们需要做的第一件事是创建将成为我们的死信队列:

String deadLetterQueueUrl = sqs.createQueue("blogdemo-dead-letter-queue").getQueueUrl();

接下来,我们将获取新创建的队列的ARN(Amazon 资源名称)

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes(
  new GetQueueAttributesRequest(deadLetterQueueUrl)
    .withAttributeNames("QueueArn"));
String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes()
  .get("QueueArn");

最后,我们将这个新创建的队列设置为我们原来的标准队列的死信队列:

SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest()
  .withQueueUrl(standardQueueUrl)
  .addAttributesEntry("RedrivePolicy",
    "{\"maxReceiveCount\":\"2\", "
      + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}");
sqs.setQueueAttributes(queueAttributesRequest);

我们在构建SetQueueAttributesRequest 实例时在addAttributesEntry() 方法 中设置的 JSON 数据包 包含我们需要的信息:  maxReceiveCount2,这意味着如果一条消息被接收了这么多次,则假定它没有被正确处理,并且被发送到我们的死信队列。

deadLetterTargetArn属性将 我们的标准队列指向我们新创建的死信队列。

8. 监控

我们可以**检查给定队列中当前有多少消息,以及有多少消息正在使用 SDK。**首先,我们需要创建一个 GetQueueAttributesRequest

从那里我们将检查队列的状态:

GetQueueAttributesRequest getQueueAttributesRequest 
  = new GetQueueAttributesRequest(standardQueueUrl)
    .withAttributeNames("All");
GetQueueAttributesResult getQueueAttributesResult 
  = sqs.getQueueAttributes(getQueueAttributesRequest);
System.out.println(String.format("The number of messages on the queue: %s", 
  getQueueAttributesResult.getAttributes()
    .get("ApproximateNumberOfMessages")));
System.out.println(String.format("The number of messages in flight: %s", 
  getQueueAttributesResult.getAttributes()
    .get("ApproximateNumberOfMessagesNotVisible")));

使用Amazon Cloud Watch 可以实现更深入的监控。