remove rollbackAll/rollback_all, add config-based auto-rollback

This commit is contained in:
dmmulroy 2026-01-06 13:26:19 -05:00
parent 606d7a88f1
commit 2382f32b47
No known key found for this signature in database
2 changed files with 88 additions and 65 deletions

View file

@ -154,7 +154,9 @@ Refer to the [documentation on sleeping and retrying](/workflows/build/sleeping-
## Rollback (Saga Pattern)
Workflows supports the [saga pattern](https://microservices.io/patterns/data/saga.html) for handling distributed transactions. When a step fails, you can roll back previously completed steps by executing compensating actions (undo functions) in reverse order (LIFO - last-in, first-out).
Workflows supports the [saga pattern](https://microservices.io/patterns/data/saga.html) for handling distributed transactions. When a workflow throws an uncaught error, you can automatically roll back previously completed steps by executing compensating actions (undo functions) in reverse order (LIFO - last-in, first-out).
To enable automatic rollback, pass a `rollback` configuration when creating the workflow instance. When an uncaught error occurs, all registered undo functions will execute automatically before the workflow enters the errored state.
### step.withRollback
@ -164,25 +166,6 @@ Workflows supports the [saga pattern](https://microservices.io/patterns/data/sag
- `handler` - an object containing `do` and `undo` functions.
- `config` (optional) - configuration for the step, optionally including separate `undoConfig` for the undo function.
### step.rollbackAll
{/* prettier-ignore */}
- <code>step.rollbackAll(error?: unknown, options?: RollbackAllOptions): Promise&lt;void&gt;</code>
- `error` (optional) - the error that triggered the rollback, passed to each undo function.
- `options` (optional) - configuration options for the rollback behavior.
- Executes all registered undo functions in LIFO (last-in, first-out) order.
- Each undo is wrapped in `step.do()` for durability and retry.
- By default, stops on first undo failure. Use `continueOnError: true` to execute all undos and collect failures into an `AggregateError`.
### RollbackAllOptions
```ts
export type RollbackAllOptions = {
/** If true, continue executing remaining undos after a failure and throw AggregateError at end. Default: false */
continueOnError?: boolean;
};
```
### RollbackHandler
```ts
@ -200,6 +183,24 @@ export type RollbackStepConfig = WorkflowStepConfig & {
};
```
### RollbackConfig
```ts
export type RollbackConfig = {
/** If true, continue executing remaining undos after a failure and throw AggregateError at end. Default: false */
continueOnError?: boolean;
};
```
Pass this configuration to `workflow.create()` to enable automatic rollback on uncaught errors:
```ts
let instance = await env.MY_WORKFLOW.create({
params: { userId: "123", items: ["item1", "item2"] },
rollback: { continueOnError: true }, // Enable auto-rollback
});
```
### Example
<TypeScriptExample>
@ -207,26 +208,29 @@ export type RollbackStepConfig = WorkflowStepConfig & {
```ts
export class OrderWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
try {
const order = await step.withRollback("create order", {
do: async () => env.DB.orders.insert({ userId: event.payload.userId, items: event.payload.items }),
undo: async (err, order) => env.DB.orders.delete(order.id),
});
const order = await step.withRollback("create order", {
do: async () => env.DB.orders.insert({ userId: event.payload.userId, items: event.payload.items }),
undo: async (err, order) => env.DB.orders.delete(order.id),
});
await step.withRollback("charge payment", {
do: async () => env.STRIPE.charges.create({ amount: order.total, customer: event.payload.userId }),
undo: async (err, charge) => env.STRIPE.refunds.create({ charge: charge.id }),
});
await step.withRollback("charge payment", {
do: async () => env.STRIPE.charges.create({ amount: order.total, customer: event.payload.userId }),
undo: async (err, charge) => env.STRIPE.refunds.create({ charge: charge.id }),
});
await step.do("send confirmation", async () => {
await env.EMAIL.send({ to: event.payload.email, template: "order-confirmed", orderId: order.id });
});
} catch (e) {
await step.rollbackAll(e);
throw e;
}
// If this step throws, the undo functions above will run automatically
// (in reverse order) if the instance was created with rollback config enabled
await step.do("send confirmation", async () => {
await env.EMAIL.send({ to: event.payload.email, template: "order-confirmed", orderId: order.id });
});
}
}
// Creating the workflow instance with rollback enabled:
// const instance = await env.MY_WORKFLOW.create({
// params: orderParams,
// rollback: { continueOnError: false }, // Stop on first undo failure
// });
```
</TypeScriptExample>
@ -449,9 +453,17 @@ interface WorkflowInstanceCreateOptions {
* The event payload the Workflow instance is triggered with
*/
params?: unknown;
/**
* Enable automatic rollback on uncaught errors.
* When enabled, all registered undo functions will execute in LIFO order
* if the workflow throws an uncaught error.
*/
rollback?: RollbackConfig;
}
```
Refer to the [Rollback (Saga Pattern)](#rollback-saga-pattern) section for details on `RollbackConfig`.
## WorkflowInstance
Represents a specific instance of a Workflow, and provides methods to manage the instance.

View file

@ -79,7 +79,9 @@ async def run(self, event, step):
## Rollback (Saga Pattern)
The Python SDK supports the [saga pattern](https://microservices.io/patterns/data/saga.html) for distributed transactions using the `with_rollback` decorator. When a step fails, you can roll back previously completed steps by executing compensating actions (undo functions) in reverse order (LIFO - last-in, first-out).
The Python SDK supports the [saga pattern](https://microservices.io/patterns/data/saga.html) for distributed transactions using the `with_rollback` decorator. When a workflow throws an uncaught error, you can automatically roll back previously completed steps by executing compensating actions (undo functions) in reverse order (LIFO - last-in, first-out).
To enable automatic rollback, pass a `rollback` configuration when creating the workflow instance. When an uncaught error occurs, all registered undo functions will execute automatically before the workflow enters the errored state.
### step.with_rollback
@ -92,49 +94,58 @@ The Python SDK supports the [saga pattern](https://microservices.io/patterns/dat
* <code>@do_fn.undo(config=None)</code> - decorator to register an undo function for a `with_rollback` step.
* `config` - optional separate config for the undo function. If not provided, inherits from the do step's config.
### step.rollback_all
### Enabling Rollback
* <code>await step.rollback_all(error=None, *, continue_on_error=False)</code> - executes all registered undo functions in LIFO order.
* `error` - optional error to pass to each undo function.
* `continue_on_error` - if `True`, continue executing remaining undos after a failure and raise `ExceptionGroup` at end. Default: `False`.
* Each undo is wrapped in `step.do()` for durability and retry.
* By default, stops on first undo failure.
Pass a `rollback` configuration when creating the workflow instance:
```python
instance = await self.env.MY_WORKFLOW.create(
params={"user_id": "123", "items": ["item1", "item2"]},
rollback={"continueOnError": True} # Enable auto-rollback
)
```
* `continueOnError` - if `True`, continue executing remaining undos after a failure and raise `ExceptionGroup` at end. Default: `False`.
### Example
```python
class OrderWorkflow(WorkflowEntrypoint):
async def run(self, event, step):
try:
@step.with_rollback("create order")
async def create_order():
return await env.DB.orders.insert({"user_id": event["payload"]["user_id"], "items": event["payload"]["items"]})
@step.with_rollback("create order")
async def create_order():
return await env.DB.orders.insert({"user_id": event["payload"]["user_id"], "items": event["payload"]["items"]})
@create_order.undo
async def undo_create_order(err, order):
await env.DB.orders.delete(order["id"])
@create_order.undo
async def undo_create_order(err, order):
await env.DB.orders.delete(order["id"])
order = await create_order()
order = await create_order()
@step.with_rollback("charge payment")
async def charge_payment():
return await env.STRIPE.charges.create(amount=order["total"], customer=event["payload"]["user_id"])
@step.with_rollback("charge payment")
async def charge_payment():
return await env.STRIPE.charges.create(amount=order["total"], customer=event["payload"]["user_id"])
@charge_payment.undo
async def undo_charge_payment(err, charge):
await env.STRIPE.refunds.create(charge=charge["id"])
@charge_payment.undo
async def undo_charge_payment(err, charge):
await env.STRIPE.refunds.create(charge=charge["id"])
await charge_payment()
await charge_payment()
@step.do("send confirmation")
async def send_confirmation():
await env.EMAIL.send(to=event["payload"]["email"], template="order-confirmed", order_id=order["id"])
# If this step throws, the undo functions above will run automatically
# (in reverse order) if the instance was created with rollback config enabled
@step.do("send confirmation")
async def send_confirmation():
await env.EMAIL.send(to=event["payload"]["email"], template="order-confirmed", order_id=order["id"])
await send_confirmation()
await send_confirmation()
except Exception as e:
await step.rollback_all(e)
raise
# Creating the workflow instance with rollback enabled:
# instance = await env.MY_WORKFLOW.create(
# params=order_params,
# rollback={"continueOnError": False} # Stop on first undo failure
# )
```
### `event` parameter