Contents

Java Big Queue简介

1. 概述

在本教程中,我们将快速了解Big Queue ,它是持久队列 的 Java 实现。

我们将稍微讨论一下它的架构,然后我们将通过快速实用的示例学习如何使用它。

2.用法

我们需要将bigqueue依赖项添加到我们的项目中:

<dependency>
    <groupId>com.leansoft</groupId>
    <artifactId>bigqueue</artifactId>
    <version>0.7.0</version>
</dependency>

我们还需要添加它的存储库:

<repository>
    <id>github.release.repo</id>
    <url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url>
</repository>

如果我们习惯于使用基本队列,那么适应 Big Queue 将是一件轻而易举的事,因为它的 API 非常相似。

2.1. 初始化

我们可以通过简单地调用其构造函数来初始化我们的队列:

@Before
public void setup() {
    String queueDir = System.getProperty("user.home");
    String queueName = "blogdemo-queue";
    bigQueue = new BigQueueImpl(queueDir, queueName);
}

第一个参数是队列的主目录。 第二个参数代表我们队列的名称。它将在队列的主目录中创建一个文件夹,我们可以在其中保存数据。 我们应该记得在完成后关闭我们的队列以防止内存泄漏:

bigQueue.close();

2.2. 插入

我们可以通过简单地调用enqueue方法将元素添加到尾部:

@Test
public void whenAddingRecords_ThenTheSizeIsCorrect() {
    for (int i = 1; i <= 100; i++) {
        bigQueue.enqueue(String.valueOf(i).getBytes());
    }
 
    assertEquals(100, bigQueue.size());
}

我们应该注意,Big Queue 只支持*byte[]*数据类型,因此我们负责在插入时序列化我们的记录。

2.3. read

正如我们所料,使用dequeue方法读取数据同样容易:

@Test
public void whenAddingRecords_ThenTheyCanBeRetrieved() {
    bigQueue.enqueue(String.valueOf("new_record").getBytes());
    String record = new String(bigQueue.dequeue());
 
    assertEquals("new_record", record);
}

我们还必须小心,在读取时正确地反序列化我们的数据。

从空队列中读取会抛出NullPointerException

我们应该使用isEmpty方法验证队列中是否有值:

if(!bigQueue.isEmpty()){
    // read
}

要清空我们的队列而不必遍历每条记录,我们可以使用removeAll方法:

bigQueue.removeAll();

2.4. 偷看

偷看时,我们只是读取一条记录而不使用它:

@Test
public void whenPeekingRecords_ThenSizeDoesntChange() {
    for (int i = 1; i <= 100; i++) {
        bigQueue.enqueue(String.valueOf(i).getBytes());
    }
 
    String firstRecord = new String(bigQueue.peek());
    assertEquals("1", firstRecord);
    assertEquals(100, bigQueue.size());
}

2.5. 删除已使用的记录

当我们调用 dequeue方法时,记录将从我们的队列中删除,但它们仍保留在磁盘上。

这可能会用不必要的数据填满我们的磁盘。

幸运的是,我们可以使用gc方法删除已消费的记录:

bigQueue.gc();

就像Java 中的垃圾收集器 从堆中清除未引用的对象一样,gc从我们的磁盘中清除已消耗的记录。

3. 架构和特点

Big Queue 的有趣之处在于它的代码库非常小——只有 12 个源文件占用大约 20KB 的磁盘空间。

在高层次上,它只是一个擅长处理大量数据的持久化队列。

3.1. 处理大量数据

**队列的大小仅受可用磁盘空间总量的限制。**我们队列中的每条记录都保存在磁盘上,以防崩溃。

我们的瓶颈将是磁盘 I/O,这意味着 SSD 将显着提高 HDD 的平均吞吐量。

3.2. 极速访问数据

如果我们看一下它的源代码,我们会注意到队列由内存映射文件支持。我们队列的可访问部分(头部)保存在 RAM 中,因此访问记录将非常快。

即使我们的队列变得非常大并占用数 TB 的磁盘空间,我们仍然能够以 O(1) 的时间复杂度读取数据。

如果我们需要读取大量消息并且速度是一个关键问题,我们应该考虑使用 SSD 而不是 HDD,因为将数据从磁盘移动到内存会快得多。

3.3. 优点

一个很大的优势是它能够长得很大。 我们可以通过添加更多存储将其扩展到理论上的无限大,因此它的名字是“大”。

在并发环境中,Big Queue 可以在商用机器上产生和消耗大约 166MBps 的数据。

如果我们的平均消息大小是 1KB,它每秒可以处理 166k 条消息。

在单线程环境中,它每秒可以处理多达 333k条消息——相当令人印象深刻!

3.4. 缺点

我们的消息会保留在磁盘中,即使在我们使用它们之后也是如此,因此我们必须在不再需要时处理垃圾收集数据。 我们还负责序列化和反序列化我们的消息。