Queue

A general-purpose queue that lets you enqueue and dequeue items one at a time or in bulk.

Imports and servicers

To use a Queue, import Item and Queue in the module where you need them.

from reboot.std.item.v1.item import Item
from reboot.std.collections.queue.v1.queue import Queue

Also make sure to include the Queue servicers when starting up your Application. (Note: this imports the queue module, not the Queue class imported above.)

from reboot.std.collections.queue.v1 import queue

async def main():
    application = Application(
        servicers=[MyServicer] + queue.servicers(),
    )

    await application.run()

Queues

Each Queue you create needs a unique ID. Using a reference to that Queue, you can enqueue and dequeue Items one at a time or in bulk.

Referencing Queues

This creates references to two Queues with IDs "my-first-queue" and "my-second-queue".

first_queue = Queue.ref("my-first-queue")
second_queue = Queue.ref("my-second-queue")

Items

An Item holds a Value, a string of bytes, or an Any, and can be enqueued to and dequeued from a specified Queue. See the examples below for how to format Items.

In practice, you will want to use just one of these three options in a given Queue.

To provide the proper formatting to Value or Any, this library also requires a dependency on protobuf in Python, or on @bufbuild/protobuf in TypeScript.

Any refers to the Any protobuf, and is most useful if you are using protobufs to structure your data.
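The <Any> placeholder in the examples below stands for a packed google.protobuf.Any message. As a minimal sketch of how such a message is built and read back, the following uses StringValue, a message type that ships with the protobuf package, purely for illustration; in your own application you would pack your own message type instead.

```python
from google.protobuf.any_pb2 import Any
from google.protobuf.wrappers_pb2 import StringValue

# Wrap a concrete protobuf message in an Any. StringValue is used only
# because it ships with protobuf; substitute your own message type.
payload = StringValue(value="details-go-here")
packed = Any()
packed.Pack(payload)

# The Any records the packed message's type, so a reader can check it
# before unpacking.
assert packed.Is(StringValue.DESCRIPTOR)

# Unpack into a fresh message of the expected type. Unpack returns False
# if the Any holds a different message type.
unpacked = StringValue()
if packed.Unpack(unpacked):
    print(unpacked.value)
```

The packed object is what would be passed as any=... when enqueueing, and the same Unpack step would apply to item.any after dequeueing.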

Methods

Enqueue

Enqueues a single Item as a Value, bytes, or Any.

from reboot.protobuf import from_dict

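await first_queue.enqueue(
    context,
    value=from_dict({"details": "details-go-here"}),
)

await second_queue.enqueue(context, bytes=b"my-bytes")

# third_queue is another Queue reference, for example Queue.ref("my-third-queue").
# <Any> is a placeholder for a packed google.protobuf.Any message.
await third_queue.enqueue(context, any=<Any>)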

Dequeue

Dequeues a single Item in the same format in which it was enqueued.

from reboot.protobuf import as_dict

item = await first_queue.dequeue(context)
print(as_dict(item.value)["details"])

item = await second_queue.dequeue(context)
print(item.bytes)

item = await third_queue.dequeue(context)
print(item.any)
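If you choose the bytes option, the Queue stores raw bytes and you are responsible for serialization on both sides. One possible approach, sketched here with only the standard library, is to encode structured data as UTF-8 JSON before enqueueing and decode it after dequeueing. The helper names encode_payload and decode_payload are hypothetical and not part of the Queue library.

```python
import json


def encode_payload(data: dict) -> bytes:
    """Serialize a dict to UTF-8 JSON bytes (hypothetical helper)."""
    return json.dumps(data, sort_keys=True).encode("utf-8")


def decode_payload(raw: bytes) -> dict:
    """Parse UTF-8 JSON bytes back into a dict (hypothetical helper)."""
    return json.loads(raw.decode("utf-8"))


# `raw` is what would be passed as bytes=raw when enqueueing; after
# dequeueing, item.bytes would be handed to decode_payload.
raw = encode_payload({"details": "details-go-here"})
restored = decode_payload(raw)
print(restored["details"])
```

JSON is only one choice; any encoding works as long as the producer and consumer of a given Queue agree on it.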

Bulk Enqueue

Enqueues a list of Items.

This example demonstrates how to store different types of data in a Value, but in practice you will want to use a uniform structure for all Values stored in one Queue.

from reboot.protobuf import from_bool, from_dict, from_int, from_list, from_string

await first_queue.enqueue(
    context,
    items=[
        Item(value=from_int(3)),
        Item(value=from_string("apple")),
        Item(value=from_bool(True)),
        Item(value=from_dict({"details": "details-go-here"})),
        Item(value=from_list(["a", "b", "c"])),
    ],
)

await second_queue.enqueue(
    context,
    items=[
        Item(bytes=b"some-bytes"),
        Item(bytes=b"some-more-bytes"),
    ],
)

await third_queue.enqueue(
    context,
    items=[
        Item(any=<Any>),
        Item(any=<Any>),
    ],
)

Bulk Dequeue

By specifying bulk and at_most (atMost in TypeScript), you can also dequeue Items in bulk.

items = await first_queue.dequeue(context, bulk=True, at_most=5)
# items.items is a list of Items
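To illustrate how a consumer might work through a backlog in batches, here is a sketch that runs without Reboot. InMemoryQueue is a hypothetical stand-in, not the library's Queue: it assumes first-in, first-out ordering and that a bulk dequeue returns up to at_most items, and it makes no claim about how the real Queue behaves when it is empty.

```python
import asyncio
from collections import deque
from dataclasses import dataclass, field


@dataclass
class InMemoryQueue:
    """Hypothetical FIFO stand-in; not Reboot's Queue implementation."""

    _items: deque = field(default_factory=deque)

    async def enqueue(self, items: list) -> None:
        self._items.extend(items)

    async def dequeue(self, at_most: int) -> list:
        # Return up to at_most items, oldest first.
        count = min(at_most, len(self._items))
        return [self._items.popleft() for _ in range(count)]


async def drain_in_batches(
    queue: InMemoryQueue, total: int, batch_size: int
) -> list:
    """Dequeue `total` items in batches of at most `batch_size`."""
    batches = []
    remaining = total
    while remaining > 0:
        batch = await queue.dequeue(at_most=min(batch_size, remaining))
        if not batch:
            break  # The stand-in is empty; stop rather than loop forever.
        batches.append(batch)
        remaining -= len(batch)
    return batches


async def demo() -> list:
    queue = InMemoryQueue()
    await queue.enqueue(list(range(12)))
    return await drain_in_batches(queue, total=12, batch_size=5)


batches = asyncio.run(demo())
print([len(batch) for batch in batches])  # prints [5, 5, 2]
```

With a real Queue, the loop body would call dequeue(context, bulk=True, at_most=...) and read the batch from items.items instead.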