Workflows
Unlike all other method kinds, a workflow method:
- Can only be run as a task. This means a workflow is always retried until completion, which we discuss in more detail in Retries and idempotency.
- Is not atomic, and thus is not passed the state argument. We discuss below how to read and write state.
Retries and idempotency
Unlike other method kinds, a workflow can only be run as a task, which will be retried until completion.
Because you may be performing side effects outside of your Reboot application, you want to retry if there is a failure so that you can converge on some desired outcome.
To make retries safe, Reboot requires that all calls within a workflow to a writer, transaction, or workflow explicitly specify idempotency (or lack thereof) using .idempotently() or .unidempotently().
Here is an example of a workflow that is calling one of its own methods using .idempotently() explicitly:
- Python
- TypeScript
await self.ref().idempotently().LoadS3Blob(context)
await this.ref().idempotently().loadS3Blob(context);
Using .unidempotently() will ensure that the call is performed every time the workflow runs, including on retries.
Learn more about idempotency when calling a method here.
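To see why explicit idempotency makes retries safe, here is a minimal, framework-free sketch. It is not Reboot's implementation: IdempotencyLog and run_with_retries are hypothetical names, and a plain dict stands in for durable storage. The sketch only models the idea that a call recorded under an alias is not repeated when the workflow body is retried.

```python
import asyncio


class IdempotencyLog:
    """Hypothetical stand-in for durable storage keyed by idempotency alias."""

    def __init__(self):
        self._results = {}

    async def idempotently(self, alias, call):
        # Perform the call only the first time this alias is seen;
        # later attempts get the recorded result back.
        if alias not in self._results:
            self._results[alias] = await call()
        return self._results[alias]


async def run_with_retries(body, attempts=5):
    # Re-run the whole body until it completes, like a retried task.
    for _ in range(attempts):
        try:
            return await body()
        except RuntimeError:
            continue
    raise RuntimeError("gave up")


async def demo():
    log = IdempotencyLog()
    calls = {"load": 0, "attempts": 0}

    async def load_blob():
        calls["load"] += 1
        return "blob-contents"

    async def workflow():
        calls["attempts"] += 1
        blob = await log.idempotently("Load S3 blob", load_blob)
        if calls["attempts"] < 3:
            # Simulate a crash that happens after the call succeeded.
            raise RuntimeError("simulated crash")
        return blob

    result = await run_with_retries(workflow)
    return result, calls


result, calls = asyncio.run(demo())
```

In this model the workflow body runs three times, but the aliased call runs once; the later attempts reuse the recorded result.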
Reading state
Because a workflow method is not atomic, state is not passed as an argument to your workflow. Instead, if you need to read state you must fetch a current snapshot:
- Python
- TypeScript
snapshot = await self.state.read(context)
const snapshot = await this.state.read(context);
Writing state
Any state snapshots you fetch within your workflow are read-only. If you want to modify state you must do so explicitly:
- Python
- TypeScript
async def increment(state):
    state.iteration += 1
await self.state.idempotently(
    "Finally, increment the number of iterations",
).write(
    context,
    increment,
)
await this.state
  .idempotently("Finally, increment the number of iterations")
  .write(context, async (state) => {
    state.iteration++;
  });
You can think of the function or lambda you pass to write() as a kind of inline writer. Most importantly, this means that your function or lambda will be executed atomically with respect to any other writer or transaction method.
In the same way that calling a writer or transaction requires idempotency, your inline writer also requires explicit idempotency. If you want your inline writer to execute every time, use .unidempotently(), e.g., in TypeScript this.state.unidempotently().write(context, ...).
Reboot provides some syntactic sugar for inline writers that drops the need to explicitly call .idempotently()
and lets you pass the "idempotency alias" as the first argument:
- Python
- TypeScript
await self.state.write(
    "Finally, increment the number of iterations",
    context,
    increment,
)
await this.state.write(
  "Finally, increment the number of iterations",
  context,
  async (state) => {
    state.iteration++;
  }
);
Use a self-documenting string as the idempotency alias, e.g., "Increment the total students count". This makes the code more readable and also makes it more likely that human-written aliases won't conflict.
Waiting until a condition
A workflow is the right place to write code that needs to wait until a condition has occurred. For this, Reboot leverages reactivity, a built-in primitive of the framework, which lets you re-execute a block of code to check for conditions only when state has changed.
Here's an example of an until block that is waiting until state.messages is non-empty:
- Python
- TypeScript
async def have_messages():
    state = await self.state.read(context)
    return len(state.messages) > 0
await until("Have messages", context, have_messages)
await until("Have messages", context, async () => {
  const { messages } = await this.state.read(context);
  return messages.length > 0;
});
Reboot does not currently suspend your tasks while they are waiting in an until, but that is on the roadmap! Please reach out to us to talk about your use case if you would like us to prioritize this feature.
until will re-execute reactively not only for your own state, but also for all other states you may call into.
until will re-execute reactively until a truthy value is returned, and then it will return that value if it is not a boolean. For example, you can wait for a specific key to be set in a SortedMap and return the value:
- Python
- TypeScript
async def value_is_stored():
    map = SortedMap.ref("someId")
    response = await map.get(context, key="someKey")
    return response.value if response.HasField("value") else False
value = await until("Value is stored", context, value_is_stored)
const value = await until(
  "Value is stored",
  context,
  async () => {
    const map = SortedMap.ref("someId");
    const { value } = await map.get(context, { key: "someKey" });
    return value;
  },
  {
    stringify: (result: Uint8Array) => JSON.stringify(Array.from(result)),
    parse: (value: string) => new Uint8Array(JSON.parse(value)),
  }
);
An until block memoizes its result so that once you have converged on some condition it won't try to converge again. The first argument to an until block acts as an idempotency alias, just like the alias you pass to .idempotently() or state.write(...).
In TypeScript, Reboot will serialize and deserialize the result as part of the memoization using JSON.stringify and JSON.parse, but since these are not type safe, you must pass an object with a validate property as the last argument. If you would instead prefer to serialize and deserialize a different way, you can specify the stringify and parse options instead of validate.
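The combination of reactive re-checking and memoization can be illustrated with a small asyncio model. This is only a sketch under stated assumptions, not how Reboot implements until: ReactiveState, the memo dict, and this four-argument until are hypothetical, and an asyncio.Condition stands in for Reboot's reactivity.

```python
import asyncio


class ReactiveState:
    """Hypothetical state that notifies waiters whenever it changes."""

    def __init__(self):
        self.messages = []
        self.changed = asyncio.Condition()

    async def append(self, message):
        async with self.changed:
            self.messages.append(message)
            self.changed.notify_all()


async def until(alias, memo, state, condition):
    # A memoized result short-circuits: the condition is not re-checked.
    if alias in memo:
        return memo[alias]
    async with state.changed:
        while True:
            result = await condition()
            if result:
                break
            # Sleep until the state changes instead of busy-polling.
            await state.changed.wait()
    memo[alias] = result
    return result


async def demo():
    state = ReactiveState()
    memo = {}
    checks = 0

    async def first_message():
        nonlocal checks
        checks += 1
        return state.messages[0] if state.messages else False

    waiter = asyncio.create_task(
        until("Have messages", memo, state, first_message)
    )
    await asyncio.sleep(0)  # let the waiter run its first (failing) check
    await state.append("hello")
    first = await waiter
    second = await until("Have messages", memo, state, first_message)
    return first, second, checks


first, second, checks = asyncio.run(demo())
```

Here the condition is evaluated once before the state changes and once after; the second until call with the same alias returns the memoized value without evaluating the condition again.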
At least or at most once
When making calls to other states within a workflow, e.g., to a SortedMap, you can ensure that the call is performed once by using .idempotently(), as discussed above. To call outside of your Reboot application you can use helpers that execute a block of code "at least once" or "at most once". Here is an example of "at most once":
- Python
- TypeScript
async def remit_to_provider():
    return requests.post(
        REMITTANCE_PROVIDER_URL,
        json=request.toAccountDetails,
    )
await at_most_once("Remit to provider", context, remit_to_provider)
await atMostOnce("Remit to provider", context, async () => {
  return http.post(REMITTANCE_PROVIDER_URL, {
    toAccountDetails: request.toAccountDetails
  });
});
Using at_most_once (Python) or atMostOnce (TypeScript) requires careful error handling to deal with the case that a failure occurred, or an exception was raised, in the middle of execution, because the block will never be retried. If the API you are calling has no inherent notion of idempotency, you have no alternative. If it does, always prefer at_least_once (Python) or atLeastOnce (TypeScript), and use whatever means the API provides for retrying safely.
You can return a result from the function or lambda you pass as an argument just like with until, and in TypeScript that means you'll also need to pass either a validate or your own stringify and parse. See the until example above for more details.
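The difference between the two guarantees shows up when a call fails partway. The following is a toy model, not Reboot's helpers: Journal, AtMostOnceFailed, and these three-argument functions are hypothetical, and in-memory collections stand in for durable storage.

```python
class AtMostOnceFailed(Exception):
    """Hypothetical error: a previous attempt started but never finished."""


class Journal:
    """Toy stand-in for durable storage that survives retries."""

    def __init__(self):
        self.started = set()
        self.results = {}


def at_most_once(journal, alias, fn):
    if alias in journal.results:
        return journal.results[alias]
    if alias in journal.started:
        # We cannot know whether the side effect happened, so never re-run.
        raise AtMostOnceFailed(alias)
    journal.started.add(alias)  # record intent BEFORE running
    journal.results[alias] = fn()
    return journal.results[alias]


def at_least_once(journal, alias, fn):
    if alias in journal.results:
        return journal.results[alias]
    # If fn fails, nothing is recorded, so a retry runs it again.
    journal.results[alias] = fn()
    return journal.results[alias]


def make_flaky():
    """Return a function that fails on its first call, plus its call counter."""
    calls = {"n": 0}

    def flaky():
        calls["n"] += 1
        if calls["n"] == 1:
            raise ConnectionError("simulated network failure")
        return "ok"

    return flaky, calls


journal = Journal()

remit, remit_calls = make_flaky()
at_most = []
for _ in range(2):  # two attempts of the enclosing workflow
    try:
        at_most.append(at_most_once(journal, "Remit to provider", remit))
    except (ConnectionError, AtMostOnceFailed) as error:
        at_most.append(type(error).__name__)

fetch, fetch_calls = make_flaky()
at_least = []
for _ in range(3):  # three attempts of the enclosing workflow
    try:
        at_least.append(at_least_once(journal, "Fetch rate", fetch))
    except ConnectionError as error:
        at_least.append(type(error).__name__)
```

In this model the "at most once" block runs a single time and the retry surfaces an error that the caller must handle, while the "at least once" block runs again until it succeeds and is then memoized. That is why the latter is only safe when the remote API can tolerate repeats.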
By combining these helper functions, you can create robust workflows that handle various scenarios and ensure reliable task execution.
Control loops
A workflow can be run as a control loop by returning Loop instead of a response. Reboot will then "loop" your workflow and re-execute it reliably. It will also increment context.iteration so that you can distinguish what iteration of your loop you are currently executing!
Finally, you can schedule the loop for some time in the future by passing a when to Loop, exactly as when calling schedule() or spawn(). Here's an example of scheduling the next iteration of your workflow 5 minutes from now:
- Python
- TypeScript
return Loop(when=datetime.now() + timedelta(minutes=5))
return new Loop({ when: new Date(Date.now() + 5 * 60 * 1000) });
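As a rough mental model of a control loop, the sketch below drives a function that either asks to be run again or returns a final response. It is an illustration only: ToyLoop and drive are hypothetical names, the iteration counter is passed directly rather than through a context, and the requested delays are recorded instead of actually being waited for.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class ToyLoop:
    """Hypothetical marker meaning "run me again", optionally after a delay."""

    when: Optional[timedelta] = None


def drive(workflow, max_iterations=10):
    """Re-run `workflow` until it returns something other than ToyLoop."""
    delays = []
    for iteration in range(max_iterations):
        outcome = workflow(iteration)
        if not isinstance(outcome, ToyLoop):
            return outcome, delays
        # A real scheduler would wait for `when`; here we only record it.
        delays.append(outcome.when)
    raise RuntimeError("workflow did not finish within max_iterations")


def poll(iteration):
    # Keep looping every 5 minutes for the first three iterations.
    if iteration < 3:
        return ToyLoop(when=timedelta(minutes=5))
    return f"finished on iteration {iteration}"


response, delays = drive(poll)
```

The workflow body stays free of explicit loops and sleeps; it only decides, on each iteration, whether to finish or to ask to be run again later.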