Skip to content

Handler

WorkflowHandler #

Bases: Awaitable[RunResultT]

Stable interface for communicating with a running workflow. Is awaitable and streamable, and supports things like cancellation.

Source code in workflows/handler.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class WorkflowHandler(Awaitable[RunResultT]):
    """
    Stable interface for communicating with a running workflow. Is awaitable and streamable, and supports things like cancellation.
    """

    _ctx: Context

    async def _await_result(self) -> RunResultT:
        stop_event = await self.stop_event_result()
        return stop_event.result if type(stop_event) is StopEvent else stop_event

    def __await__(self) -> Generator[Any, Any, RunResultT]:
        return self._await_result().__await__()

    def __init__(
        self,
        workflow: "Workflow",
        external_adapter: ExternalRunAdapter,
        ctx: "Context[Any] | None" = None,
    ) -> None:
        from .context import Context

        self._workflow = workflow
        self._external_adapter = external_adapter
        # TODO(v3): Remove ctx parameter. The handler will just be the external face.
        self._ctx = (
            ctx
            if ctx is not None
            else Context._create_external(
                workflow=workflow, external_adapter=external_adapter
            )
        )
        self.run_id = external_adapter.run_id
        self._all_events_consumed = False
        self._result: StopEvent | None = None
        self._result_exception: BaseException | None = None
        self._result_task = asyncio.create_task(self._wait_for_result())
        self._result_task.add_done_callback(self._handle_result_task_done)

    async def _wait_for_result(self) -> StopEvent:
        result = await self._external_adapter.get_result()
        self._result = result
        return result

    def _handle_result_task_done(self, task: asyncio.Task[StopEvent]) -> None:
        if task.cancelled():
            return
        try:
            exc = task.exception()
        except asyncio.CancelledError:
            return
        if exc is None:
            return
        self._result_exception = exc
        if isinstance(exc, WorkflowCancelledByUser) and self._result is None:
            # Preserve cancellation in handler state without changing await semantics.
            self._result = WorkflowCancelledEvent()

    @property
    def ctx(self) -> Context:
        """The workflow [Context][workflows.context.context.Context] for this run."""
        return self._ctx

    def get_stop_event(self) -> StopEvent | None:
        """The stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
        return self._result

    async def stop_event_result(self) -> StopEvent:
        """Get the stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
        return await self._result_task

    def __str__(self) -> str:
        return f"WorkflowHandler(workflow={self._workflow.workflow_name}, run_id={self.run_id}, result={self._result})"

    def is_done(self) -> bool:
        """Return True when the workflow has completed."""
        return self._result_task.done()

    async def stream_events(
        self, expose_internal: bool = False
    ) -> AsyncGenerator[Event, None]:
        """
        Stream events from the workflow execution as they occur.

        This method provides real-time access to events generated during workflow
        execution, allowing for monitoring and processing of intermediate results.
        Events are yielded in the order they are generated by the workflow.

        The stream includes all events written to the context's streaming queue,
        and terminates when a [StopEvent][workflows.events.StopEvent] is
        encountered, indicating the workflow has completed.

        Args:
            expose_internal (bool): Whether to expose internal events.

        Returns:
            AsyncGenerator[Event, None]: An async generator that yields Event objects
                as they are produced by the workflow.

        Raises:
            ValueError: If the context is not set on the handler.
            WorkflowRuntimeError: If all events have already been consumed by a
                previous call to `stream_events()` on the same handler instance.

        Examples:
            ```python
            handler = workflow.run()

            # Stream and process events in real-time
            async for event in handler.stream_events():
                if isinstance(event, StopEvent):
                    print(f"Workflow completed with result: {event.result}")
                else:
                    print(f"Received event: {event}")

            # Get final result
            result = await handler
            ```

        Note:
            Events can only be streamed once per handler instance. Subsequent
            calls to `stream_events()` will raise a WorkflowRuntimeError.
        """

        # Check if we already consumed all the streamed events
        if self._all_events_consumed:
            msg = "All the streamed events have already been consumed."
            raise WorkflowRuntimeError(msg)

        async for ev in self.ctx.stream_events():
            if isinstance(ev, InternalDispatchEvent) and not expose_internal:
                continue
            yield ev

            if isinstance(ev, StopEvent):
                self._all_events_consumed = True
                break

    def done(self) -> bool:
        """Return True when the workflow has completed."""
        _warn_done_deprecated()
        return self._result_task.done()

    def cancel(self) -> None:
        """Cancel the running workflow."""
        _warn_cancel_deprecated()
        shim = as_v2_runtime_compatibility_shim(self._external_adapter)
        if shim is None:
            raise NotImplementedError(
                "Hard cancel is not supported by this runtime. "
                "Use await handler.cancel_run() for graceful cancellation."
            )
        shim.abort()
        self._result_task.cancel()

    def exception(self) -> BaseException | None:
        """Get the exception for this run. Always defined once the future is done."""
        _warn_exception_deprecated()
        try:
            return self._result_task.exception()
        except asyncio.CancelledError:
            return None

    def cancelled(self) -> bool:
        """Return True when the underlying workflow has been cancelled."""
        _warn_cancelled_deprecated()
        if self._result_task.cancelled():
            return True
        exc = self.exception()
        if exc is not None and isinstance(exc, WorkflowCancelledByUser):
            return True
        stop_event = self.get_stop_event()
        if stop_event is not None and isinstance(stop_event, WorkflowCancelledEvent):
            return True
        return False

    async def cancel_run(self, *, timeout: float = 5.0) -> None:
        """Cancel the running workflow.

        Signals the underlying context to raise
        [WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser],
        which will be caught by the workflow and gracefully end the run.

        Examples:
            ```python
            handler = workflow.run()
            await handler.cancel_run()
            ```
        """
        try:
            await self._external_adapter.cancel()
        except Exception:
            pass
        try:
            await asyncio.wait_for(self._result_task, timeout=timeout)
        except asyncio.TimeoutError:
            pass
        except asyncio.CancelledError:
            pass
        except Exception:
            pass

    async def send_event(self, event: Event, step: str | None = None) -> None:
        """Send an event into the workflow.

        Args:
            event: The event to send into the workflow.
            step: Optional step name to target. If None, broadcasts to all.
        """
        self.ctx.send_event(event, step)

ctx property #

ctx: Context

The workflow Context for this run.

get_stop_event #

get_stop_event() -> StopEvent | None

The stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself.

Source code in workflows/handler.py
89
90
91
def get_stop_event(self) -> StopEvent | None:
    """The stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
    return self._result

stop_event_result async #

stop_event_result() -> StopEvent

Get the stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself.

Source code in workflows/handler.py
93
94
95
async def stop_event_result(self) -> StopEvent:
    """Get the stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
    return await self._result_task

is_done #

is_done() -> bool

Return True when the workflow has completed.

Source code in workflows/handler.py
100
101
102
def is_done(self) -> bool:
    """Return True when the workflow has completed."""
    return self._result_task.done()

stream_events async #

stream_events(expose_internal: bool = False) -> AsyncGenerator[Event, None]

Stream events from the workflow execution as they occur.

This method provides real-time access to events generated during workflow execution, allowing for monitoring and processing of intermediate results. Events are yielded in the order they are generated by the workflow.

The stream includes all events written to the context's streaming queue, and terminates when a StopEvent is encountered, indicating the workflow has completed.

Parameters:

Name Type Description Default
expose_internal bool

Whether to expose internal events.

False

Returns:

Type Description
AsyncGenerator[Event, None]

AsyncGenerator[Event, None]: An async generator that yields Event objects as they are produced by the workflow.

Raises:

Type Description
ValueError

If the context is not set on the handler.

WorkflowRuntimeError

If all events have already been consumed by a previous call to stream_events() on the same handler instance.

Examples:

handler = workflow.run()

# Stream and process events in real-time
async for event in handler.stream_events():
    if isinstance(event, StopEvent):
        print(f"Workflow completed with result: {event.result}")
    else:
        print(f"Received event: {event}")

# Get final result
result = await handler
Note

Events can only be streamed once per handler instance. Subsequent calls to stream_events() will raise a WorkflowRuntimeError.

Source code in workflows/handler.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
async def stream_events(
    self, expose_internal: bool = False
) -> AsyncGenerator[Event, None]:
    """
    Stream events from the workflow execution as they occur.

    This method provides real-time access to events generated during workflow
    execution, allowing for monitoring and processing of intermediate results.
    Events are yielded in the order they are generated by the workflow.

    The stream includes all events written to the context's streaming queue,
    and terminates when a [StopEvent][workflows.events.StopEvent] is
    encountered, indicating the workflow has completed.

    Args:
        expose_internal (bool): Whether to expose internal events.

    Returns:
        AsyncGenerator[Event, None]: An async generator that yields Event objects
            as they are produced by the workflow.

    Raises:
        ValueError: If the context is not set on the handler.
        WorkflowRuntimeError: If all events have already been consumed by a
            previous call to `stream_events()` on the same handler instance.

    Examples:
        ```python
        handler = workflow.run()

        # Stream and process events in real-time
        async for event in handler.stream_events():
            if isinstance(event, StopEvent):
                print(f"Workflow completed with result: {event.result}")
            else:
                print(f"Received event: {event}")

        # Get final result
        result = await handler
        ```

    Note:
        Events can only be streamed once per handler instance. Subsequent
        calls to `stream_events()` will raise a WorkflowRuntimeError.
    """

    # Check if we already consumed all the streamed events
    if self._all_events_consumed:
        msg = "All the streamed events have already been consumed."
        raise WorkflowRuntimeError(msg)

    async for ev in self.ctx.stream_events():
        if isinstance(ev, InternalDispatchEvent) and not expose_internal:
            continue
        yield ev

        if isinstance(ev, StopEvent):
            self._all_events_consumed = True
            break

done #

done() -> bool

Return True when the workflow has completed.

Source code in workflows/handler.py
164
165
166
167
def done(self) -> bool:
    """Return True when the workflow has completed."""
    _warn_done_deprecated()
    return self._result_task.done()

cancel #

cancel() -> None

Cancel the running workflow.

Source code in workflows/handler.py
169
170
171
172
173
174
175
176
177
178
179
def cancel(self) -> None:
    """Cancel the running workflow."""
    _warn_cancel_deprecated()
    shim = as_v2_runtime_compatibility_shim(self._external_adapter)
    if shim is None:
        raise NotImplementedError(
            "Hard cancel is not supported by this runtime. "
            "Use await handler.cancel_run() for graceful cancellation."
        )
    shim.abort()
    self._result_task.cancel()

exception #

exception() -> BaseException | None

Get the exception for this run. Always defined once the future is done.

Source code in workflows/handler.py
181
182
183
184
185
186
187
def exception(self) -> BaseException | None:
    """Get the exception for this run. Always defined once the future is done."""
    _warn_exception_deprecated()
    try:
        return self._result_task.exception()
    except asyncio.CancelledError:
        return None

cancelled #

cancelled() -> bool

Return True when the underlying workflow has been cancelled.

Source code in workflows/handler.py
189
190
191
192
193
194
195
196
197
198
199
200
def cancelled(self) -> bool:
    """Return True when the underlying workflow has been cancelled."""
    _warn_cancelled_deprecated()
    if self._result_task.cancelled():
        return True
    exc = self.exception()
    if exc is not None and isinstance(exc, WorkflowCancelledByUser):
        return True
    stop_event = self.get_stop_event()
    if stop_event is not None and isinstance(stop_event, WorkflowCancelledEvent):
        return True
    return False

cancel_run async #

cancel_run(*, timeout: float = 5.0) -> None

Cancel the running workflow.

Signals the underlying context to raise WorkflowCancelledByUser, which will be caught by the workflow and gracefully end the run.

Examples:

handler = workflow.run()
await handler.cancel_run()
Source code in workflows/handler.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def cancel_run(self, *, timeout: float = 5.0) -> None:
    """Cancel the running workflow.

    Signals the underlying context to raise
    [WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser],
    which will be caught by the workflow and gracefully end the run.

    Examples:
        ```python
        handler = workflow.run()
        await handler.cancel_run()
        ```
    """
    try:
        await self._external_adapter.cancel()
    except Exception:
        pass
    try:
        await asyncio.wait_for(self._result_task, timeout=timeout)
    except asyncio.TimeoutError:
        pass
    except asyncio.CancelledError:
        pass
    except Exception:
        pass

send_event async #

send_event(event: Event, step: str | None = None) -> None

Send an event into the workflow.

Parameters:

Name Type Description Default
event Event

The event to send into the workflow.

required
step str | None

Optional step name to target. If None, broadcasts to all.

None
Source code in workflows/handler.py
228
229
230
231
232
233
234
235
async def send_event(self, event: Event, step: str | None = None) -> None:
    """Send an event into the workflow.

    Args:
        event: The event to send into the workflow.
        step: Optional step name to target. If None, broadcasts to all.
    """
    self.ctx.send_event(event, step)