PubSub
A pub/sub implementation that allows you to publish to Topics and subscribe
Queues to those Topics.
Each Topic is associated with a unique ID and is created implicitly the
first time it is published to or subscribed to.
Messages are marshalled as Items to be published
to Topics. They can be published as a single Value, bytes,
or Any object or can be published in bulk as a list of Items.
In practice, you will want to use just one of these formats for your messages
in a given Topic. Read more about Items
to determine the best format for your data.
Subscription is Queue based. That is, when you
subscribe to a Topic, you reference a Queue by its ID.
Any messages published to that Topic will be received on the specified Queue
and can be dequeued to be read.
Imports and servicers
To use the pub/sub library, import the library where you would like to use it.
To read messages from a subscribed Queue, you will also need to import the
Queue library.
- Python
- TypeScript
from reboot.std.collections.queue.v1.queue import Queue
from reboot.std.item.v1.item import Item
from reboot.std.pubsub.v1.pubsub import Topic
import { Queue } from "@reboot-dev/reboot-std/collections/queue/v1";
import { Topic } from "@reboot-dev/reboot-std/pubsub/v1";
Also make sure to include the pub/sub servicers when starting up your Application.
(Note: this import is different from above.)
- Python
- TypeScript
from reboot.std.pubsub.v1 import pubsub
async def main():
application = Application(
servicers=[MyServicer] + pubsub.servicers(),
)
await application.run()
import pubsub from "@reboot-dev/reboot-std/pubsub/v1";
new Application({
servicers: [MyServicer, ...pubsub.servicers()],
initialize,
}).run();
Referencing Topics
This creates references to two Topics with IDs "my-first-topic" and "my-second-topic".
- Python
- TypeScript
first_topic = Topic.ref("my-first-topic")
second_topic = Topic.ref("my-second-topic")
const firstTopic = Topic.ref("my-first-topic");
const secondTopic = Topic.ref("my-second-topic");
Methods
Publish
Publish a single message as a Value, bytes, or Any.
- Python
- TypeScript
from reboot.protobuf import from_dict
await first_topic.publish(
context,
value=from_dict({"details": "details-go-here"}),
)
await second_topic.publish(context, bytes=b"my-bytes")
await third_topic.publish(context, any=<Any>)
import { Value } from "@bufbuild/protobuf";
await firstTopic.publish(context, {
value: Value.fromJson({ details: "details-go-here" }),
});
await secondTopic.publish(context, {
bytes: new TextEncoder().encode("my-bytes"),
});
await thirdTopic.publish(context, {
any: <Any>,
});
Subscribe
Subscribe a single Queue as referenced by its ID to a Topic.
Any subsequent messages publishes to the Topic will be received by
dequeueing on the Queue.
Note: Any messages published to the Topic before the Queue is subscribed
is not guaranteed to be received by the Queue.
- Python
- TypeScript
await first_topic.subscribe(context, queue_id="receiving-queue")
response = await Queue.ref("receiving-queue").dequeue(context)
await firstTopic.subscribe(context, {
queueId: "receiving-queue",
});
const response = await Queue.ref("receiving-queue").dequeue(context);
Bulk Publish
Publish a list of messages, formatted as Items.
This examples demonstrates how to store different types of data in Value, but
in practice, you will want to use a uniform structure for all Values
published to a Topic.
- Python
- TypeScript
from reboot.protobuf import from_bool, from_dict, from_int, from_list, from_str
from reboot.std.collections.queue.v1.queue import Item
await first_topic.publish(
context,
items=[
Item(value=from_bool(True)),
Item(value=from_int(3)),
Item(value=from_str("apple")),
Item(value=from_list(["a", "b", "c"])),
Item(value=from_dict({"details": "details-go-here"})),
],
)
await second_topic.publish(
context,
items=[
Item(bytes=b"some-bytes"),
Item(bytes=b"some-more-bytes"),
],
)
await third_topic.publish(
context,
items=[
Item(any=<Any>),
Item(any=<Any>),
],
)
import { Value } from "@bufbuild/protobuf";
import { Item } from "@reboot-dev/reboot-std/collections/queue/v1";
await firstTopic.publish(context, {
items: [
{ value: Value.fromJson(null) },
{ value: Value.fromJson(true) },
{ value: Value.fromJson(3) },
{ value: Value.fromJson("apple") },
{ value: Value.fromJson(["a", "b", "c"]) },
{ value: Value.fromJson({ details: "details-go-here" }) },
],
});
await secondTopic.publish(context, {
items: [
{ bytes: new TextEncoder().encode("some-bytes") },
{ bytes: new TextEncoder().encode("some-more-bytes") },
],
});
await thirdTopic.publish(context, {
items: [
{ any: <Any> },
{ any: <Any> },
],
});