Workflow Streams - Python SDK
Workflow Streams is a Temporal Python SDK contrib library that gives a Workflow a durable, offset-addressed event channel built on Temporal's basic message primitives: Signals, Updates, and Queries.
It batches publishes to amortize per-Signal cost, deduplicates batches for exactly-once delivery to the log, supports topic filtering, and carries stream state across Continue-As-New for long-running runs.
Use Workflow Streams when you want outside observers to follow the progress of a Workflow and its Activities: updating a UI as an AI agent works, surfacing status from a payment or order pipeline, reporting intermediate results from a data job. It is not suited to ultra-low-latency cases like real-time voice, and it targets modest fan-out: tens of publishers and subscribers per Workflow, not thousands.
The temporalio.contrib.workflow_streams module is currently in Public Preview. Refer to the Temporal product release stages guide for more information.
Cross-language client support is on the roadmap. Only the Python client is available today.
The API may change before general availability.
Looking for...
- Runnable end-to-end samples (basic publish/subscribe, reconnecting subscriber, external publisher, bounded log): Workflow Streams samples.
- A complete LLM streaming example on this page (Activity publishes deltas, terminal consumer that resets on retry): Stream LLM output.
- Delivery guarantees, ordering, and retry semantics: Delivery semantics.
- History-size cost and tuning: Architecture.
- Long-running streams that need Continue-As-New: Continue-As-New.
Concepts
A WorkflowStream is a many-to-many event channel hosted inside a Workflow:
- The streaming Workflow hosts the event log and ensures durability. Constructing a WorkflowStream instance from @workflow.init registers the handlers that accept publishes and serve subscribers.
- Publishers append events. The Workflow itself publishes via its in-memory WorkflowStream. Activities and external clients publish via WorkflowStreamClient, which batches and ships events to the Workflow.
- Subscribers consume events. They call WorkflowStreamClient.subscribe(...) and iterate the returned async iterator.
Events are organized by topic, a string label set when publishing. Subscribers can filter to one or more topics or consume all of them. Topics are implicit; publishing to a topic creates it.
The event log lives inside the Workflow, so the Workflow is the single source of truth. Any process that bridges events to the outside world (an SSE proxy serving a browser, a forwarding Activity) can stay stateless and resume by replaying from a stored offset.
How it works at a glance
Temporal's standard message-passing primitives carry the work. No new transport is involved:
- Publishers send Signals. A WorkflowStreamClient batches publishes (default every 2 seconds) and ships each batch as a single Signal to the Workflow.
- Subscribers send Updates. subscribe() issues a long-poll Update that returns when new entries are available past the requested offset.
- Position is queryable. A Query exposes the current head offset, useful as a snapshot of how much has been written.
The Workflow keeps an in-memory append-only log of (topic, data) entries, each with a monotonically increasing offset. Subscribers maintain their own cursor and receive the next range past that cursor on every poll. See Architecture for batching, dedup, history-size implications, and tuning.
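The log-and-cursor model described above can be pictured with a small in-memory sketch. This is an illustrative model only, not the library's implementation: ToyLog, append, and poll are hypothetical names, and the real stream serves polls through long-poll Updates rather than direct method calls.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyLog:
    """Hypothetical model of an offset-addressed, append-only log."""

    entries: list[tuple[str, Any]] = field(default_factory=list)

    def append(self, topic: str, data: Any) -> int:
        # The offset is the entry's position; it never changes once written.
        self.entries.append((topic, data))
        return len(self.entries) - 1

    def poll(
        self, from_offset: int, topics: list[str] | None = None
    ) -> tuple[list[tuple[int, str, Any]], int]:
        # Return entries at or past the cursor, optionally filtered by topic,
        # plus the next cursor value for the subscriber to keep.
        items = [
            (offset, topic, data)
            for offset, (topic, data) in enumerate(self.entries)
            if offset >= from_offset and (not topics or topic in topics)
        ]
        return items, len(self.entries)


log = ToyLog()
log.append("status", "validating")
log.append("delta", "Hel")
log.append("status", "charging")

# A subscriber filtering on "status" keeps its own cursor between polls.
items, cursor = log.poll(0, topics=["status"])
```

Because the cursor belongs to the subscriber rather than the log, any number of subscribers can read the same entries at their own pace.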
When events originate in an Activity, for example an Activity streaming output from an LLM, have the Activity publish to the stream directly. The Workflow acts as a conduit: it hosts the stream, processes only the Activity's return value, and emits its own lifecycle events, but does not read the stream itself. This keeps the Workflow's own state independent of partial output from Activity attempts that ended up retrying, since the Workflow only sees the successful attempt's return. See Delivery semantics for the precise guarantees.
Where the stream lives
A WorkflowStream is hosted inside a Workflow, so the first design choice is whether one Workflow handles both the work and the stream, or whether a separate Workflow exists only to host the stream. The choice is mostly about lifecycle.
Host the stream on the Workflow that does the work when the events come from what that Workflow is already orchestrating: an agent run, an order pipeline, a chat session. The stream's lifecycle aligns with the run, starting when the run starts and ending when it returns. The Workflow Id you use to start the work is the same one subscribers attach to. This is the common shape for AI agents and most progress-streaming cases, where streaming is just one more thing the Workflow does as part of its job.
Use a dedicated Workflow for the stream alone when the stream should outlive any single producer, accept fan-in from multiple unrelated sources, or be subscribable before any work has started. Producers publish from outside the stream Workflow (Activities of other Workflows, or external WorkflowStreamClient instances). The trade-off is explicit lifecycle management: a dedicated stream Workflow does not terminate on its own, so you need a signal-driven shutdown or a Continue-As-New strategy.
Whichever shape you pick, the Workflow Id is the address subscribers use to attach. Multiple subscribers can attach to the same Id concurrently, which is the normal case for a UI with multiple browser tabs. Use distinct Workflow Ids for unrelated streams rather than packing them into one Workflow.
Enable streaming on a Workflow
The library ships with the Temporal Python SDK; import it from temporalio.contrib.workflow_streams. Enable streaming by constructing a WorkflowStream from your Workflow's @workflow.init method. (If you've only seen @workflow.run before, think of @workflow.init as the Workflow's constructor: it runs once when the Workflow is created, before any Signals or Updates can be delivered.) Construction must happen there because the stream's handlers have to be registered before the first publish Signal arrives; doing it from @workflow.run raises RuntimeError and would miss any publishes that arrived before the run body started executing.
from dataclasses import dataclass

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream


@dataclass
class OrderInput:
    order_id: str


@workflow.defn
class OrderWorkflow:
    @workflow.init
    def __init__(self, input: OrderInput) -> None:
        self.stream = WorkflowStream()
Constructing WorkflowStream creates the in-memory event log and dynamically registers the publish Signal, subscribe Update, and offset Query handlers on the current Workflow. Constructing more than one stream on the same Workflow also raises RuntimeError.
If your Workflow uses Continue-As-New, see Continue-As-New below for how to carry stream state across runs so subscribers see no gap.
Publish from a Workflow
Bind a topic name to its event type once via self.stream.topic("name", type=Type), then call publish() on the returned handle to append events. The handle records the per-stream binding from topic name to value type so call sites don't have to repeat the type on every publish, and so subscribers reading the same handle decode to the matching type.
from dataclasses import dataclass


@dataclass
class StatusEvent:
    state: str
    progress: int = 0
    detail: str = ""


@workflow.defn
class OrderWorkflow:
    @workflow.init
    def __init__(self, input: OrderInput) -> None:
        self.stream = WorkflowStream()
        self.status = self.stream.topic("status", type=StatusEvent)

    @workflow.run
    async def run(self, input: OrderInput) -> None:
        self.status.publish(StatusEvent(state="validating", detail="checking inventory"))
        await validate_order(input.order_id)
        self.status.publish(StatusEvent(state="charging", progress=33, detail="authorizing payment"))
        await charge_payment(input.order_id)
        self.status.publish(StatusEvent(state="shipping", progress=66, detail="dispatching to warehouse"))
        await dispatch_order(input.order_id)
        self.status.publish(StatusEvent(state="completed", progress=100))
publish() runs the payload converter to encode each value. The codec chain (encryption, compression, and so on) runs once on the Signal or Update envelope that carries the batch, never per item, so encryption and compression are applied exactly once in each direction.
The type= argument is optional and defaults to Any. Pass it when you want the binding recorded so re-binding the same name to an unequal type raises, or so subscribers can pick up the type from the same handle.
Publish from a client
Any process that has a Temporal Client and the target Workflow Id can publish to that Workflow's stream by constructing a WorkflowStreamClient. This is the general pattern and covers HTTP backends, starters, one-off scripts, other Workflows' Activities, and standalone Activities. Construct one with WorkflowStreamClient.create(client, workflow_id), then use it the same way you would the Workflow-side handle: bind a topic, publish through it, and let the async context manager flush on exit.
from datetime import timedelta

from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient


async def publish_status(workflow_id: str) -> None:
    temporal_client = await Client.connect("localhost:7233")
    stream_client = WorkflowStreamClient.create(
        temporal_client,
        workflow_id=workflow_id,
        batch_interval=timedelta(milliseconds=200),
    )
    async with stream_client:
        status = stream_client.topic("status", type=StatusEvent)
        status.publish(StatusEvent(state="started"))
        ...
    # Buffer is flushed on context manager exit.
Inside an Activity scheduled by a Workflow, WorkflowStreamClient.from_within_activity() is a convenience that infers the Temporal Client and the parent Workflow Id from the Activity context, so you don't have to thread them through the Activity's input:
from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient

# Delta and generate_deltas are application-defined and not shown here.


@activity.defn
async def stream_deltas(order_id: str) -> None:
    client = WorkflowStreamClient.from_within_activity()
    async with client:
        deltas = client.topic("delta", type=Delta)
        for delta in generate_deltas(order_id):
            deltas.publish(delta)
            activity.heartbeat()
    # Buffer is flushed on context manager exit.
For a standalone Activity (one started directly via Client.start_activity rather than from a Workflow), there is no parent Workflow context to infer, so from_within_activity() raises. Fall back to the general pattern with activity.client() and the target Workflow Id threaded through the Activity's input.
Two operations give the application explicit control over when batches ship: force_flush=True on a publish for latency, and await client.flush() for confirmation that prior publications have landed.
Pass force_flush=True on a publish to wake the background flusher so the current buffer ships without waiting for the next interval. The flusher only runs while the client is entered (async with client); outside that, force_flush=True queues the wake event but nothing ships until you enter the context or call await client.flush(). The call returns immediately after appending to the buffer and signaling the flusher; it does not wait for delivery to the Workflow or to subscribers:
deltas.publish(delta, force_flush=True)
Use it for latency-sensitive events: the first delta of a response so the user sees something fast, or punctuated events like RETRY and STATUS_CHANGE. See Tuning for the trade-off against history pressure.
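To make the interaction between batch_interval and force_flush=True concrete, here is a toy, single-threaded model of a batching buffer. It is a sketch under stated assumptions, not the client's implementation: ToyBatcher, tick, and the explicit now argument are hypothetical, and the real client uses a background flusher task rather than manual ticks.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyBatcher:
    """Hypothetical model: buffer items, ship one batch per interval or on force."""

    batch_interval: float  # seconds between automatic flushes
    last_flush: float = 0.0
    buffer: list[str] = field(default_factory=list)
    shipped: list[list[str]] = field(default_factory=list)  # one entry per "Signal"

    def publish(self, item: str, now: float, force_flush: bool = False) -> None:
        # Publishing only appends; it never waits for delivery.
        self.buffer.append(item)
        if force_flush:
            self._ship(now)

    def tick(self, now: float) -> None:
        # Stand-in for the background flusher waking up.
        if self.buffer and now - self.last_flush >= self.batch_interval:
            self._ship(now)

    def _ship(self, now: float) -> None:
        self.shipped.append(self.buffer)
        self.buffer = []
        self.last_flush = now


batcher = ToyBatcher(batch_interval=2.0)
batcher.publish("first delta", now=0.0, force_flush=True)  # ships immediately
batcher.publish("b", now=0.5)
batcher.publish("c", now=1.0)
batcher.tick(now=1.5)  # interval not yet elapsed, nothing ships
batcher.tick(now=2.0)  # ships "b" and "c" together as one batch
```

In this model the forced publish costs one extra batch, while the two ordinary publishes share a single one, which is the trade-off the text describes.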
Use await client.flush() when you need a mid-stream barrier. Successful completion of the flush is proof that the Temporal server has received all prior publications, so subsequent work that depends on those events being durable can proceed. The client stays open for further publishing afterward. Exiting async with client already flushes on its way out, so the explicit call is only for barriers in the middle:
async with client:
    deltas = client.topic("delta", type=Delta)
    for delta in first_phase():
        deltas.publish(delta)
    await client.flush()
    # Only safe once phase-one events are durable in the Workflow log.
    checkpoint_id = await record_phase_one_complete()
    for delta in second_phase(checkpoint_id):
        deltas.publish(delta)
publish() is non-blocking and applies no backpressure. From an Activity or other client, it appends to the client's in-memory buffer and returns; from inside a Workflow, it appends synchronously to the in-memory log (no buffer, nothing to flush). Subscribers pull from the Workflow's log on their own schedule, so a slow subscriber does not slow down publishers. If a publisher emits faster than batches can ship to the server, the buffer grows: the process uses more memory, the stream falls further behind real time, and at the limit Signals cannot keep up at all.
If your application needs to bound this (to cap memory, to keep the stream close to real time, or to apply a policy when the publisher overruns the network), apply that policy upstream of publish(). The choice (block, drop, error, sample) is application-specific, and Workflow Streams does not pick one for you.
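As one illustration of applying a policy upstream of publish(), the sketch below caps the publish rate and drops the excess. Everything here is hypothetical: RateCappedPublisher is not part of the library, sink stands in for a topic handle's publish method, and dropping is only one of the possible policies.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class RateCappedPublisher:
    """Hypothetical wrapper: forward at most `max_per_window` events per window."""

    def __init__(
        self,
        sink: Callable[[Any], None],
        max_per_window: int,
        window_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._sink = sink
        self._max = max_per_window
        self._window = window_seconds
        self._clock = clock
        self._window_start = clock()
        self._count = 0
        self.dropped = 0

    def publish(self, value: Any) -> bool:
        now = self._clock()
        if now - self._window_start >= self._window:
            # Start a new window and reset the budget.
            self._window_start = now
            self._count = 0
        if self._count >= self._max:
            # Drop policy: count the loss so the application can report it.
            self.dropped += 1
            return False
        self._count += 1
        self._sink(value)
        return True


# Usage with a fake clock and a list standing in for the stream.
fake_now = [0.0]
received: list[int] = []
capped = RateCappedPublisher(received.append, max_per_window=2, clock=lambda: fake_now[0])
for i in range(5):
    capped.publish(i)  # only the first two fit in this window
fake_now[0] = 1.0
capped.publish(99)  # a new window opens, so this one is forwarded
```

Swapping the drop branch for a raise or an awaitable wait would give the error and block policies instead; which one fits depends on the application.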
Subscribe
Subscribing uses the same client construction as publishing: WorkflowStreamClient.create(client, workflow_id) from any process that has a Temporal Client, or from_within_activity() inside an Activity. Subscribing from an Activity is less common in practice, so the general client case is the primary example below.
Subscribing from inside the host Workflow is intentionally unsupported. The Workflow only sees the successful return value of each Activity; the stream may carry partial output from attempts that failed and were retried. Letting the Workflow read its own stream would mix those two views and break the conduit role the Workflow is meant to play. See How it works at a glance for the framing.
Once you have a client, iterate a topic handle's subscribe(), the counterpart to publish(). The handle's bound type drives decoding, so each item.data arrives as T via the client's payload converter. The codec chain is applied once at the Update envelope, not per item.
from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient


async def watch_order(order_id: str) -> None:
    temporal_client = await Client.connect("localhost:7233")
    stream = WorkflowStreamClient.create(temporal_client, workflow_id=order_id)
    status = stream.topic("status", type=StatusEvent)
    async for item in status.subscribe():
        evt = item.data
        print(f"[{evt.progress:3d}%] {evt.state}: {evt.detail}")
        if evt.state == "completed":
            break
The iterator handles re-polling, pagination when a poll response hits the ~1 MB cap, and Workflow-side log truncation transparently. Callers don't need to wrap the iterator for the common cases. Two edge cases are worth knowing: an RPC timeout where Continue-As-New cannot be followed ends the iterator silently (no exception raised), and a validator rejection during a Continue-As-New (CAN) handoff can surface as a WorkflowUpdateFailedError. See the API reference for details.
Heterogeneous topics
A topic handle binds one name to one type, so it only fits a single-type subscription. To consume multiple topics whose payload types differ, call client.subscribe() directly with a list of names (or subscribe([]) for every topic) and pass result_type=temporalio.common.RawValue so each item arrives as the underlying Payload wrapped in a RawValue. Dispatch on item.topic and decode the wrapped payload with the client's payload converter:
from temporalio.common import RawValue

converter = temporal_client.data_converter.payload_converter

async for item in stream.subscribe(["status", "progress"], result_type=RawValue):
    if item.topic == "status":
        evt = converter.from_payload(item.data.payload, StatusEvent)
        print(f"[status] {evt.state}: {evt.detail}")
    elif item.topic == "progress":
        evt = converter.from_payload(item.data.payload, ProgressEvent)
        print(f"[progress] {evt.message}")
A single iterator over multiple topics also avoids the cancellation race that two concurrent subscribers would create. RawValue is also the right shape when you want to forward the bytes through to another system without decoding them.
Omitting result_type entirely or passing result_type=None decodes each item with the converter's default rules. For the stock JSON converter, that means a Python primitive, dict, or list. This works for fully homogeneous streams, but not for the dispatch-by-topic pattern above, where each topic has its own concrete dataclass.
Closing the stream
A subscriber's async for does not know when the publisher is done. End-of-stream is an application-level concern; Workflow Streams does not impose a marker. Without coordination, a subscriber will keep polling until the Workflow reaches a terminal state, and a Workflow that returns immediately after its last publish can lose that publish's poll round-trip in the gap.
How you close depends on what the application needs. As one example, a common pattern combines two pieces:
- An in-band terminator. The Workflow (or its Activity) publishes a sentinel event the subscriber recognizes and breaks on. In the watch_order example above, StatusEvent(state="completed") is the minimal form, and the consumer's if evt.state == "completed": break is the matching half. Each subscription decides what its own end-of-stream marker is.
- A brief overlap before the Workflow returns. A poll Update that is still in flight when the Workflow returns surfaces to the client as AcceptedUpdateCompletedWorkflow, and no new polls can complete after that. If the Workflow returns immediately after publishing the terminator, subscribers may miss it.
There are two ways to provide that overlap.
Fixed sleep (simplest). Sleep between the terminator and the return so any in-flight poll has time to fetch the terminator before the Workflow exits:
# at the end of @workflow.run
self.status.publish(StatusEvent(state="completed", progress=100))
await workflow.sleep(timedelta(seconds=30))
return result
The sleep needs to be long enough to cover the time between when the terminator becomes visible and when the subscriber's next poll reaches the server, including any client-side cooldown and network round-trips. A few hundred milliseconds is tight under realistic conditions; thirty seconds is a generous default. The cost is small: the Workflow Run stays open for that duration but does no other work.
Acknowledgment handshake. The subscriber sends a Signal once it has the terminator; the Workflow waits up to a timeout, returning as soon as the ack arrives:
@workflow.signal
async def subscriber_acknowledged_terminator(self) -> None:
    self.subscriber_done = True

@workflow.run
async def run(self, input: ChatInput) -> str:
    ...
    try:
        await workflow.wait_condition(
            lambda: self.subscriber_done,
            timeout=timedelta(seconds=30),
        )
    except TimeoutError:
        pass  # No subscriber attached; the run still completes cleanly.
    return result
The timeout is still required because the subscriber may not be attached, or may have gone away. With the ack on top, the typical case (subscriber online) exits as soon as the subscriber confirms receipt, regardless of how long the fallback timeout is. The full pattern is wired into the Stream LLM output example below.
Inspecting terminal status. subscribe() exits cleanly when the Workflow reaches COMPLETED, FAILED, CANCELED, TERMINATED, or TIMED_OUT, but does not distinguish among them. If your application needs to know which (to display success or failure to the user, log the outcome, or decide whether to retry), call await temporal_client.get_workflow_handle(workflow_id).describe() after the loop returns to inspect the Workflow's status.
Continue-As-New
If your Workflow runs for minutes and finishes (a single chat completion, an order pipeline, a one-shot agent), you can skip this section. Continue-As-New becomes relevant for streams that run for hours or accumulate thousands of events, where you need to roll the run over to keep history bounded.
Subscribers automatically follow Continue-As-New chains, so a long-running Workflow can roll over without disrupting active consumers. Workflow Ids are stable across Continue-As-New, so the iterator simply fetches a fresh handle for the same Workflow Id and continues polling from the carried offset. CAN-following requires the client retained from WorkflowStreamClient.create() or from_within_activity(); clients constructed directly with a single handle cannot re-target the new run.
To roll a long-running streaming Workflow over without subscribers seeing a gap, carry both your application state and the stream state across the boundary. Add a WorkflowStreamState | None field to your Workflow input, pass it to the constructor, and call WorkflowStream.continue_as_new(build_args) to invoke the rollover. The helper drains waiting subscribers, waits for in-flight handlers to finish, then calls workflow.continue_as_new with the args produced by build_args(post_drain_state):
from dataclasses import dataclass, field

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamState


@dataclass
class AppState:
    items_processed: int = 0


@dataclass
class WorkflowInput:
    app_state: AppState = field(default_factory=AppState)
    stream_state: WorkflowStreamState | None = None


@workflow.defn
class LongRunningWorkflow:
    @workflow.init
    def __init__(self, input: WorkflowInput) -> None:
        self.app_state = input.app_state
        self.stream = WorkflowStream(prior_state=input.stream_state)

    @workflow.run
    async def run(self, input: WorkflowInput) -> None:
        while True:
            await do_one_iteration(self)
            if workflow.info().is_continue_as_new_suggested():
                await self.stream.continue_as_new(
                    lambda stream_state: [
                        WorkflowInput(
                            app_state=self.app_state,
                            stream_state=stream_state,
                        )
                    ]
                )
The | None on the stream_state field type is required: prior_state is None on a fresh start and a WorkflowStreamState instance after a rollover. Always use the concrete type, not Any. With Any, the data converter rebuilds the field as a plain dict and WorkflowStream(prior_state=...) raises AttributeError accessing .log / .base_offset / .publishers on the dict.
To pass other Continue-As-New parameters such as task_queue, retry_policy, or run_timeout, use the explicit recipe instead:
self.stream.detach_pollers()
await workflow.wait_condition(workflow.all_handlers_finished)
workflow.continue_as_new(
args=[WorkflowInput(app_state=self.app_state, stream_state=self.stream.get_state())],
task_queue="other-tq",
)
The carried WorkflowStreamState includes the entire in-memory log of the previous run, so streams that carry large items can hit Temporal's per-payload size limit at the rollover. (Individual publish Signals and subscribe Update responses can also exceed the limit, but the carried state is the most acute case because it accumulates the full log window.) Offload the bytes via External Storage so each item is a small reference rather than the full payload, and combine that with truncate() to keep the carried log itself small.
Tuning
The most important question when tuning is: how often do you want to update your UI? That answer drives the trade-off between user-perceived latency and the number of history events your Workflow accumulates. The library defaults assume a slow-moving UI; LLM token streaming and other interactive cases need lower latency, which means tuning.
The trade-off is direct. Each batched publish is one Signal, and each subscriber poll is one Update. Each Signal and each Update accumulates against the Workflow's history. A more responsive UI means more messages and more history per second; messages drive workload (and on metered deployments, billing), while history accumulates against Temporal's per-run limits. For long-running streams, plan a Continue-As-New policy from the start.
Settings that matter most
- batch_interval (default 2 seconds). Maximum time between automatic flushes from the client. Lower it to make the stream feel live; raise it to amortize Signal cost. For an LLM token stream feeding a chat UI, 200 ms is a good starting point: the user perceives it as live, and a 30-second response generates roughly 150 publish Signals rather than several hundred. Below 100 ms the per-Signal RPC overhead starts to dominate.
For per-publish overrides where one specific event needs lower latency than the batch interval (for example, the first delta of a response so the user sees something fast, or punctuated events like RETRY and STATUS_CHANGE), pass force_flush=True on that publish. Don't make this the default mode: per-token force_flush=True on a 500-token completion produces 500 publish Signals, which is meaningful but tractable; per-character force_flush=True is not.
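The Signal counts quoted above follow from simple arithmetic, sketched below. The helper estimate_publish_signals is hypothetical and assumes a publisher that emits steadily for the whole duration, so that every interval ships exactly one batch; it counts publish Signals only and ignores subscriber poll Updates.

```python
import math
from datetime import timedelta


def estimate_publish_signals(
    duration: timedelta, batch_interval: timedelta, forced_flushes: int = 0
) -> int:
    """Rough upper bound: one Signal per interval, plus one per forced flush."""
    per_interval = math.ceil(duration / batch_interval)
    return per_interval + forced_flushes


response = timedelta(seconds=30)
at_default = estimate_publish_signals(response, timedelta(seconds=2))
at_200ms = estimate_publish_signals(response, timedelta(milliseconds=200))
with_first_delta = estimate_publish_signals(
    response, timedelta(milliseconds=200), forced_flushes=1
)
```

Under these assumptions a 30-second response costs about 15 Signals at the 2-second default and about 150 at 200 ms, and forcing only the first delta adds a single Signal on top.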
Other settings
You usually do not need to touch these, but they are available when the basic settings are not enough:
- max_batch_size (default unbounded). Caps the number of items per batch. With the default, only batch_interval bounds batch size, so a hot publisher can accumulate enough items between intervals that the resulting Signal exceeds Temporal's per-message gRPC payload limit. Set max_batch_size to bound by item count, or pass force_flush=True on the last publish of each logical chunk to bound by application boundaries (for example, publish per generated sentence in a TTS Activity so each Signal carries one audio chunk). For large items, offload via External Storage so each item is a small reference.
- poll_cooldown (subscriber-side, default 100 ms). The subscriber sleeps for this interval between polls. The cooldown is skipped only when a poll response was capped at the ~1 MB gRPC limit and more items remain (a more_ready flag in the response), so the next poll can drain the rest immediately. That path is an optimization for bursty producers; in the steady state, every poll waits the cooldown before the next. Hold a single iterator and consume from it rather than opening and closing subscriptions in a loop.
- max_retry_duration (default 10 minutes). How long the client retries a failed publish batch before giving up and raising TimeoutError. Tune higher if your application can tolerate longer outages while a publisher retries through transient failures; lower if you want failures to surface quickly.
- publisher_ttl (default 15 minutes). How long the Workflow retains per-publisher dedup state. At each Continue-As-New, entries older than this are dropped. Tune higher if your publishers can be silent for extended windows.
The last two settings are related. Keep max_retry_duration < publisher_ttl so a long-running retry cannot outlast its dedup record and produce a duplicate when it finally succeeds. If you tune one, tune the other. See Delivery semantics for the full failure model.
Delivery semantics
Exactly-once at the execution layer. Each (publisher_id, sequence) batch lands in the Workflow's event log at most once, even if the publisher's underlying Signal is retried by the SDK or the network. Once an event is in the log, every subscriber that polls past its offset will see it, and dedup state is carried across Continue-As-New so a retried publish that arrives after a rollover still lands at most once.
Ordering. The log imposes a single total order on all events, fixed once written: an event at offset N stays at offset N on every read. Within one publisher (one WorkflowStreamClient instance, or the Workflow itself), events appear in publish order. Across concurrent publishers, the interleaving is whatever the Workflow saw when serializing inbound Signals; the order is stable once recorded but not under application control. If event A must precede event B, publish them from the same publisher.
Activity retries surface to subscribers. When an Activity that publishes events fails partway through and Temporal retries it, both attempts' events appear in the stream. Concretely: an Activity that publishes three TEXT_DELTA events and then errors, then retries and publishes its full output, will deliver three partial events followed by the complete sequence. The Workflow itself sees only the successful attempt's return value (that's what durable execution hides), but a UI subscribed to the stream will see the partial output unless it dedupes. Consumers must reset or annotate on retry events; the library does not do this automatically.
The conventional pattern is for an Activity that detects it's on a retry attempt to publish a RETRY event with force_flush=True, and for the consumer to clear or annotate prior-attempt output when it sees one. Treat the stream as an append-only log of attempts and let an idempotent consumer reducer reconcile them: overwrite on terminal events like STATUS_CHANGE or TEXT_COMPLETE, or reset an accumulator on a sentinel like AGENT_START before deltas resume. Because the Workflow processes only Activity return values rather than reading the stream itself, its own state stays independent of these retried events.
This is the price of streaming events as they happen rather than waiting for the Workflow's durable view to settle. If the library waited for a successful Activity return before surfacing anything, there would be nothing to stream.
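A consumer reducer of the kind described above might look like the following minimal sketch. The Event dataclass and reduce_events function are hypothetical; the event kinds reuse the names from the text, and a real consumer would read them from item.data rather than from a list.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical event shape; the kinds mirror the names used in the text."""

    kind: str  # "TEXT_DELTA", "RETRY", or "TEXT_COMPLETE"
    text: str = ""


def reduce_events(events: list[Event]) -> str:
    """Fold a stream of attempts into the text a UI should display."""
    shown = ""
    for event in events:
        if event.kind == "RETRY":
            # A new attempt started: discard partial output from the old one.
            shown = ""
        elif event.kind == "TEXT_DELTA":
            shown += event.text
        elif event.kind == "TEXT_COMPLETE":
            # A terminal event overwrites whatever the deltas accumulated.
            shown = event.text
    return shown


stream = [
    Event("TEXT_DELTA", "Hel"),
    Event("TEXT_DELTA", "lo wor"),  # the first attempt fails after this
    Event("RETRY"),
    Event("TEXT_DELTA", "Hello "),
    Event("TEXT_DELTA", "world"),
]
displayed = reduce_events(stream)
```

Because the reducer only depends on the events it is given, replaying the same stream from offset 0 always produces the same display state.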
Other failure modes. Events still in a publisher's in-memory client buffer are lost if the process crashes before they ship. Subscribers that handle an item and crash before persisting their next offset will reprocess that item on resume. Build consumer state with both in mind.
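One way to tolerate reprocessing after a subscriber restart is to track the next expected offset alongside the consumer's state and skip anything older. The sketch below is a hypothetical in-memory model: CheckpointedConsumer is not a library class, and it assumes the application can store the effect and the offset together so the two never disagree.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class CheckpointedConsumer:
    """Hypothetical consumer that skips items it has already applied."""

    next_offset: int = 0  # would live in durable storage in a real system
    applied: list[str] = field(default_factory=list)

    def handle(self, offset: int, data: str) -> bool:
        if offset < self.next_offset:
            # Redelivery after a resume: already applied, so ignore it.
            return False
        self.applied.append(data)
        # Advance the cursor together with the effect.
        self.next_offset = offset + 1
        return True


consumer = CheckpointedConsumer()
consumer.handle(0, "validating")
consumer.handle(1, "charging")
# Simulate a resume that replays from an older stored offset.
replayed = consumer.handle(1, "charging")
consumer.handle(2, "shipping")
```

If the effect and the offset cannot be stored atomically, the handler itself has to be idempotent, because the item at the stored offset may be applied twice.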
Two limits on the deduplication window are worth understanding:
- publisher_ttl (default 15 minutes). Retention for the per-publisher dedup state. At each Continue-As-New, dedup entries whose last_seen is older than this are dropped. last_seen is updated on each successful publish (not on each retry attempt), so a publisher that retries through a long partition without success can still age out. A publisher that returns after a longer pause may produce a duplicate. Tune upward via WorkflowStream.continue_as_new(publisher_ttl=...) if your publishers can be silent for extended windows.
- max_retry_duration (default 10 minutes). A WorkflowStreamClient retries a failed batch for up to this long. If the duration elapses with the batch still pending (for example, during a sustained network partition), the client gives up, the pending batch is dropped, and a TimeoutError is raised.
  On timeout, the dropped batch is at-most-once: it may or may not have reached the Workflow. Subsequent publishes resume cleanly with the next sequence. One operational caveat: the TimeoutError raises from inside the background flusher task and terminates it. Until you call await client.flush() or exit the async with block, subsequent publishes accumulate in the buffer with no flusher to ship them.
The two limits must satisfy max_retry_duration < publisher_ttl. If a publisher's retry window exceeds the dedup retention, the dedup state for that publisher can age out (at the next Continue-As-New) before the retry lands. A retry that arrives after its dedup record has been pruned is treated as a fresh publish, and if the original delivery had also succeeded, the same logical batch lands twice. The defaults (10 minutes < 15 minutes) satisfy this; if you tune one, tune the other to preserve the relationship.
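If both values come from application configuration, a small guard at startup can catch a mismatch before it produces duplicates. The helper below is a hypothetical sketch and is not provided by the library; it only encodes the inequality stated above.

```python
from datetime import timedelta


def check_dedup_window(max_retry_duration: timedelta, publisher_ttl: timedelta) -> None:
    """Raise if a retry could outlive its dedup record."""
    if max_retry_duration >= publisher_ttl:
        raise ValueError(
            f"max_retry_duration ({max_retry_duration}) must be shorter than "
            f"publisher_ttl ({publisher_ttl})"
        )


# The documented defaults satisfy the relationship.
check_dedup_window(timedelta(minutes=10), timedelta(minutes=15))
```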
Architecture
The user-facing API hides three pieces of machinery worth understanding when you tune throughput, debug delivery, or reason about history size.
Append-only log inside the Workflow. WorkflowStream keeps an in-memory list of (topic, data) entries inside the Workflow's state, each with a monotonically increasing global offset. Subscribers maintain their own cursor and on each long-poll receive the next range past it. Because the log lives in Workflow state, it is replay-safe and is carried across Continue-As-New via WorkflowStreamState.
Two mechanisms bound log growth, and they do different jobs:
- truncate(up_to_offset) drops entries from the in-memory log (and therefore from the carried Continue-As-New payload). It does not remove publish Signals already recorded in history.
- Continue-As-New starts a fresh history. This is the only way to shrink history; truncate alone cannot.
A subscriber whose offset falls below the new base after a truncate() is silently advanced. Internally, the poll raises ApplicationError("TruncatedOffset"); the Python client catches it and resets to offset 0, which the Workflow reads as "from the current base." The iterator does not raise, but the subscriber may re-receive items already in the log past the new base. Applications that depend on seeing every event exactly once must keep subscribers ahead of truncation or implement their own gap and re-delivery handling using item.offset.
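One way to build that handling on the subscriber side is to track the next expected offset and classify each item as new, re-delivered, or arriving after a gap. The sketch below is a minimal illustration: OffsetTracker is a hypothetical helper, and it assumes item.offset is the integer global offset described above.

```python
from dataclasses import dataclass, field


@dataclass
class OffsetTracker:
    """Hypothetical consumer-side helper for gap and re-delivery detection."""

    next_expected: int = 0
    gaps: list = field(default_factory=list)  # half-open (start, end) ranges missed

    def observe(self, offset: int) -> bool:
        """Return True if the item is new and should be processed."""
        if offset < self.next_expected:
            return False  # re-delivery of something already processed
        if offset > self.next_expected:
            # Offsets in [next_expected, offset) were never seen, e.g. truncated away.
            self.gaps.append((self.next_expected, offset))
        self.next_expected = offset + 1
        return True


tracker = OffsetTracker()
decisions = [tracker.observe(o) for o in (0, 1, 5, 5, 6, 3)]
print(decisions, tracker.gaps)
```

In a real subscriber loop you would call tracker.observe(item.offset) for each item and skip processing when it returns False; what to do about a recorded gap (alert, refetch from another source, ignore) is an application decision.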
Wire-level handlers. The three handlers registered when you construct a WorkflowStream are __temporal_workflow_stream_publish (the Signal that receives batched publishes), __temporal_workflow_stream_poll (the long-poll Update that subscribers use), and __temporal_workflow_stream_offset (the Query that reports the current head offset). Poll responses are capped at roughly 1 MB by accumulating items until the next would exceed the budget, so high-throughput producers see a steady stream of small batches. A single item that exceeds 1 MB on its own is admitted unconditionally; offload large items via External Storage so each item is a small reference.
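The accumulation rule can be sketched as a small function over item sizes. This is an illustration only: take_batch is a hypothetical name, the 1,000,000-byte budget stands in for "roughly 1 MB", and how the library actually measures item size is not specified here.

```python
def take_batch(sizes: list, budget: int = 1_000_000) -> int:
    """Return how many leading items fit in one poll response.

    Items are added until the next one would exceed the budget. The first
    item is always admitted, even if it is larger than the budget by itself.
    """
    total = 0
    count = 0
    for size in sizes:
        if count > 0 and total + size > budget:
            break
        total += size
        count += 1
    return count


print(take_batch([400_000, 400_000, 400_000]))  # third item would exceed the budget
print(take_batch([2_000_000, 10]))  # oversized first item is admitted alone
```

The second call shows why oversized items are costly: one 2 MB item occupies a whole response, and the small item behind it waits for the next poll.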
Batching and deduplication. Every batch carries a unique identifier (the client's id paired with a monotonic batch sequence number), so a Signal retried by the SDK or the network deduplicates to a single landing in the Workflow's event log. Dedup state is part of the Workflow's carried state, so the guarantee survives Continue-As-New (subject to publisher_ttl).
This dedup applies at the Signal layer, not the Activity layer. An Activity retry is a different concept from a publish retry: when Temporal retries the Activity, the retried execution constructs a new WorkflowStreamClient with its own client id, so from the stream's perspective every attempt is a fresh publisher whose batches will not deduplicate against the prior attempt's. That is why retried-attempt events appear in the stream alongside the successful attempt's output.
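The difference between the two kinds of retry shows up clearly in a toy model. The sketch below is not the library's code: ToyDedupLog is a hypothetical class that remembers every (publisher id, sequence) pair it has accepted, which is one simple way to realize the behavior described above.

```python
class ToyDedupLog:
    """Toy event log that drops batches it has already accepted."""

    def __init__(self) -> None:
        self.events: list = []
        self.seen: set = set()  # accepted (publisher_id, sequence) pairs

    def receive(self, publisher_id: str, sequence: int, batch: list) -> bool:
        """Append the batch unless this exact batch was already accepted."""
        key = (publisher_id, sequence)
        if key in self.seen:
            return False  # Signal-level retry: same publisher, same sequence
        self.seen.add(key)
        self.events.extend(batch)
        return True


log = ToyDedupLog()
log.receive("attempt-1-client", 1, ["Hel", "lo"])  # first delivery lands
log.receive("attempt-1-client", 1, ["Hel", "lo"])  # network retry is dropped
log.receive("attempt-2-client", 1, ["Hello", " world"])  # Activity retry: new id, lands
print(log.events)
```

Because the retried Activity attempt arrives under a different publisher id, nothing at this layer can tell that its events supersede the first attempt's. That decision has to be made by the consumer.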
Gotchas
A few details worth knowing about, mostly relevant if you're writing custom message handlers or pushing the library to its limits.
- WorkflowStreamClient is asyncio-only. The client buffer is mutated on the publish path and read from the flusher inside a single event loop. Don't call publish() from a worker thread.
- Custom handlers reading stream state on the first activation. WorkflowStream registers its publish-Signal handler dynamically from __init__, so on the very first activation a publish Signal can be queued before class-level @workflow.signal or @workflow.update handlers have run. A handler that observes state set by stream initialization in that same activation can see pre-publish state. The fix is to make the handler async def and await once before reading state. asyncio.sleep(0) is a no-op yield that suffices and adds no history events. (Don't substitute workflow.sleep(0), which records a timer event.) Once the first activation completes, the handler is permanent and the race does not recur.
- Type bindings aren't shared across publishers. Each WorkflowStream and each WorkflowStreamClient records topic types only for its own instance. If two publishers bind the same topic name to different types, the mismatch is not caught at publish, and the subscriber gets a decode error when it processes events from the mismatched publisher.
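The effect of yielding once before reading state can be seen in plain asyncio, outside Temporal. The sketch below is an analogy under stated assumptions: it uses the standard asyncio event loop rather than the Workflow scheduler, and handler and apply_publish are hypothetical stand-ins for a custom message handler and the queued publish.

```python
import asyncio


async def demo(yield_first: bool) -> list:
    """Return the log contents that the handler observed."""
    log: list = []
    observed: list = []

    async def handler() -> None:
        if yield_first:
            # Yield once so work that is already queued gets to run first.
            await asyncio.sleep(0)
        observed.append(list(log))

    async def apply_publish() -> None:
        log.append("event")

    # The handler is scheduled ahead of the publish, as in the first-activation race.
    first = asyncio.create_task(handler())
    second = asyncio.create_task(apply_publish())
    await asyncio.gather(first, second)
    return observed[0]


print(asyncio.run(demo(yield_first=False)))  # handler ran before the publish applied
print(asyncio.run(demo(yield_first=True)))  # handler ran after the publish applied
```

Without the yield the handler reads an empty log; with it, the queued publish is applied first. Inside a Workflow the ordering is decided by the SDK, so treat this only as intuition for why a single await is enough.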
Application: Stream LLM output
The headline use case fits the publish/subscribe shapes documented above. An Activity calls the model and publishes deltas as they arrive; the Workflow kicks off the Activity and waits for the consumer to acknowledge end-of-stream; the consumer subscribes, accumulates the deltas, and clears its accumulated state on RETRY before continuing. The shape works for a terminal client, a desktop UI, or a Server-Sent Events (SSE) endpoint forwarding to a browser; whatever holds the displayed state calls render() to display it.
If your Activity can retry, the consumer side has to account for it: a retried attempt is a fresh publisher, so its output appears in the stream alongside the previous attempt's. In the LLM streaming pattern below, that means the failed attempt's partial deltas and the retried attempt's full output both reach a subscribed UI unless the UI resets on a RETRY event. The example wires up that pattern; see Delivery semantics for the precise guarantees.
# activity.py
from dataclasses import dataclass
from datetime import timedelta

from openai import AsyncOpenAI
from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient


@dataclass
class TextDelta:
    text: str


@dataclass
class TextComplete:
    full_text: str


@activity.defn
async def stream_completion(prompt: str) -> str:
    stream_client = WorkflowStreamClient.from_within_activity(
        batch_interval=timedelta(milliseconds=200),
    )
    # Disable provider-side retries; let Temporal own retry policy at the Activity layer.
    openai_client = AsyncOpenAI(max_retries=0)
    async with stream_client:
        deltas = stream_client.topic("delta", type=TextDelta)
        complete = stream_client.topic("complete", type=TextComplete)
        retry = stream_client.topic("retry", type=dict)
        # Tell consumers an earlier attempt's deltas are stale.
        if activity.info().attempt > 1:
            retry.publish({"attempt": activity.info().attempt}, force_flush=True)
        full: list[str] = []
        first = True
        oai_stream = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in oai_stream:
            if not chunk.choices:
                continue
            text = chunk.choices[0].delta.content
            if not text:
                continue
            # force_flush only on the first delta so the user sees something
            # immediately; subsequent deltas batch at the 200 ms interval.
            deltas.publish(TextDelta(text=text), force_flush=first)
            first = False
            full.append(text)
        complete.publish(TextComplete(full_text="".join(full)))
    # Leaving the `async with` block flushes any buffered events before returning.
    return "".join(full)
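# workflow.py
import asyncio
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream

# Pass the Activity module through the Workflow sandbox unchanged.
with workflow.unsafe.imports_passed_through():
    from activity import stream_completion


@dataclass
class ChatInput:
    # Minimal input shape for this example; only `prompt` is read below.
    prompt: str


@workflow.defn
class ChatWorkflow:
    @workflow.init
    def __init__(self, input: ChatInput) -> None:
        self.stream = WorkflowStream()
        self.subscriber_done: bool = False

    @workflow.signal
    async def subscriber_acknowledged_terminator(self) -> None:
        self.subscriber_done = True

    @workflow.run
    async def run(self, input: ChatInput) -> str:
        result = await workflow.execute_activity(
            stream_completion,
            input.prompt,
            start_to_close_timeout=timedelta(minutes=5),
        )
        # Wait for the subscriber to ack the terminal `complete` event.
        # The timeout is a fallback for when no subscriber is attached;
        # with the ack, the typical case exits as soon as the subscriber confirms.
        try:
            await workflow.wait_condition(
                lambda: self.subscriber_done,
                timeout=timedelta(seconds=30),
            )
        except asyncio.TimeoutError:  # same class as TimeoutError on Python 3.11+
            pass  # No subscriber; the run still completes cleanly.
        return result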
# consumer.py: accumulates the model's output and resets on retry
from temporalio.common import RawValue
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from activity import TextDelta
from workflow import ChatWorkflow

# `temporal_client` is assumed to be an already-connected temporalio.client.Client.


async def stream_chat(chat_id: str) -> str:
    # Subscribe-only; no `async with` needed because the flusher only runs for publishers.
    stream = WorkflowStreamClient.create(temporal_client, workflow_id=chat_id)
    converter = temporal_client.data_converter.payload_converter
    output: list[str] = []

    def render() -> None:
        ...  # display the accumulated output (terminal redraw, UI update, etc.)

    async for item in stream.subscribe(
        ["delta", "retry", "complete"], result_type=RawValue
    ):
        if item.topic == "retry":
            # Earlier attempt's deltas are stale; drop what we've shown.
            output.clear()
            render()
        elif item.topic == "delta":
            delta = converter.from_payload(item.data.payload, TextDelta)
            output.append(delta.text)
            render()
        elif item.topic == "complete":
            # Acknowledge so the Workflow can return without a sleep.
            await temporal_client.get_workflow_handle(chat_id).signal(
                ChatWorkflow.subscriber_acknowledged_terminator
            )
            break
    return "".join(output)
A few choices in this shape are deliberate:
- The Activity is the publisher because it owns the non-deterministic LLM call. The Workflow processes only the Activity's return value (see How it works at a glance for why the Workflow acts as a conduit rather than reading its own stream).
- The Activity publishes a RETRY event when activity.info().attempt > 1. This lets the UI respond appropriately to the failure, typically by clearing accumulated deltas before the next attempt's deltas arrive (see Delivery semantics).
- Termination uses an ack handshake: the consumer signals the Workflow once it has received the complete event, so the Workflow can return as soon as the subscriber confirms. The wait_condition timeout is the fallback when no subscriber is attached (see Closing the stream for the simpler fixed-sleep alternative).
- force_flush=True is used only on the first delta and on the RETRY sentinel, where latency matters. Subsequent deltas batch at the 200 ms batch_interval; per-delta force_flush=True would generate one Signal per token (see Tuning for the trade-off).
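The consumer's reset-on-retry logic is easy to check without a Temporal server by pulling it into a pure function. The sketch below is an illustration: fold_events is a hypothetical helper, and it represents each stream item as a (topic, text) tuple instead of the decoded payloads the consumer above receives.

```python
def fold_events(events: list) -> str:
    """Reduce (topic, text) events to the text a UI should end up showing."""
    output: list = []
    for topic, text in events:
        if topic == "retry":
            output.clear()  # earlier attempt's deltas are stale
        elif topic == "delta":
            output.append(text)
        elif topic == "complete":
            break  # terminal event: stop consuming
    return "".join(output)


events = [
    ("delta", "Hel"),
    ("delta", "lo wo"),  # first attempt fails after this delta
    ("retry", ""),
    ("delta", "Hello "),
    ("delta", "world"),
    ("complete", "Hello world"),
]
print(fold_events(events))
```

Dropping the retry event from the list shows the failure mode the sentinel exists to prevent: the partial output of the failed attempt is concatenated with the full output of the successful one.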
See also
- Workflow Streams samples (samples-python): four runnable scenarios covering basic publish/subscribe, reconnecting subscribers, external publishers, and bounded logs.
- temporalio.contrib.workflow_streams API reference.
- Workflow message passing: Signals, Updates, and Queries that Workflow Streams is built on.
- Payload conversion: converters and codecs.