Skip to main content

Queue

A general purpose queue that allows you to enqueue and dequeue Items singularly or in bulk to it. It is durable, which means that it will persist across restarts or system failures, and transactional, so you can access it safely.

Each Queue will need a unique ID and is created implicitly the first time you use it. Items can be Enqueued as a single Value, bytes, or Any object or can be Enqueueded in bulk as a list of Items. They can then be Dequeued singularly in bulk as well. In practice, you will want to use just one of these formats in a given Queue. Read more about Items to determine the best format for your data.

Imports and servicers

To use a Queue, import the library where you would like to use it.

from reboot.std.collections.queue.v1.queue import Queue
from reboot.std.item.v1.item import Item

Also make sure to include the Queue servicers when starting up your Application. (Note: this import is different from above.)

from reboot.std.collections.queue.v1 import queue

async def main():
application = Application(
servicers=[MyServicer] + queue.servicers(),
)

await application.run()

Referencing Queues

This creates references to two Queuereferences with IDs "my-first-queue" and "my-second-queue". You can then call methods on these references.

first_queue = Queue.ref("my-first-queue")
second_queue = Queue.ref("my-second-queue")

Methods

Enqueue

Enqueues a single Item directly from a Value, bytes, or Any.

from reboot.protobuf import from_dict

await first_queue.enqueue(
context,
value=from_dict({"details": "details-go-here"}),
)

await second_queue.enqueue(context, bytes=b"my-bytes")

await third_queue.enqueue(context, any=<Any>)

Dequeue

Dequeues a single Item.

from reboot.protobuf import as_dict

item = await first_queue.dequeue(context)
print(as_dict(item.value)["details"])

item = await second_queue.dequeue(context)
print(item.bytes)

item = await third_queue.dequeue(context)
print(item.any)

Bulk Enqueue

Enqueues a list of Items.

from reboot.protobuf import from_bool, from_dict, from_int, from_list, from_str

await first_queue.enqueue(
context,
items=[
Item(value=from_bool(True)),
Item(value=from_int(3)),
Item(value=from_str("apple")),
Item(value=from_list(["a", "b", "c"])),
Item(value=from_dict({"details": "details-go-here"})),
],
)

await second_queue.enqueue(
context,
items=[
Item(bytes=b"some-bytes"),
Item(bytes=b"some-more-bytes"),
],
)

await third_queue.enqueue(
context,
items=[
Item(any=<Any>),
Item(any=<Any>),
],
)

Bulk Dequeue

By specifying bulk and at_most/atMost, you can also dequeue Items in bulk.

items = await first_queue.dequeue(context, bulk=True, at_most=5)
# items.items is a list of Items