Workflows
Workflows are a flexible building block for performing long-running operations and/or operations that have side effects on state outside of Reboot.
Workflow methods have several important differences from the other kinds of methods in Reboot:
- Workflows are not atomic. For example, state updates made by a workflow may be visible to other Reboot methods before the workflow completes. Similarly, a workflow can obtain multiple read snapshots over the course of its execution. We discuss below how to read and write state.
- Workflows can only be run as tasks. This means that workflows execute asynchronously and independently from the request/response flow associated with normal Reboot method execution.
Retries and idempotency
Unlike other kinds of methods, a workflow can only be run as a task, which will be retried until it completes successfully (or aborts with a declared error).
Because a workflow may perform side effects outside of your Reboot application, you want it to be retried after a failure so that it converges on the desired outcome.
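The retry behavior described above can be pictured with a minimal sketch. This is a toy model in plain Python, not Reboot's scheduler: `run_task`, `DeclaredError`, `flaky_workflow`, and the `max_attempts` cap are all hypothetical names introduced only for illustration.

```python
import asyncio


class DeclaredError(Exception):
    """Hypothetical stand-in for an error the workflow declares."""


async def run_task(workflow, max_attempts=10):
    """Toy retry loop: rerun `workflow` until it succeeds or aborts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await workflow(attempt)
        except DeclaredError:
            raise  # A declared error ends the task; no more retries.
        except Exception:
            await asyncio.sleep(0)  # A real scheduler would back off here.
    raise RuntimeError("gave up after max_attempts")


async def flaky_workflow(attempt):
    # Fails twice (e.g., an external service is down), then succeeds.
    if attempt < 3:
        raise ConnectionError("external service unavailable")
    return f"done on attempt {attempt}"


result = asyncio.run(run_task(flaky_workflow))
```

Because the body may run several times before it succeeds, every step inside it needs to be safe to repeat, which is what the idempotency rules below provide.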
To ensure retries are safe, Reboot performs all calls with implicit idempotency (for writer, transaction, or workflow) or memoization (for reader).
Here is an example of a workflow that is calling one of its own methods using implicit idempotency:
- Python
await self.ref().LoadS3Blob(context)
- TypeScript
await this.ref().loadS3Blob(context);
By default, calls are made as though you had explicitly used .per_workflow() (Python) / .perWorkflow() (TypeScript), which means the call will happen only once for each workflow.
Note that you are welcome to be explicit if you like; the implicit calls are equivalent to doing the following explicitly:
- Python
await self.ref().per_workflow().LoadS3Blob(context)
- TypeScript
await this.ref().perWorkflow().loadS3Blob(context);
If you want to make the call every time, use .always():
- Python
await self.ref().always().LoadS3Blob(context)
- TypeScript
await this.ref().always().loadS3Blob(context);
Learn more about idempotency when calling a method here.
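To make the difference between the per-workflow default and .always() concrete, here is a minimal sketch in plain Python. It is a toy model, not Reboot's implementation: `ToyWorkflowContext`, its alias-keyed dictionary, and `load_blob` are hypothetical, and the dictionary stands in for whatever durable record survives retries.

```python
class ToyWorkflowContext:
    """Hypothetical model of per-workflow idempotency; not Reboot's code."""

    def __init__(self):
        self._memo = {}  # Stands in for a record that survives retries.

    def per_workflow(self, alias, call):
        # Run `call` only the first time this alias is seen.
        if alias not in self._memo:
            self._memo[alias] = call()
        return self._memo[alias]

    def always(self, call):
        # Run `call` on every execution, including retries.
        return call()


loads = []


def load_blob():
    loads.append("loaded")
    return len(loads)


context = ToyWorkflowContext()

# Simulate the workflow body running three times (two retries).
for _ in range(3):
    once = context.per_workflow("LoadS3Blob", load_blob)

per_workflow_calls = len(loads)

for _ in range(3):
    context.always(load_blob)

total_calls = len(loads)
```

In this model the per-workflow call runs once across all three executions, while each .always() call runs every time.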
Reading state
Unlike other kinds of methods, state is not passed as an argument to workflow methods. Instead, if you need to read state you must fetch a current snapshot:
- Python
snapshot = await self.state.read(context)
- TypeScript
const snapshot = await this.state.read(context);
A workflow method might fetch a snapshot of state multiple times during the course of its execution. The contents of these snapshots may differ.
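As a rough illustration of why two snapshots can differ, the following sketch uses a hypothetical `ToyState` class in plain Python (it is not Reboot's state API). A write that lands between two reads shows up only in the second snapshot.

```python
import asyncio
import copy


class ToyState:
    """Hypothetical state holder; `read` returns an independent snapshot."""

    def __init__(self):
        self._data = {"iteration": 0}

    async def read(self):
        return copy.deepcopy(self._data)

    async def write(self, mutate):
        mutate(self._data)


async def main():
    state = ToyState()
    first = await state.read()
    # Another method updates state while the workflow is still running.
    await state.write(lambda data: data.update(iteration=data["iteration"] + 1))
    second = await state.read()
    return first, second


first, second = asyncio.run(main())
```

The first snapshot keeps the value it was read with, so code that needs the latest state should read again rather than reuse an old snapshot.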
Writing state
Any state snapshots you fetch within your workflow are read-only. If you want to modify state, you must do so explicitly:
- Python
async def increment(state):
    state.iteration += 1

await self.state.write(
    "Finally, increment the number of iterations",
    context,
    increment,
)
- TypeScript
await this.state.write(
  "Finally, increment the number of iterations",
  context,
  async (state) => {
    state.iteration++;
  }
);
You can think of the function or lambda you pass to write() as a kind of inline writer. Most importantly, this means that your function or lambda will be executed atomically with respect to any other writer or transaction methods.
In the same way that calls to a writer or transaction are made idempotently within a workflow, your inline writer is also executed idempotently. The first argument to write() acts as an idempotency alias for that write.
Use a self-documenting string as the idempotency alias, e.g., "Increment the total students count". This makes the code more readable and also makes it more likely that human-written aliases won't conflict.
If you want your inline writer to execute every time, use .always(), e.g., in TypeScript this.state.always().write(context, ...).
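The effect of the idempotency alias on retries can be sketched as follows. This is a toy model in plain Python under stated assumptions: `ToyWritableState`, its set of applied aliases, and the lock are hypothetical stand-ins and do not reflect how Reboot stores or applies writes.

```python
import asyncio


class ToyWritableState:
    """Hypothetical model of alias-keyed idempotent writes; not Reboot's code."""

    def __init__(self):
        self.data = {"iteration": 0}
        self._applied = set()  # Aliases whose write already took effect.
        self._lock = asyncio.Lock()

    async def write(self, alias, mutate):
        async with self._lock:  # One writer at a time in this model.
            if alias in self._applied:
                return  # Retry of a completed write: skip it.
            mutate(self.data)
            self._applied.add(alias)


async def workflow_body(state):
    def increment(data):
        data["iteration"] += 1

    await state.write("Finally, increment the number of iterations", increment)


async def main():
    state = ToyWritableState()
    # Run the body three times, as if the workflow had been retried twice.
    for _ in range(3):
        await workflow_body(state)
    return state.data["iteration"]


iteration = asyncio.run(main())
```

Even though the body ran three times, the increment is applied once, because the second and third runs find the alias already recorded.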
Waiting until a condition
A workflow is the right place to write code that needs to wait until a condition has occurred. Reboot leverages reactivity, a built-in primitive of the framework, for this: a block of code is re-executed to check the condition only when state has changed.
Here's an example of an until block that waits until state.messages is non-empty:
- Python
async def have_messages():
    state = await self.state.read(context)
    return len(state.messages) > 0

await until("Have messages", context, have_messages)
- TypeScript
await until("Have messages", context, async () => {
  const { messages } = await this.state.read(context);
  return messages.length > 0;
});
Reboot does not currently suspend your tasks while they are waiting in an until, but that is on the roadmap! Please reach out to us to talk about your use case if you would like us to prioritize this feature.
until will re-execute reactively not only for your own state, but also for all other states you may call into.
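One way to picture reactive re-execution is a condition that is re-checked only after a write, rather than polled on a timer. The sketch below models that idea with `asyncio.Condition` from the Python standard library. `ReactiveState` and its `until` method are hypothetical and are not how Reboot implements reactivity.

```python
import asyncio


class ReactiveState:
    """Hypothetical reactive state: waiters wake up only after a write."""

    def __init__(self):
        self.messages = []
        self._changed = asyncio.Condition()

    async def append(self, message):
        async with self._changed:
            self.messages.append(message)
            self._changed.notify_all()

    async def until(self, condition):
        checks = 0
        async with self._changed:
            while True:
                checks += 1
                result = condition(self)
                if result:
                    return result, checks
                await self._changed.wait()  # Sleep until the next write.


async def main():
    state = ReactiveState()
    waiter = asyncio.create_task(state.until(lambda s: len(s.messages) > 0))
    await asyncio.sleep(0)  # Let the waiter run its first (failing) check.
    await state.append("hello")
    return await waiter


result, checks = asyncio.run(main())
```

The condition is evaluated once up front and once more after the write, and never in between, which is the property that makes waiting cheap.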
until will re-execute reactively until a truthy value is returned, and then it will return that value if it is not a boolean. For example, you can wait for a specific key to be set in a SortedMap and return the value:
- Python
async def value_is_stored():
    map = SortedMap.ref("someId")
    response = await map.get(context, key="someKey")
    return response.value if response.HasField("value") else False

value = await until(
    "Value is stored",
    context,
    value_is_stored,
    type=bytes,
)
- TypeScript
const value = await until(
  "Value is stored",
  context,
  async () => {
    const map = SortedMap.ref("someId");
    const { value } = await map.get(context, { key: "someKey" });
    return value;
  },
  {
    stringify: (result: Uint8Array) => JSON.stringify(Array.from(result)),
    parse: (value: string) => new Uint8Array(JSON.parse(value)),
  }
);
An until block memoizes its result so that once you have converged on some condition it won't try to converge again. The first argument to an until block acts as an idempotency alias, just like calling .idempotently() or state.write(...).
In Python, Reboot will serialize and deserialize the result using pickle. However, to ensure that you don't make mistakes, you must also pass the keyword argument type.
In TypeScript, Reboot will serialize and deserialize the result as part of the memoization using JSON.stringify and JSON.parse, but since these are not type safe, you must pass an options object with a validate property as the last argument. If instead you would prefer to serialize and deserialize a different way, you can specify the stringify and parse options instead of validate.
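The memoize-then-check pattern described above can be sketched in plain Python with the standard `pickle` module. `MemoStore` and its `memoize` method are hypothetical; the sketch only illustrates why a declared type is useful when a result is restored from storage, and it does not reflect Reboot's internals.

```python
import pickle


class MemoStore:
    """Hypothetical durable store for memoized results, keyed by alias."""

    def __init__(self):
        self._blobs = {}

    def memoize(self, alias, compute, type):
        if alias in self._blobs:
            # Converged on an earlier run: reuse the stored result.
            result = pickle.loads(self._blobs[alias])
        else:
            result = compute()
        # Check the declared type before trusting (or storing) the value.
        if not isinstance(result, type):
            raise TypeError(f"expected {type.__name__}, got {result!r}")
        self._blobs.setdefault(alias, pickle.dumps(result))
        return result


store = MemoStore()
calls = []


def fetch_value():
    calls.append(1)
    return b"payload"


first = store.memoize("Value is stored", fetch_value, type=bytes)
second = store.memoize("Value is stored", fetch_value, type=bytes)
```

The second call returns the unpickled result without running `fetch_value` again, and a value of the wrong type fails loudly instead of flowing into later code.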
At least or at most once
When making calls to other states within a workflow, e.g., to a SortedMap, you can ensure that the call is performed once by using .idempotently(), as discussed above. To call outside of your Reboot application, you can use helpers that execute a block of code "at least once" or "at most once". Here is an example of "at most once":
- Python
async def remit_to_provider():
    return requests.post(
        REMITTANCE_PROVIDER_URL,
        json=request.toAccountDetails,
    )

await at_most_once("Remit to provider", context, remit_to_provider)
- TypeScript
await atMostOnce("Remit to provider", context, async () => {
  return http.post(REMITTANCE_PROVIDER_URL, {
    toAccountDetails: request.toAccountDetails
  });
});
If the code executed using at_most_once (Python) or atMostOnce (TypeScript) raises an exception, the code will not be re-executed the next time the workflow runs. Depending on the nature of the external service you are interacting with, this might result in a partial state update or an otherwise problematic situation. When possible, prefer third-party services that support safe retries. If the external API supports safe retries (e.g., via some notion of an "idempotency key" or a similar concept), you can use at_least_once (Python) or atLeastOnce (TypeScript) instead.
You can return a result from the function or lambda you pass as an argument just like with until, and in TypeScript that means you'll also need to pass either a validate option or your own stringify and parse options. See the until example above for more details.
By combining these helper functions, you can create robust workflows that handle various scenarios and ensure reliable task execution.
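The difference between the two guarantees comes down to when an attempt is recorded: before the side effect runs (at most once) or only after it succeeds (at least once). The sketch below is a toy model in plain Python. `EffectLog` and `flaky_post` are hypothetical, and raising an error on a repeated at-most-once attempt is an assumption made for this illustration, not documented Reboot behavior.

```python
class EffectLog:
    """Hypothetical durable log used to model both helpers; not Reboot's code."""

    def __init__(self):
        self._started = set()
        self._results = {}

    def at_most_once(self, alias, effect):
        if alias in self._results:
            return self._results[alias]
        if alias in self._started:
            # A previous attempt began but never recorded a result, so
            # rerunning could duplicate the side effect.
            raise RuntimeError(f"'{alias}' was attempted and will not be retried")
        self._started.add(alias)  # Recorded before the effect runs.
        self._results[alias] = effect()
        return self._results[alias]

    def at_least_once(self, alias, effect):
        if alias not in self._results:
            # Recorded only after success, so a failure leads to a rerun.
            self._results[alias] = effect()
        return self._results[alias]


attempts = []


def flaky_post():
    attempts.append("POST")
    if len(attempts) == 1:
        raise ConnectionError("timed out; the provider may have processed it")
    return "ok"


log = EffectLog()
outcomes = []
for _ in range(2):  # Two workflow runs: the original and one retry.
    try:
        outcomes.append(log.at_most_once("Remit to provider", flaky_post))
    except Exception as error:
        outcomes.append(type(error).__name__)
at_most_once_attempts = len(attempts)

attempts.clear()
retry_log = EffectLog()
result = None
for _ in range(2):
    try:
        result = retry_log.at_least_once("Remit to provider", flaky_post)
    except ConnectionError:
        pass
at_least_once_attempts = len(attempts)
```

With at most once, the request is sent a single time even though it failed. With at least once, it is sent again on the retry, which is only safe when the provider deduplicates repeated requests.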
Control loops
Within a workflow you can run a control loop by calling context.loop(...), which provides an asynchronous iterator that you must explicitly break or return from:
- Python
async for iteration in context.loop("Some control loop"):
    ...
- TypeScript
for await (const iteration of context.loop("Some control loop")) {
  ...
}
Code in loops, like any other code, may experience errors or failures. In that case, as with any other failure, Reboot will restart the workflow. The loop will resume at the iteration that failed, skipping over any previously completed iterations.
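Resuming at the failed iteration can be modeled with a cursor that advances only when an iteration finishes. The sketch below is a synchronous toy in plain Python. `ToyLoop`, its `completed` counter, and the `failures` table are hypothetical, and the counter stands in for state that would need to be stored durably.

```python
class ToyLoop:
    """Hypothetical resumable loop; the cursor stands in for durable storage."""

    def __init__(self):
        self.completed = 0  # Number of iterations that finished.

    def iterations(self):
        while True:
            yield self.completed
            # Reached only when the body asks for the next iteration,
            # i.e., when the current one finished without raising.
            self.completed += 1


processed = []
failures = {2: 1}  # Iteration 2 fails once before succeeding.


def workflow(loop):
    for iteration in loop.iterations():
        if failures.get(iteration, 0) > 0:
            failures[iteration] -= 1
            raise RuntimeError(f"iteration {iteration} failed")
        processed.append(iteration)
        if iteration == 3:
            return "finished"


loop = ToyLoop()
while True:  # Restart the workflow until it completes.
    try:
        outcome = workflow(loop)
        break
    except RuntimeError:
        continue
```

After the failure, the restarted workflow begins at iteration 2 rather than 0, so iterations 0 and 1 are processed only once.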
Reboot performs all calls within a loop implicitly with .per_iteration() (Python) / .perIteration() (TypeScript) instead of .per_workflow() / .perWorkflow(), which means the call will happen once per iteration of the loop. If you instead want to execute the call for every run (including retries), you can explicitly use .always(). Or, if you want to execute a call within a loop but only once per workflow, you must explicitly write .per_workflow() / .perWorkflow().
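The three scopes differ only in what the idempotency key includes. The sketch below is a toy model in plain Python: `ToyCallLog` and the call names are hypothetical, and the key layout is an assumption chosen for illustration rather than Reboot's actual scheme.

```python
class ToyCallLog:
    """Hypothetical model of idempotency scopes inside a control loop."""

    def __init__(self):
        self._seen = set()
        self.executed = []

    def _once(self, key, name):
        if key not in self._seen:
            self._seen.add(key)
            self.executed.append(name)

    def per_iteration(self, iteration, name):
        # The key includes the iteration, so each iteration runs it once.
        self._once(("iteration", iteration, name), name)

    def per_workflow(self, name):
        # The key ignores the iteration, so the whole workflow runs it once.
        self._once(("workflow", name), name)

    def always(self, name):
        # No key at all: runs on every execution, including retries.
        self.executed.append(name)


log = ToyCallLog()
# Iteration 1 appears twice to model a retry of that iteration.
for iteration in [0, 1, 1, 2]:
    log.per_iteration(iteration, "poll")
    log.per_workflow("setup")
    log.always("heartbeat")

counts = {name: log.executed.count(name) for name in ("poll", "setup", "heartbeat")}
```

Across three distinct iterations and one retry, the per-iteration call runs three times, the per-workflow call once, and the .always() call four times.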