PubSub
A pub/sub implementation that allows you to publish to Topics and subscribe Queues to those Topics.
Each Topic is associated with a unique ID and is created implicitly the first time it is published to or subscribed to.
Messages are marshalled as Items to be published to Topics. They can be published as a single Value, bytes, or Any object or can be published in bulk as a list of Items.
In practice, you will want to use just one of these formats for your messages in a given Topic. Read more about Items to determine the best format for your data.
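The one-format-per-Topic advice can be enforced before publishing. The sketch below is a hypothetical helper, not part of the Reboot library: it models each item as a plain dict and assumes an item sets exactly one of the `value`, `bytes`, or `any` fields. The names `item_format` and `check_uniform` are made up for illustration.

```python
FORMATS = ("value", "bytes", "any")


def item_format(item):
    """Return which one of the three fields a dict-shaped item sets."""
    present = [name for name in FORMATS if name in item]
    if len(present) != 1:
        raise ValueError(f"expected exactly one of {FORMATS}, got {present}")
    return present[0]


def check_uniform(items):
    """Raise if a batch mixes formats; otherwise return the shared format."""
    formats = {item_format(item) for item in items}
    if len(formats) > 1:
        raise ValueError(f"mixed formats in one batch: {sorted(formats)}")
    # An empty batch has no format at all.
    return formats.pop() if formats else None


batch = [{"bytes": b"some-bytes"}, {"bytes": b"some-more-bytes"}]
shared = check_uniform(batch)
```

Running a check like this just before a publish call would catch a batch that accidentally mixes, say, bytes and Value payloads.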
Subscription is Queue-based. That is, when you subscribe to a Topic, you reference a Queue by its ID. Any messages published to that Topic will be received on the specified Queue and can be dequeued to be read.
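To make the Topic/Queue relationship concrete, here is a toy in-memory model. It is only an illustration and not how Reboot implements pub/sub: the `ToyPubSub` class is hypothetical, and it assumes that more than one Queue may be subscribed to the same Topic.

```python
from collections import defaultdict, deque


class ToyPubSub:
    """In-memory stand-in for the Topic/Queue relationship (not Reboot's code)."""

    def __init__(self):
        # Topics and queues come into existence on first use.
        self.subscribers = defaultdict(set)  # topic ID -> set of queue IDs
        self.queues = defaultdict(deque)     # queue ID -> pending messages

    def subscribe(self, topic_id, queue_id):
        self.subscribers[topic_id].add(queue_id)

    def publish(self, topic_id, message):
        # Append the message to every queue subscribed to this topic.
        for queue_id in self.subscribers[topic_id]:
            self.queues[queue_id].append(message)

    def dequeue(self, queue_id):
        return self.queues[queue_id].popleft()


bus = ToyPubSub()
bus.subscribe("my-first-topic", "receiving-queue")
bus.subscribe("my-first-topic", "audit-queue")
bus.publish("my-first-topic", {"details": "details-go-here"})

first = bus.dequeue("receiving-queue")
second = bus.dequeue("audit-queue")
```

In this model the publisher only knows the Topic ID and each reader only knows its own Queue ID, which is the decoupling that Queue-based subscription provides.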
Imports and servicers
To use the pub/sub library, import the library where you would like to use it. To read messages from a subscribed Queue, you will also need to import the Queue library.
Python:
from reboot.std.collections.queue.v1.queue import Queue
from reboot.std.item.v1.item import Item
from reboot.std.pubsub.v1.pubsub import Topic
TypeScript:
import { Queue } from "@reboot-dev/reboot-std/collections/queue/v1";
import { Topic } from "@reboot-dev/reboot-std/pubsub/v1";
Also make sure to include the pub/sub servicers when starting up your Application. (Note: this import is different from above.)
Python:
from reboot.std.pubsub.v1 import pubsub

async def main():
    application = Application(
        servicers=[MyServicer] + pubsub.servicers(),
    )
    await application.run()
TypeScript:
import pubsub from "@reboot-dev/reboot-std/pubsub/v1";

new Application({
  servicers: [MyServicer, ...pubsub.servicers()],
  initialize,
}).run();
Referencing Topics
This creates references to two Topics with IDs "my-first-topic" and "my-second-topic".
Python:
first_topic = Topic.ref("my-first-topic")
second_topic = Topic.ref("my-second-topic")
TypeScript:
const firstTopic = Topic.ref("my-first-topic");
const secondTopic = Topic.ref("my-second-topic");
Methods
Publish
Publish a single message as a Value, bytes, or Any.
Python:
from reboot.protobuf import from_dict

await first_topic.publish(
    context,
    value=from_dict({"details": "details-go-here"}),
)
await second_topic.publish(context, bytes=b"my-bytes")
# third_topic is another Topic reference; replace <Any> with your own Any message.
await third_topic.publish(context, any=<Any>)
TypeScript:
import { Value } from "@bufbuild/protobuf";

await firstTopic.publish(context, {
  value: Value.fromJson({ details: "details-go-here" }),
});
await secondTopic.publish(context, {
  bytes: new TextEncoder().encode("my-bytes"),
});
// thirdTopic is another Topic reference; replace <Any> with your own Any message.
await thirdTopic.publish(context, {
  any: <Any>,
});
Subscribe
Subscribe a single Queue, referenced by its ID, to a Topic. Any messages subsequently published to the Topic will be received by dequeueing on the Queue.
Note: Any messages published to the Topic before the Queue is subscribed are not guaranteed to be received by the Queue.
Python:
await first_topic.subscribe(context, queue_id="receiving-queue")
response = await Queue.ref("receiving-queue").dequeue(context)
TypeScript:
await firstTopic.subscribe(context, {
  queueId: "receiving-queue",
});
const response = await Queue.ref("receiving-queue").dequeue(context);
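The ordering caveat for subscriptions is easy to overlook, so here is a small stand-alone model of it. This is an assumption-laden sketch rather than Reboot's behavior: the documentation only says early messages are not guaranteed to arrive, and this model shows the worst case, where they are dropped outright. The `publish` function and both dictionaries are hypothetical.

```python
from collections import defaultdict, deque

subscribers = defaultdict(set)  # topic ID -> queue IDs
queues = defaultdict(deque)     # queue ID -> pending messages


def publish(topic_id, message):
    # Only queues subscribed at publish time receive the message.
    for queue_id in subscribers[topic_id]:
        queues[queue_id].append(message)


publish("my-first-topic", "early")  # no subscriber yet: dropped in this model
subscribers["my-first-topic"].add("receiving-queue")
publish("my-first-topic", "late")

received = list(queues["receiving-queue"])
```

If a consumer must not miss anything, the safest ordering under this model is to subscribe the Queue before any publisher starts.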
Bulk Publish
Publish a list of messages, formatted as Items. This example demonstrates how to store different types of data in Value, but in practice, you will want to use a uniform structure for all Values published to a Topic.
Python:
from reboot.protobuf import from_bool, from_dict, from_int, from_list, from_str
from reboot.std.collections.queue.v1.queue import Item

await first_topic.publish(
    context,
    items=[
        Item(value=from_bool(True)),
        Item(value=from_int(3)),
        Item(value=from_str("apple")),
        Item(value=from_list(["a", "b", "c"])),
        Item(value=from_dict({"details": "details-go-here"})),
    ],
)
await second_topic.publish(
    context,
    items=[
        Item(bytes=b"some-bytes"),
        Item(bytes=b"some-more-bytes"),
    ],
)
await third_topic.publish(
    context,
    items=[
        Item(any=<Any>),
        Item(any=<Any>),
    ],
)
TypeScript:
import { Value } from "@bufbuild/protobuf";
import { Item } from "@reboot-dev/reboot-std/collections/queue/v1";

await firstTopic.publish(context, {
  items: [
    { value: Value.fromJson(null) },
    { value: Value.fromJson(true) },
    { value: Value.fromJson(3) },
    { value: Value.fromJson("apple") },
    { value: Value.fromJson(["a", "b", "c"]) },
    { value: Value.fromJson({ details: "details-go-here" }) },
  ],
});
await secondTopic.publish(context, {
  items: [
    { bytes: new TextEncoder().encode("some-bytes") },
    { bytes: new TextEncoder().encode("some-more-bytes") },
  ],
});
await thirdTopic.publish(context, {
  items: [
    { any: <Any> },
    { any: <Any> },
  ],
});
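One way to keep a uniform structure across every Value published to a Topic is to define the record shape once and derive each payload from it. The sketch below uses only the Python standard library. The `OrderEvent` dataclass and the `to_payloads` helper are hypothetical names chosen for illustration and are not part of Reboot.

```python
from dataclasses import asdict, dataclass


@dataclass
class OrderEvent:
    """Hypothetical record shape shared by every message on one Topic."""
    order_id: str
    quantity: int


def to_payloads(events):
    """Convert records into plain dicts that all share the same keys."""
    return [asdict(event) for event in events]


payloads = to_payloads([OrderEvent("a-1", 2), OrderEvent("a-2", 5)])
```

Each resulting dict could then be wrapped the same way as the `from_dict` item in the bulk example above, so consumers dequeueing from the Topic can rely on one set of fields.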