From c70608ad6176b45f161a94d61ee609d7099dbd00 Mon Sep 17 00:00:00 2001 From: dmmulroy Date: Fri, 9 Jan 2026 18:08:16 -0500 Subject: [PATCH] update python rollback docs: required undo, undo= param, depends/concurrent support --- .../workflows/python/python-workers-api.mdx | 95 ++++++++++++++----- 1 file changed, 72 insertions(+), 23 deletions(-) diff --git a/src/content/docs/workflows/python/python-workers-api.mdx b/src/content/docs/workflows/python/python-workers-api.mdx index a3a5836162..07f84e8893 100644 --- a/src/content/docs/workflows/python/python-workers-api.mdx +++ b/src/content/docs/workflows/python/python-workers-api.mdx @@ -85,14 +85,22 @@ To enable automatic rollback, pass a `rollback` configuration when creating the ### step.with_rollback -* step.with_rollback(name, config=None) - decorator that allows you to define a step with a rollback handler. - * `name` - the name of the step. - * `config` - an optional `WorkflowStepConfig` dictionary. +* step.with_rollback(name, *, undo=None, depends=None, concurrent=False, config=None, undo_config=None) - decorator that allows you to define a step with a rollback handler. + * `name` - the name of the step (up to 256 chars). + * `undo` - undo handler function, or use `@decorated_fn.undo` decorator. + * `depends` - optional list of steps this depends on (DAG pattern). See [DAG Workflows](/workflows/python/dag). + * `concurrent` - run dependencies in parallel (default: `False`). + * `config` - optional `WorkflowStepConfig` dictionary for the do function. + * `undo_config` - optional `WorkflowStepConfig` dictionary for the undo function. + +:::note +An undo handler is **required** for `with_rollback` steps. If no undo handler is provided via the `undo=` parameter or `@fn.undo` decorator, a `ValueError` is raised at call time. Use `step.do()` for steps that don't need rollback. +::: ### @do_fn.undo * @do_fn.undo(config=None) - decorator to register an undo function for a `with_rollback` step. - * `config` - optional separate config for the undo function. If not provided, inherits from the do step's config. + * `config` - optional separate config for the undo function. ### Enabling Rollback @@ -101,51 +109,92 @@ Pass a `rollback` configuration when creating the workflow instance: ```python instance = await self.env.MY_WORKFLOW.create( params={"user_id": "123", "items": ["item1", "item2"]}, - rollback={"continueOnError": True} # Enable auto-rollback + rollback={"continue_on_error": True} # Enable auto-rollback ) ``` -* `continueOnError` - if `True`, continue executing remaining undos after a failure and raise `ExceptionGroup` at end. Default: `False`. +* `continue_on_error` - if `True`, continue executing remaining undos after a failure and raise `ExceptionGroup` at end. Default: `False`. ### Example +Two patterns are supported for attaching undo handlers: + ```python +from workers import WorkflowEntrypoint + class OrderWorkflow(WorkflowEntrypoint): async def run(self, event, step): + payload = event["payload"] + + # Pattern A: Chained decorator (preferred - keeps do/undo together) @step.with_rollback("create order") async def create_order(): - return await env.DB.orders.insert({"user_id": event["payload"]["user_id"], "items": event["payload"]["items"]}) + return await self.env.DB.prepare( + "INSERT INTO orders (user_id, items) VALUES (?, ?) RETURNING *" + ).bind(payload["user_id"], payload["items"]).first() @create_order.undo - async def undo_create_order(err, order): - await env.DB.orders.delete(order["id"]) + async def _(err, order): + await self.env.DB.prepare("DELETE FROM orders WHERE id = ?").bind(order["id"]).run() order = await create_order() - @step.with_rollback("charge payment") + # Pattern B: undo= parameter (for reusable undo handlers) + async def refund_charge(err, charge): + await stripe_refund(charge["id"]) + + @step.with_rollback( + "charge payment", + undo=refund_charge, + config={"retries": {"limit": 3, "delay": "1 second", "backoff": "exponential"}}, + undo_config={"retries": {"limit": 5, "delay": "2 seconds"}} + ) async def charge_payment(): - return await env.STRIPE.charges.create(amount=order["total"], customer=event["payload"]["user_id"]) + return await stripe_charge(order["total"], payload["user_id"]) - @charge_payment.undo - async def undo_charge_payment(err, charge): - await env.STRIPE.refunds.create(charge=charge["id"]) + charge = await charge_payment() - await charge_payment() + # Steps can depend on other rollback steps (DAG pattern) + @step.with_rollback("reserve inventory", depends=[create_order]) + async def reserve_inventory(order_result): + return await inventory_reserve(order_result["items"]) - # If this step throws, the undo functions above will run automatically - # (in reverse order) if the instance was created with rollback config enabled + @reserve_inventory.undo + async def _(err, reservation): + await inventory_release(reservation["id"]) + + await reserve_inventory() + + # Non-rollbackable step (email can't be unsent) - use step.do() @step.do("send confirmation") async def send_confirmation(): - await env.EMAIL.send(to=event["payload"]["email"], template="order-confirmed", order_id=order["id"]) + await send_email(payload["user_id"], "order-confirmed", order["id"]) await send_confirmation() + return {"order_id": order["id"], "charge_id": charge["id"]} +``` -# Creating the workflow instance with rollback enabled: -# instance = await env.MY_WORKFLOW.create( -# params=order_params, -# rollback={"continueOnError": False} # Stop on first undo failure -# ) +Creating the workflow instance with rollback enabled: + +```python +class Default(WorkerEntrypoint): + async def fetch(self, request): + # Auto-rollback enabled (stop on first undo failure) + instance = await self.env.MY_WORKFLOW.create( + id="order-456", + params={"user_id": "u2", "items": ["item1"]}, + rollback={} + ) + + # Auto-rollback with continue-on-error + instance = await self.env.MY_WORKFLOW.create( + id="order-789", + params={"user_id": "u3", "items": ["item2"]}, + rollback={"continue_on_error": True} + ) + + return Response.json({"id": instance.id}) ``` ### `event` parameter