update python rollback docs: required undo, undo= param, depends/concurrent support

This commit is contained in:
dmmulroy 2026-01-09 18:08:16 -05:00
parent 2382f32b47
commit c70608ad61
No known key found for this signature in database

View file

@ -85,14 +85,22 @@ To enable automatic rollback, pass a `rollback` configuration when creating the
### step.with_rollback
* <code>step.with_rollback(name, config=None)</code> - decorator that allows you to define a step with a rollback handler.
* `name` - the name of the step.
* `config` - an optional `WorkflowStepConfig` dictionary.
* <code>step.with_rollback(name, *, undo=None, depends=None, concurrent=False, config=None, undo_config=None)</code> - decorator that allows you to define a step with a rollback handler.
* `name` - the name of the step (up to 256 chars).
* `undo` - undo handler function, or use `@decorated_fn.undo` decorator.
* `depends` - optional list of steps this depends on (DAG pattern). See [DAG Workflows](/workflows/python/dag).
* `concurrent` - run dependencies in parallel (default: `False`).
* `config` - optional `WorkflowStepConfig` dictionary for the do function.
* `undo_config` - optional `WorkflowStepConfig` dictionary for the undo function.
:::note
An undo handler is **required** for `with_rollback` steps. If no undo handler is provided via the `undo=` parameter or `@fn.undo` decorator, a `ValueError` is raised at call time. Use `step.do()` for steps that don't need rollback.
:::
### @do_fn.undo
* <code>@do_fn.undo(config=None)</code> - decorator to register an undo function for a `with_rollback` step.
* `config` - optional separate config for the undo function. If not provided, inherits from the do step's config.
* `config` - optional separate config for the undo function.
### Enabling Rollback
@ -101,51 +109,92 @@ Pass a `rollback` configuration when creating the workflow instance:
```python
instance = await self.env.MY_WORKFLOW.create(
params={"user_id": "123", "items": ["item1", "item2"]},
rollback={"continueOnError": True} # Enable auto-rollback
rollback={"continue_on_error": True} # Enable auto-rollback
)
```
* `continueOnError` - if `True`, continue executing remaining undos after a failure and raise `ExceptionGroup` at end. Default: `False`.
* `continue_on_error` - if `True`, continue executing remaining undos after a failure and raise `ExceptionGroup` at end. Default: `False`.
### Example
Two patterns are supported for attaching undo handlers:
```python
from workers import WorkflowEntrypoint
class OrderWorkflow(WorkflowEntrypoint):
async def run(self, event, step):
payload = event["payload"]
# Pattern A: Chained decorator (preferred - keeps do/undo together)
@step.with_rollback("create order")
async def create_order():
return await env.DB.orders.insert({"user_id": event["payload"]["user_id"], "items": event["payload"]["items"]})
return await self.env.DB.prepare(
"INSERT INTO orders (user_id, items) VALUES (?, ?) RETURNING *"
).bind(payload["user_id"], payload["items"]).first()
@create_order.undo
async def undo_create_order(err, order):
await env.DB.orders.delete(order["id"])
async def _(err, order):
await self.env.DB.prepare("DELETE FROM orders WHERE id = ?").bind(order["id"]).run()
order = await create_order()
@step.with_rollback("charge payment")
# Pattern B: undo= parameter (for reusable undo handlers)
async def refund_charge(err, charge):
await stripe_refund(charge["id"])
@step.with_rollback(
"charge payment",
undo=refund_charge,
config={"retries": {"limit": 3, "delay": "1 second", "backoff": "exponential"}},
undo_config={"retries": {"limit": 5, "delay": "2 seconds"}}
)
async def charge_payment():
return await env.STRIPE.charges.create(amount=order["total"], customer=event["payload"]["user_id"])
return await stripe_charge(order["total"], payload["user_id"])
@charge_payment.undo
async def undo_charge_payment(err, charge):
await env.STRIPE.refunds.create(charge=charge["id"])
charge = await charge_payment()
await charge_payment()
# Steps can depend on other rollback steps (DAG pattern)
@step.with_rollback("reserve inventory", depends=[create_order])
async def reserve_inventory(order_result):
return await inventory_reserve(order_result["items"])
# If this step throws, the undo functions above will run automatically
# (in reverse order) if the instance was created with rollback config enabled
@reserve_inventory.undo
async def _(err, reservation):
await inventory_release(reservation["id"])
await reserve_inventory()
# Non-rollbackable step (email can't be unsent) - use step.do()
@step.do("send confirmation")
async def send_confirmation():
await env.EMAIL.send(to=event["payload"]["email"], template="order-confirmed", order_id=order["id"])
await send_email(payload["user_id"], "order-confirmed", order["id"])
await send_confirmation()
return {"order_id": order["id"], "charge_id": charge["id"]}
```
# Creating the workflow instance with rollback enabled:
# instance = await env.MY_WORKFLOW.create(
# params=order_params,
# rollback={"continueOnError": False} # Stop on first undo failure
# )
Creating the workflow instance with rollback enabled:
```python
class Default(WorkerEntrypoint):
async def fetch(self, request):
# Auto-rollback enabled (stop on first undo failure)
instance = await self.env.MY_WORKFLOW.create(
id="order-456",
params={"user_id": "u2", "items": ["item1"]},
rollback={}
)
# Auto-rollback with continue-on-error
instance = await self.env.MY_WORKFLOW.create(
id="order-789",
params={"user_id": "u3", "items": ["item2"]},
rollback={"continue_on_error": True}
)
return Response.json({"id": instance.id})
```
### `event` parameter