Skip to main content

PubSub

A pub/sub implementation that allows you to publish to Topics and subscribe Queues to those Topics.

Each Topic is associated with a unique ID and is created implicitly the first time it is published to or subscribed to.

Messages are marshalled as Items to be published to Topics. They can be published as a single Value, bytes, or Any object or can be published in bulk as a list of Items. In practice, you will want to use just one of these formats for your messages in a given Topic. Read more about Items to determine the best format for your data.

Subscription is Queue based. That is, when you subscribe to a Topic, you reference a Queue by its ID. Any messages published to that Topic will be received on the specified Queue and can be dequeued to be read.

Imports and servicers

To use the pub/sub library, import the library where you would like to use it. To read messages from a subscribed Queue, you will also need to import the Queue library.

from reboot.std.collections.queue.v1.queue import Queue
from reboot.std.item.v1.item import Item
from reboot.std.pubsub.v1.pubsub import Topic

Also make sure to include the pub/sub servicers when starting up your Application. (Note: this import is different from above.)

from reboot.std.pubsub.v1 import pubsub

async def main():
application = Application(
servicers=[MyServicer] + pubsub.servicers(),
)

await application.run()

Referencing Topics

This creates references to two Topics with IDs "my-first-topic" and "my-second-topic".

first_topic = Topic.ref("my-first-topic")
second_topic = Topic.ref("my-second-topic")

Methods

Publish

Publish a single message as a Value, bytes, or Any.

from reboot.protobuf import from_dict

await first_topic.publish(
context,
value=from_dict({"details": "details-go-here"}),
)

await second_topic.publish(context, bytes=b"my-bytes")

await third_topic.publish(context, any=<Any>)

Subscribe

Subscribe a single Queue as referenced by its ID to a Topic. Any subsequent messages publishes to the Topic will be received by dequeueing on the Queue.

Note: Any messages published to the Topic before the Queue is subscribed is not guaranteed to be received by the Queue.

await first_topic.subscribe(context, queue_id="receiving-queue")
response = await Queue.ref("receiving-queue").dequeue(context)

Bulk Publish

Publish a list of messages, formatted as Items.

This examples demonstrates how to store different types of data in Value, but in practice, you will want to use a uniform structure for all Values published to a Topic.

from reboot.protobuf import from_bool, from_dict, from_int, from_list, from_str
from reboot.std.collections.queue.v1.queue import Item

await first_topic.publish(
context,
items=[
Item(value=from_bool(True)),
Item(value=from_int(3)),
Item(value=from_str("apple")),
Item(value=from_list(["a", "b", "c"])),
Item(value=from_dict({"details": "details-go-here"})),
],
)

await second_topic.publish(
context,
items=[
Item(bytes=b"some-bytes"),
Item(bytes=b"some-more-bytes"),
],
)

await third_topic.publish(
context,
items=[
Item(any=<Any>),
Item(any=<Any>),
],
)