Skip to content

Add Strands Agents plugin (contrib)#1539

Open
brianstrauch wants to merge 37 commits into
mainfrom
strands
Open

Add Strands Agents plugin (contrib)#1539
brianstrauch wants to merge 37 commits into
mainfrom
strands

Conversation

@brianstrauch
Copy link
Copy Markdown
Member

@brianstrauch brianstrauch commented May 18, 2026

Summary

  • New temporalio.contrib.strands plugin (experimental) that runs Strands Agents inside Temporal Workflows, routing model invocations, tool calls, and MCP tool calls through activities for durable execution and Temporal-managed retries/timeouts.
  • Adds TemporalModel, TemporalMCPClient, StrandsPlugin, and workflow.activity_as_tool / workflow.activity_as_hook helpers, with sandbox/import tweaks for Strands compatibility.
  • Includes README, integration tests (model, tools, MCP, hooks, interrupts, streaming, structured output), and bumps test formatting/lints repo-wide.

Test plan

  • uv run pytest tests/contrib/strands -v
  • uv run pytest (full suite)

Also bump `[tool.ruff] target-version` from py39 to py310 to match
`requires-python`; the old setting caused 0.15 to reject `match`
statements in the codebase.
Restructure the README into Quickstart + per-feature sections (Model,
Structured Output, Streaming, Tools, MCP), add an experimental warning,
installation instructions, and a link to strandsagents.com. Also
default `TemporalModel.model_factory` to `BedrockModel`, matching the
Strands `Agent` default, so the common case doesn't need a factory
lambda.
Adds activity_as_hook(activity_fn, *, extract, **options): wraps a
Temporal activity as a Strands HookCallback so I/O-doing hook callbacks
(audit logs, metrics) dispatch off the workflow. Co-locates with
activity_as_tool in a new _workflow.py.
Patch Agent.__init__ at import time to force retry_strategy=None and raise
ValueError when a strategy is supplied, so retries happen at the Temporal
activity layer (RetryPolicy on activity options) rather than blocking inside
the activity body. Documents the behavior in README.
@brianstrauch brianstrauch requested review from a team as code owners May 18, 2026 19:03
Comment thread temporalio/contrib/strands/_model_activity.py Outdated
topic = stream.topic(input.streaming_topic)
async with stream:
async for event in _stream(self._model, input):
activity.heartbeat()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a different heartbeat strategy than taken in other integrations I believe.

Copy link
Copy Markdown
Member Author

@brianstrauch brianstrauch May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find! It's consistent with ADK but OpenAI agents uses auto-heartbeater.

After reading about this a bit, auto-heartbeater seems more correct. In the current approach, if there's a gap in the stream, the heartbeats will also have a gap and cause streaming to fail. Will fix this here, and then fix ADK as a follow-up.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked about this with @JasonSteving99, and while auto-heartbeater is probably the most well-rounded solution when we wrap generic user tools as activities, when we wrap activities that are single LLM calls, the approach of heartbeating on each chunk of a stream might be better because it can fail faster than the activity timeout if the LLM stops streaming.

Still going to default everything to use auto-heartbeater but this an interesting discussion that we could revisit.

Comment thread temporalio/contrib/strands/_plugin.py
Comment thread temporalio/contrib/strands/_plugin.py Outdated
Comment thread temporalio/contrib/strands/_plugin.py Outdated
Comment thread temporalio/contrib/strands/_plugin.py Outdated
Comment thread temporalio/contrib/strands/_plugin.py
Python < 3.11's get_type_hints leaks NotRequired[...] through TypedDict
fields, which the default JSON converter can't deserialize. Strands
Message and ToolSpec use NotRequired, so loosen the activity input
fields to Any; values pass through unchanged to Model.stream.
Switch invoke_model/_streaming to the _auto_heartbeater pattern (matches
openai_agents) so the heartbeat clock doesn't depend on event cadence
and the non-streaming activity is covered too.

Type _InvokeModelInput fields as Any: strands Message/ToolSpec use
NotRequired, which Python < 3.11's get_type_hints leaks through and the
default JSON converter then fails to deserialize. Values pass through
unchanged to Model.stream.

Drop the explicit activity name= override so the activities use their
function names (invoke_model, invoke_model_streaming), matching the
naming convention used by other contrib model activities.
Cross-module consumers can't be seen by basedpyright when the function
is underscore-prefixed, producing a false unused-function warning.
Promote the name and add a package docstring.
`pydantic` and `temporalio.contrib.strands` are already covered by the
SDK's default passthrough (via `pydantic` and `temporalio` in
`passthrough_modules_with_temporal`). Update the stale comment in
_temporal_mcp_client.py that explained the redundant entry.
Replace the singular model= / mcp_clients=[...] plugin args with name-keyed
dicts: StrandsPlugin(models={name: factory}, mcp_clients={name: transport}).
TemporalModel and TemporalMCPClient become pure workflow-side handles that
reference the worker registration by name and carry only per-call activity
options. A single pair of model activities now dispatches to any number of
backing models by resolving model_name from the activity input.
Comment thread temporalio/contrib/strands/README.md Outdated
Comment thread temporalio/contrib/strands/README.md Outdated
...

@activity.defn(name="current_time")
async def current_time_activity() -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible for us to provide default versions of these since it seems like a pretty easily predictable/deterministic thin wrapper?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are unfortunately a ton of these and it would be a huge maintenance burden. This is the easiest way I could come up to mark external tools as activities, that also matches the style of user-created tools.

Callbacks run in workflow context, so they must be deterministic: no `time.time()`, `uuid.uuid4()`, or I/O — same rules as workflow code. For callbacks that need I/O (audit logging, metrics, alerting), use `workflow.activity_as_hook()` to dispatch the work as a Temporal activity:

```python
from temporalio.contrib.strands.workflow import activity_as_hook
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this feels like it should be hook_as_activity or activity_hook instead... just semantically? what do you think?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also have activity_as_tool() and that naming is consistent with our other plugins. The idea is that you pass an activity to them and it turns them into a tool/hook.

I could also see activity_to_tool() but we'd probably want to change it consistently across our other plugins at this point.

Comment thread temporalio/contrib/strands/README.md Outdated

## Observability

`StrandsPlugin` composes cleanly with [`OpenTelemetryPlugin`](../opentelemetry) — add both to the worker to get OTel spans around the model, tool, and MCP activities the plugin schedules, plus any spans Strands itself emits inside `invoke_async`:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there ordering requirements here for which plugin needs to come first? I don't know the answer but we should find out. There's a possibility that the telemetry one needs to come first to properly observe the agent calls?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually looks like there's an even better solution, which is registering the OTel plugin on the client, and then using that client on the worker. Then there's no issue of ordering. This is what the OpenAI README suggests.

Comment thread temporalio/contrib/strands/_plugin.py
Comment thread tests/contrib/strands/test_structured_output.py
Comment thread tests/contrib/strands/test_model_streaming.py
Comment thread temporalio/contrib/strands/_plugin.py Outdated
TemporalAgent(Agent) is the primary user-facing class: it takes model="name"
to select a factory registered with StrandsPlugin(models=...), accepts the
per-call activity options, and forwards all other kwargs to Strands' Agent.
Construction-time validation of retry_strategy and overrides of
take_snapshot/load_snapshot replace the previous Agent.__init__ and snapshot
monkey-patches in StrandsPlugin. TemporalModel is no longer exported; it
remains as internal plumbing for TemporalAgent.
@brianstrauch brianstrauch requested a review from tconley1428 May 19, 2026 20:44
…ADME

Per the OpenTelemetry plugin's own guidance, plugins register on the client
so workers built from that client pick them up automatically. Update the
Observability section accordingly, plus minor wording polish in the Models
and Structured Output sections.
Force every workflow task to replay from full history so the strands tests
double as a continuous determinism check on the plugin and TemporalAgent.
All 7 tests pass under the stricter setting. Also trims a redundant
paragraph from StrandsPlugin's docstring.
@brianstrauch brianstrauch requested a review from xumaple May 19, 2026 21:18
Drop the leading underscore from populate_cache / clear_cache /
build_call_tool_activity in _temporal_mcp_client.py — basedpyright
flagged them as unused because it doesn't follow cross-module imports
for underscore-prefixed names. Add docstrings since pydocstyle now
treats them as public. Also pick up a one-line ruff format fix in
_model_activity.py.
Install a failure converter on the plugin's data converter that translates
strands InterruptException into an ApplicationError carrying the Interrupt
payload in details. TemporalActivityTool.stream() catches the matching
ApplicationError, reconstructs the Interrupt, and yields ToolInterruptEvent
so AgentResult.interrupts is populated just like the in-workflow case.

The path requires StrandsPlugin on the client (not just the worker), since
_ActivityWorker reads the data converter from client_config. README HITL
section is restructured to cover both hook-based and tool-body surfaces,
with a note on the client-attachment requirement.

New test_interrupt_exception.py exercises both surfaces end-to-end with
signal-driven resume.
…edrockModel

Forward agent invocation_state across the model activity boundary so the
worker-side model receives it via model.stream(invocation_state=...).
Entries that aren't JSON-serializable are dropped before dispatch with a
debug log naming the dropped keys.

Make model selection optional. StrandsPlugin() with no args registers a
single BedrockModel() factory under the name "bedrock" (matching Strands'
own implicit default in agent.py:221), and TemporalAgent() with no model
resolves to the sole registered factory at activity time. Multi-model
setups continue to require an explicit model= on TemporalAgent.

README quickstart shrinks accordingly: no BedrockModel import, no models=
argument on StrandsPlugin, no model= on TemporalAgent. Model= remains in
the multi-model example where it's load-bearing.
…ault

Drop the single-entry guess for TemporalAgent(model=None). Implicit
resolution is now valid only when StrandsPlugin auto-registers its own
BedrockModel default; any user-supplied models= forces every TemporalAgent
to pass model= explicitly. Track the gate via a default_name field on
ModelActivity that the plugin sets only on the auto-registered path.
Extend StrandsFailureConverter.to_failure to translate Strands' terminal
model/session exceptions into ApplicationError(non_retryable=True, type=...):
MaxTokensReachedException, ContextWindowOverflowException,
StructuredOutputException, SessionException. These deterministic failures
won't succeed on retry, so the typed annotation stops Temporal's retry
policy from churning on them. ModelThrottledException stays retryable.
@@ -0,0 +1 @@
"""Shared utilities for temporalio.contrib plugins."""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably this is just the SDK itself, I'm not sure we need a common contrib package.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you ok with moving the heartbeater into the SDK? I assume it would belong under the activity package, so it might look like @activity.auto_heartbeater

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants