| Field | Value |
| --- | --- |
| Name | temporalio |
| Version | 1.15.0 |
| Summary | Temporal.io Python SDK |
| Upload time | 2025-07-29 03:44:09 |
| Requires Python | >=3.9 |
| Keywords | temporal, workflow |

[Temporal](https://temporal.io/) is a distributed, scalable, durable, and highly available orchestration engine used to
execute asynchronous, long-running business logic in a scalable and resilient way.
"Temporal Python SDK" is the framework for authoring workflows and activities using the Python programming language.
Also see:
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our
[Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including
information around Temporal core concepts.
* [Python Code Samples](https://github.com/temporalio/samples-python)
* [API Documentation](https://python.temporal.io) - Complete Temporal Python SDK Package reference.
In addition to features common across all Temporal SDKs, the Python SDK also has the following interesting features:
**Type Safe**
This library uses the latest typing and MyPy support with generics to ensure all calls can be typed. For example,
starting a workflow with an `int` parameter when it accepts a `str` parameter would cause MyPy to fail.
**Different Activity Types**
The activity worker has been developed to work with `async def`, threaded, and multiprocess activities. Threaded activities are the initial recommendation, and further guidance can be found in [the docs](https://docs.temporal.io/develop/python/python-sdk-sync-vs-async).
**Custom `asyncio` Event Loop**
The workflow implementation basically turns `async def` functions into workflows backed by a distributed, fault-tolerant
event loop. This means task management, sleep, cancellation, etc have all been developed to seamlessly integrate with
`asyncio` concepts.
See the [blog post](https://temporal.io/blog/durable-distributed-asyncio-event-loop) introducing the Python SDK for an
informal introduction to the features and their implementation.
---
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Contents**
- [Quick Start](#quick-start)
- [Installation](#installation)
- [Implementing a Workflow](#implementing-a-workflow)
- [Running a Workflow](#running-a-workflow)
- [Next Steps](#next-steps)
- [Usage](#usage)
- [Client](#client)
- [Data Conversion](#data-conversion)
- [Pydantic Support](#pydantic-support)
- [Custom Type Data Conversion](#custom-type-data-conversion)
- [Workers](#workers)
- [Workflows](#workflows)
- [Definition](#definition)
- [Running](#running)
- [Invoking Activities](#invoking-activities)
- [Invoking Child Workflows](#invoking-child-workflows)
- [Timers](#timers)
- [Conditions](#conditions)
- [Asyncio and Determinism](#asyncio-and-determinism)
- [Asyncio Cancellation](#asyncio-cancellation)
- [Workflow Utilities](#workflow-utilities)
- [Exceptions](#exceptions)
- [Signal and update handlers](#signal-and-update-handlers)
- [External Workflows](#external-workflows)
- [Testing](#testing)
- [Automatic Time Skipping](#automatic-time-skipping)
- [Manual Time Skipping](#manual-time-skipping)
- [Mocking Activities](#mocking-activities)
- [Workflow Sandbox](#workflow-sandbox)
- [How the Sandbox Works](#how-the-sandbox-works)
- [Avoiding the Sandbox](#avoiding-the-sandbox)
- [Customizing the Sandbox](#customizing-the-sandbox)
- [Passthrough Modules](#passthrough-modules)
- [Invalid Module Members](#invalid-module-members)
- [Known Sandbox Issues](#known-sandbox-issues)
- [Global Import/Builtins](#global-importbuiltins)
- [Sandbox is not Secure](#sandbox-is-not-secure)
- [Sandbox Performance](#sandbox-performance)
- [Extending Restricted Classes](#extending-restricted-classes)
- [Certain Standard Library Calls on Restricted Objects](#certain-standard-library-calls-on-restricted-objects)
- [is_subclass of ABC-based Restricted Classes](#is_subclass-of-abc-based-restricted-classes)
- [Activities](#activities)
- [Definition](#definition-1)
- [Types of Activities](#types-of-activities)
- [Synchronous Activities](#synchronous-activities)
- [Synchronous Multithreaded Activities](#synchronous-multithreaded-activities)
- [Synchronous Multiprocess/Other Activities](#synchronous-multiprocessother-activities)
- [Asynchronous Activities](#asynchronous-activities)
- [Activity Context](#activity-context)
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
- [Worker Shutdown](#worker-shutdown)
- [Testing](#testing-1)
- [Interceptors](#interceptors)
- [Nexus](#nexus)
- [Plugins](#plugins)
- [Client Plugins](#client-plugins)
- [Worker Plugins](#worker-plugins)
- [Workflow Replay](#workflow-replay)
- [Observability](#observability)
- [Metrics](#metrics)
- [OpenTelemetry Tracing](#opentelemetry-tracing)
- [Protobuf 3.x vs 4.x](#protobuf-3x-vs-4x)
- [Known Compatibility Issues](#known-compatibility-issues)
- [gevent Patching](#gevent-patching)
- [Development](#development)
- [Building](#building)
- [Prepare](#prepare)
- [Build](#build)
- [Use](#use)
- [Local SDK development environment](#local-sdk-development-environment)
- [Testing](#testing-2)
- [Proto Generation and Testing](#proto-generation-and-testing)
- [Style](#style)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Quick Start
We will guide you through the Temporal basics to create a "hello, world!" script on your machine. It is a deliberately
simplified example and decidedly not the only way to use Temporal.
For more information, check out the docs references in "Next Steps" below the quick start.
## Installation
Install the `temporalio` package from [PyPI](https://pypi.org/project/temporalio).
These steps can be followed to use with a virtual environment and `pip`:
* [Create a virtual environment](https://packaging.python.org/en/latest/tutorials/installing-packages/#creating-virtual-environments)
* Update `pip` - `python -m pip install -U pip`
  * Needed because older versions of `pip` may not pick the right wheel
* Install Temporal SDK - `python -m pip install temporalio`
The SDK is now ready for use. To build from source, see "Building" near the end of this documentation.
**NOTE: This README is for the current branch and not necessarily what's released on `PyPI`.**
## Implementing a Workflow
Create the following in `activities.py`:
```python
from temporalio import activity


@activity.defn
def say_hello(name: str) -> str:
    return f"Hello, {name}!"
```
Create the following in `workflows.py`:
```python
from datetime import timedelta

from temporalio import workflow

# Import our activity, passing it through the sandbox
with workflow.unsafe.imports_passed_through():
    from .activities import say_hello


@workflow.defn
class SayHello:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
        )
```
Create the following in `run_worker.py`:
```python
import asyncio
import concurrent.futures

from temporalio.client import Client
from temporalio.worker import Worker

# Import the activity and workflow from our other files
from .activities import say_hello
from .workflows import SayHello


async def main():
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    # Run the worker
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[SayHello],
            activities=[say_hello],
            activity_executor=activity_executor,
        )
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```
Assuming you have a [Temporal server running on localhost](https://docs.temporal.io/docs/server/quick-install/), this
will run the worker:
    python run_worker.py
## Running a Workflow
Create the following script at `run_workflow.py`:
```python
import asyncio

from temporalio.client import Client

# Import the workflow from the previous code
from .workflows import SayHello


async def main():
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    # Execute a workflow
    result = await client.execute_workflow(SayHello.run, "my name", id="my-workflow-id", task_queue="my-task-queue")

    print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
```
Assuming you have `run_worker.py` running from before, this will run the workflow:

    python run_workflow.py

The output will be:

    Result: Hello, my name!
## Next Steps
Temporal can be implemented in your code in many different ways, to suit your application's needs. The links below will
give you much more information about how Temporal works with Python:
* [Code Samples](https://github.com/temporalio/samples-python) - If you want to start with some code, we have provided
some pre-built samples.
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Our Python specific
Developer's Guide will give you much more information on how to build with Temporal in your Python applications than
our SDK README ever could (or should).
* [API Documentation](https://python.temporal.io) - Full Temporal Python SDK package documentation.
---
# Usage
From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around
Temporal concepts. *This section is not intended as a how-to guide* -- For more how-to oriented information, check out
the links in the [Next Steps](#next-steps) section above.
### Client
A client can be created and used to start a workflow like so:
```python
from temporalio.client import Client


async def main():
    # Create client connected to server at the given address and namespace
    client = await Client.connect("localhost:7233", namespace="my-namespace")

    # Start a workflow
    handle = await client.start_workflow(MyWorkflow.run, "some arg", id="my-workflow-id", task_queue="my-task-queue")

    # Wait for result
    result = await handle.result()
    print(f"Result: {result}")
```
Some things to note about the above code:
* A `Client` does not have an explicit "close"
* To enable TLS, the `tls` argument to `connect` can be set to `True` or a `TLSConfig` object
* A single positional argument can be passed to `start_workflow`. If there are multiple arguments, only the
non-type-safe form of `start_workflow` can be used (i.e. the one accepting a string workflow name) and it must be in
the `args` keyword argument (see the sketch after this list).
* The `handle` represents the workflow that was started and can be used for more than just getting the result
* Since we are just getting the handle and waiting on the result, we could have called `client.execute_workflow` which
does the same thing
* Clients can have many more options not shown here (e.g. data converters and interceptors)
* A string can be used instead of the method reference to call a workflow by name (e.g. if defined in another language)
* Clients do not work across forks
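For illustration, here is a hedged sketch of the non-type-safe, by-name form with multiple arguments (the workflow name
and arguments are hypothetical; `client` is the client from above):

```python
# Multiple arguments require the non-type-safe form (string workflow name)
# and must be supplied via the `args` keyword argument
handle = await client.start_workflow(
    "OtherWorkflow",
    args=["some arg", 123],
    id="my-other-workflow-id",
    task_queue="my-task-queue",
)
```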
Clients also provide a shallow copy of their config for use in making slightly different clients backed by the same
connection. For instance, given the `client` above, this is how to have a client in another namespace:
```python
config = client.config()
config["namespace"] = "my-other-namespace"
other_ns_client = Client(**config)
```
#### Data Conversion
Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
`temporalio.converter.DataConverter` can be set via the `data_converter` parameter of the `Client` constructor. Data
converters are a combination of payload converters, payload codecs, and failure converters. Payload converters convert
Python values to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
Failure converters convert exceptions to/from serialized failures.
The default data converter supports converting multiple types including:
* `None`
* `bytes`
* `google.protobuf.message.Message` - As JSON when encoding, but has ability to decode binary proto from other languages
* Anything that can be converted to JSON including:
  * Anything that [`json.dump`](https://docs.python.org/3/library/json.html#json.dump) supports natively
  * [dataclasses](https://docs.python.org/3/library/dataclasses.html)
  * Iterables, including ones `json.dump` may not support by default, e.g. `set`
  * [IntEnum, StrEnum](https://docs.python.org/3/library/enum.html) based enums
  * [UUID](https://docs.python.org/3/library/uuid.html)
  * `datetime.datetime`
To use pydantic model instances, see [Pydantic Support](#pydantic-support).
`datetime.date` and `datetime.time` can only be used with the Pydantic data converter.
Although workflows, updates, signals, and queries can all be defined with multiple input parameters, users are strongly
encouraged to use a single `dataclass` or Pydantic model parameter, so that fields with defaults can be easily added
without breaking compatibility. Similar advice applies to return values.
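As a minimal sketch of that advice (the names here are illustrative, not part of the SDK), a single dataclass parameter
lets fields with defaults be added later without breaking already-recorded workflow inputs:

```python
from dataclasses import dataclass

from temporalio import workflow


@dataclass
class GreetParams:
    name: str
    # Added in a later release with a default, so older payloads still decode
    salutation: str = "Hello"


@workflow.defn
class GreetWorkflow:
    @workflow.run
    async def run(self, params: GreetParams) -> str:
        return f"{params.salutation}, {params.name}!"
```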
Classes with generics may not have the generics properly resolved, since the current implementation does not perform
generic type resolution. Users should use concrete types.
##### Pydantic Support
To use Pydantic model instances, install Pydantic and set the Pydantic data converter when creating client instances:
```python
from temporalio.contrib.pydantic import pydantic_data_converter
client = Client(data_converter=pydantic_data_converter, ...)
```
This data converter supports conversion of all types supported by Pydantic to and from JSON.
In addition to Pydantic models, these include all `json.dump`-able types, various non-`json.dump`-able standard library
types such as dataclasses, types from the datetime module, sets, UUID, etc, and custom types composed of any of these.
Pydantic v1 is not supported by this data converter. If you are not yet able to upgrade from Pydantic v1, see
https://github.com/temporalio/samples-python/tree/main/pydantic_converter/v1 for limited v1 support.
##### Custom Type Data Conversion
For converting from JSON, the workflow/activity type hint is taken into account to convert to the proper type. Care has
been taken to support all common typings including `Optional`, `Union`, all forms of iterables and mappings, `NewType`,
etc in addition to the regular JSON values mentioned before.
Data converters contain a reference to a payload converter class that is used to convert to/from payloads/values. This
is a class and not an instance because it is instantiated on every workflow run inside the sandbox. The payload
converter is usually a `CompositePayloadConverter` which contains multiple `EncodingPayloadConverter`s it uses to try
to serialize/deserialize payloads. Upon serialization, each `EncodingPayloadConverter` is tried until one succeeds. The
`EncodingPayloadConverter` provides an "encoding" string serialized onto the payload so that, upon deserialization, the
specific `EncodingPayloadConverter` for the given "encoding" is used.
The default data converter uses the `DefaultPayloadConverter` which is simply a `CompositePayloadConverter` with a known
set of default `EncodingPayloadConverter`s. To implement a custom encoding for a custom type, a new
`EncodingPayloadConverter` can be created for the new type. For example, to support `IPv4Address` types:
```python
class IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter):
    @property
    def encoding(self) -> str:
        return "text/ipv4-address"

    def to_payload(self, value: Any) -> Optional[Payload]:
        if isinstance(value, ipaddress.IPv4Address):
            return Payload(
                metadata={"encoding": self.encoding.encode()},
                data=str(value).encode(),
            )
        else:
            return None

    def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
        assert not type_hint or type_hint is ipaddress.IPv4Address
        return ipaddress.IPv4Address(payload.data.decode())


class IPv4AddressPayloadConverter(CompositePayloadConverter):
    def __init__(self) -> None:
        # Just add ours as first before the defaults
        super().__init__(
            IPv4AddressEncodingPayloadConverter(),
            *DefaultPayloadConverter.default_encoding_payload_converters,
        )


my_data_converter = dataclasses.replace(
    DataConverter.default,
    payload_converter_class=IPv4AddressPayloadConverter,
)
```
Imports are left off for brevity.
This is good for many custom types. However, sometimes you want to override the behavior of just the existing JSON
encoding payload converter to support a new type. It is already the last encoding data converter in the list, so it's
the fall-through behavior for any otherwise unknown type. Customizing the existing JSON converter has the benefit of
making the type work in lists, unions, etc.
The `JSONPlainPayloadConverter` uses the Python [json](https://docs.python.org/3/library/json.html) library with an
advanced JSON encoder by default and a custom value conversion method to turn `json.load`ed values to their type hints.
The conversion can be customized for serialization with a custom `json.JSONEncoder` and deserialization with a custom
`JSONTypeConverter`. For example, to support `IPv4Address` types in existing JSON conversion:
```python
class IPv4AddressJSONEncoder(AdvancedJSONEncoder):
    def default(self, o: Any) -> Any:
        if isinstance(o, ipaddress.IPv4Address):
            return str(o)
        return super().default(o)


class IPv4AddressJSONTypeConverter(JSONTypeConverter):
    def to_typed_value(
        self, hint: Type, value: Any
    ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
        if issubclass(hint, ipaddress.IPv4Address):
            return ipaddress.IPv4Address(value)
        return JSONTypeConverter.Unhandled


class IPv4AddressPayloadConverter(CompositePayloadConverter):
    def __init__(self) -> None:
        # Replace default JSON plain with our own that has our encoder and type
        # converter
        json_converter = JSONPlainPayloadConverter(
            encoder=IPv4AddressJSONEncoder,
            custom_type_converters=[IPv4AddressJSONTypeConverter()],
        )
        super().__init__(
            *[
                c if not isinstance(c, JSONPlainPayloadConverter) else json_converter
                for c in DefaultPayloadConverter.default_encoding_payload_converters
            ]
        )


my_data_converter = dataclasses.replace(
    DataConverter.default,
    payload_converter_class=IPv4AddressPayloadConverter,
)
```
Now `IPv4Address` can be used in type hints including collections, optionals, etc.
### Workers
Workers host workflows and/or activities. Here's how to run a worker:
```python
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

# Import your own workflows and activities
from my_workflow_package import MyWorkflow, my_activity


async def run_worker(stop_event: asyncio.Event):
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233", namespace="my-namespace")

    # Run the worker until the event is set
    worker = Worker(client, task_queue="my-task-queue", workflows=[MyWorkflow], activities=[my_activity])
    async with worker:
        await stop_event.wait()
```
Some things to note about the above code:
* This creates/uses the same client that is used for starting workflows
* While this example accepts a stop event and uses `async with`, `run()` and `shutdown()` may be used instead
* Workers can have many more options not shown here (e.g. data converters and interceptors)
### Workflows
#### Definition
Workflows are defined as classes decorated with `@workflow.defn`. The method invoked for the workflow is decorated with
`@workflow.run`. Methods for signals, queries, and updates are decorated with `@workflow.signal`, `@workflow.query`
and `@workflow.update` respectively. Here's an example of a workflow:
```python
import asyncio
from datetime import timedelta

from temporalio import workflow

# Pass the activities through the sandbox
with workflow.unsafe.imports_passed_through():
    from .my_activities import GreetingInfo, create_greeting_activity


@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self._current_greeting = "<unset>"
        self._greeting_info = GreetingInfo()
        self._greeting_info_update = asyncio.Event()
        self._complete = asyncio.Event()

    @workflow.run
    async def run(self, name: str) -> str:
        self._greeting_info.name = name
        while True:
            # Store greeting
            self._current_greeting = await workflow.execute_activity(
                create_greeting_activity,
                self._greeting_info,
                start_to_close_timeout=timedelta(seconds=5),
            )
            workflow.logger.debug("Greeting set to %s", self._current_greeting)

            # Wait for salutation update or complete signal (this can be
            # cancelled)
            await asyncio.wait(
                [
                    asyncio.create_task(self._greeting_info_update.wait()),
                    asyncio.create_task(self._complete.wait()),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )
            if self._complete.is_set():
                return self._current_greeting
            self._greeting_info_update.clear()

    @workflow.signal
    async def update_salutation(self, salutation: str) -> None:
        self._greeting_info.salutation = salutation
        self._greeting_info_update.set()

    @workflow.signal
    async def complete_with_greeting(self) -> None:
        self._complete.set()

    @workflow.query
    def current_greeting(self) -> str:
        return self._current_greeting

    @workflow.update
    def set_and_get_greeting(self, greeting: str) -> str:
        old = self._current_greeting
        self._current_greeting = greeting
        return old
```
This assumes there's an activity in `my_activities.py` like:
```python
from dataclasses import dataclass

from temporalio import activity


@dataclass
class GreetingInfo:
    salutation: str = "Hello"
    name: str = "<unknown>"


@activity.defn
def create_greeting_activity(info: GreetingInfo) -> str:
    return f"{info.salutation}, {info.name}!"
```
Some things to note about the above workflow code:
* Workflows run in a sandbox by default.
  * Users are encouraged to define workflows in files with no side effects or other complicated code or unnecessary
    imports to other third party libraries.
  * Non-standard-library, non-`temporalio` imports should usually be "passed through" the sandbox. See the
    [Workflow Sandbox](#workflow-sandbox) section for more details.
* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
a different signal
* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
* Workflow code must be deterministic. This means no `set` iteration, threading, no randomness, no external calls to
processes, no network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be
deterministic. Also see the [Asyncio and Determinism](#asyncio-and-determinism) section later.
* `@activity.defn` is explained in a later section. For normal simple string concatenation, this would just be done in
the workflow. The activity is for demonstration purposes only.
* `workflow.execute_activity(create_greeting_activity, ...` is actually a typed signature, and MyPy will fail if the
`self._greeting_info` parameter is not a `GreetingInfo`
Here are the decorators that can be applied:
* `@workflow.defn` - Defines a workflow class
  * Must be defined on the class given to the worker (ignored if present on a base class)
  * Can have a `name` param to customize the workflow name, otherwise it defaults to the unqualified class name
  * Can have `dynamic=True` which means all otherwise unhandled workflows fall through to this. If present, cannot have
    `name` argument, and run method must accept a single parameter of `Sequence[temporalio.common.RawValue]` type. The
    payload of the raw value can be converted via `workflow.payload_converter().from_payload` (see the sketch after
    this list).
* `@workflow.run` - Defines the primary workflow run method
  * Must be defined on the same class as `@workflow.defn`, not a base class (but can _also_ be defined on the same
    method of a base class)
  * Exactly one method must have this decorator, no more and no less
  * Must be defined on an `async def` method
  * The method's arguments are the workflow's arguments
  * The first parameter must be `self`, followed by positional arguments. Best practice is to only take a single
    argument that is an object/dataclass of fields that can be added to as needed.
* `@workflow.init` - Specifies that the `__init__` method accepts the workflow's arguments.
  * If present, may only be applied to the `__init__` method, the parameters of which must then be identical to those
    of the `@workflow.run` method.
  * The purpose of this decorator is to allow operations involving workflow arguments to be performed in the `__init__`
    method, before any signal or update handler has a chance to execute.
* `@workflow.signal` - Defines a method as a signal
  * Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method
    is overridden, then the override must also be decorated.
  * The method's arguments are the signal's arguments.
  * Return value is ignored.
  * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.
  * Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name.
  * Can have `dynamic=True` which means all otherwise unhandled signals fall through to this. If present, cannot have
    `name` argument, and method parameters must be `self`, a string signal name, and a
    `Sequence[temporalio.common.RawValue]`.
  * Non-dynamic methods can only have positional arguments. Best practice is to only take a single argument that is an
    object/dataclass of fields that can be added to as needed.
  * See [Signal and update handlers](#signal-and-update-handlers) below
* `@workflow.update` - Defines a method as an update
  * Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method
    is overridden, then the override must also be decorated.
  * May accept input and return a value
  * The method's arguments are the update's arguments.
  * May be `async` or non-`async`
  * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.
  * Also accepts the `name` and `dynamic` parameters like signal, with the same semantics.
  * Update handlers may optionally define a validator method by decorating it with `@update_handler_method.validator`.
    To reject an update before any events are written to history, throw an exception in a validator. Validators cannot
    be `async`, cannot mutate workflow state, and return nothing.
  * See [Signal and update handlers](#signal-and-update-handlers) below
* `@workflow.query` - Defines a method as a query
  * Should return a value
  * Should not be `async`
  * Temporal queries should never mutate anything in the workflow or make any calls that would mutate the workflow
  * Also accepts the `name` and `dynamic` parameters like signal and update, with the same semantics.
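For instance, a hedged sketch of a dynamic workflow receiving raw arguments (assuming the first argument decodes as a
`str`):

```python
from typing import Sequence

from temporalio import workflow
from temporalio.common import RawValue


@workflow.defn(dynamic=True)
class DynamicWorkflow:
    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        # The otherwise-unhandled workflow type name is available via info()
        workflow.logger.info("Handling workflow type %s", workflow.info().workflow_type)
        # Convert the first raw argument using the workflow's payload converter
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return f"Hello, {name}!"
```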
#### Running
To start a locally-defined workflow from a client, you can simply reference its method like so:
```python
from temporalio.client import Client
from my_workflow_package import GreetingWorkflow


async def create_greeting(client: Client) -> str:
    # Start the workflow
    handle = await client.start_workflow(GreetingWorkflow.run, "my name", id="my-workflow-id", task_queue="my-task-queue")

    # Change the salutation
    await handle.signal(GreetingWorkflow.update_salutation, "Aloha")

    # Tell it to complete
    await handle.signal(GreetingWorkflow.complete_with_greeting)

    # Wait and return result
    return await handle.result()
```
Some things to note about the above code:
* This uses the `GreetingWorkflow` from the previous section
* The result of calling this function is `"Aloha, my name!"`
* `id` and `task_queue` are required for running a workflow
* `client.start_workflow` is typed, so MyPy would fail if `"my name"` were something besides a string
* `handle.signal` is typed, so MyPy would fail if `"Aloha"` were something besides a string or if we provided a
parameter to the parameterless `complete_with_greeting`
* `handle.result` is typed to the workflow itself, so MyPy would fail if we said this `create_greeting` returned
something besides a string
#### Invoking Activities
* Activities are started with non-async `workflow.start_activity()` which accepts either an activity function reference
or a string name.
* A single argument to the activity is positional. Multiple arguments are not supported in the type-safe form of
start/execute activity and must be supplied via the `args` keyword argument.
* Activity options are set as keyword arguments after the activity arguments. At least one of `start_to_close_timeout`
or `schedule_to_close_timeout` must be provided.
* The result is an activity handle which is an `asyncio.Task` and supports basic task features
* An async `workflow.execute_activity()` helper is provided which takes the same arguments as
`workflow.start_activity()` and `await`s on the result. This should be used in most cases unless advanced task
capabilities are needed.
* Local activities work very similarly except the functions are `workflow.start_local_activity()` and
`workflow.execute_local_activity()`
  * ⚠️ Local activities are currently experimental
* Activities can be methods of a class. Invokers should use `workflow.start_activity_method()`,
`workflow.execute_activity_method()`, `workflow.start_local_activity_method()`, and
`workflow.execute_local_activity_method()` instead.
* Activities can be callable classes (i.e. that define `__call__`). Invokers should use `workflow.start_activity_class()`,
`workflow.execute_activity_class()`, `workflow.start_local_activity_class()`, and
`workflow.execute_local_activity_class()` instead.
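As a hedged sketch of a few of these invocation forms (reusing the earlier `say_hello` activity; the string-named
`"format-greeting"` activity is hypothetical):

```python
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class ActivityExamplesWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # Start without awaiting; the handle is an asyncio.Task
        handle = workflow.start_activity(
            say_hello, name, start_to_close_timeout=timedelta(seconds=5)
        )
        greeting = await handle

        # Or start and await in one call (the common case)
        greeting = await workflow.execute_activity(
            say_hello, name, start_to_close_timeout=timedelta(seconds=5)
        )

        # Multiple arguments go through the non-type-safe `args` keyword
        return await workflow.execute_activity(
            "format-greeting",
            args=[greeting, name],
            start_to_close_timeout=timedelta(seconds=5),
        )
```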
#### Invoking Child Workflows
* Child workflows are started with async `workflow.start_child_workflow()` which accepts either a workflow run method
reference or a string name. The arguments to the workflow are positional.
* A single argument to the child workflow is positional. Multiple arguments are not supported in the type-safe form of
start/execute child workflow and must be supplied via the `args` keyword argument.
* Child workflow options are set as keyword arguments after the arguments. At least `id` must be provided.
* The `await` of the start does not complete until the start has been accepted by the server
* The result is a child workflow handle which is an `asyncio.Task` and supports basic task features. The handle also has
some child info and supports signalling the child workflow
* An async `workflow.execute_child_workflow()` helper is provided which takes the same arguments as
`workflow.start_child_workflow()` and `await`s on the result. This should be used in most cases unless advanced task
capabilities are needed.
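A hedged sketch, reusing the quick start's `SayHello` workflow as the child:

```python
from temporalio import workflow


@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # The start itself is awaited; the returned handle is an asyncio.Task
        handle = await workflow.start_child_workflow(SayHello.run, name, id="hello-child-1")
        greeting = await handle

        # Or, equivalently, start and await the result in one call
        greeting = await workflow.execute_child_workflow(SayHello.run, name, id="hello-child-2")
        return greeting
```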
#### Timers
* A timer is represented by normal `asyncio.sleep()` or a `workflow.sleep()` call
* Timers are also implicitly started on any `asyncio` calls with timeouts (e.g. `asyncio.wait_for`)
* Timers are Temporal server timers, not local ones, so sub-second resolution rarely has value
* Calls that use a specific point in time, e.g. `call_at` or `timeout_at`, should be based on the current loop time
(i.e. `workflow.time()`) and not an actual point in time. This is because fixed times are translated to relative ones
by subtracting the current loop time which may not be the actual current time.
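A hedged sketch of these timer forms (assuming `workflow.sleep` takes seconds like `asyncio.sleep`):

```python
import asyncio

from temporalio import workflow


@workflow.defn
class TimerExamplesWorkflow:
    @workflow.run
    async def run(self) -> str:
        # A 30-second Temporal server-side timer via normal asyncio
        await asyncio.sleep(30)

        # The equivalent workflow helper
        await workflow.sleep(30)

        # Timeouts implicitly create timers too
        try:
            await asyncio.wait_for(workflow.wait_condition(lambda: False), timeout=60)
        except asyncio.TimeoutError:
            pass
        return "done"
```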
#### Conditions
* `workflow.wait_condition` is an async function that doesn't return until a provided callback returns true
* A `timeout` can optionally be provided which will throw a `asyncio.TimeoutError` if reached (internally backed by
`asyncio.wait_for` which uses a timer)
#### Asyncio and Determinism
Workflows must be deterministic. Workflows are backed by a custom
[asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many of the common `asyncio` calls work
as normal. Some asyncio features are disabled such as:
* Thread related calls such as `to_thread()`, `run_coroutine_threadsafe()`, `loop.run_in_executor()`, etc
* Calls that alter the event loop such as `loop.close()`, `loop.stop()`, `loop.run_forever()`,
`loop.set_task_factory()`, etc
* Calls that use anything external such as networking, subprocesses, disk IO, etc
Also, there are some `asyncio` utilities that internally use `set()` which can make them non-deterministic from one
worker to the next. Therefore the following `asyncio` functions have `workflow`-module alternatives that are
deterministic:
* `asyncio.as_completed()` - use `workflow.as_completed()`
* `asyncio.wait()` - use `workflow.wait()`
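A hedged sketch, assuming these helpers mirror their `asyncio` counterparts' signatures:

```python
import asyncio

from temporalio import workflow


@workflow.defn
class DeterministicWaitWorkflow:
    @workflow.run
    async def run(self) -> None:
        tasks = [
            asyncio.create_task(workflow.sleep(1)),
            asyncio.create_task(workflow.sleep(2)),
        ]
        # Deterministic alternative to asyncio.wait()
        done, pending = await workflow.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # Deterministic alternative to asyncio.as_completed()
        for fut in workflow.as_completed(pending):
            await fut
```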
#### Asyncio Cancellation
Cancellation is done using `asyncio` [task cancellation](https://docs.python.org/3/library/asyncio-task.html#task-cancellation).
This means that tasks are requested to be cancelled but can catch the
[`asyncio.CancelledError`](https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError), thus
allowing them to perform some cleanup before allowing the cancellation to proceed (i.e. re-raising the error), or to
deny the cancellation entirely. It also means that
[`asyncio.shield()`](https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation) can be used to
protect tasks against cancellation.
The following tasks, when cancelled, perform a Temporal cancellation:
* Activities - when the task executing an activity is cancelled, a cancellation request is sent to the activity
* Child workflows - when the task starting or executing a child workflow is cancelled, a cancellation request is sent to
cancel the child workflow
* Timers - when the task executing a timer is cancelled (whether started via sleep or timeout), the timer is cancelled
When the workflow itself is requested to cancel, `Task.cancel` is called on the main workflow task. Therefore,
`asyncio.CancelledError` can be caught in order to handle the cancel gracefully.
Workflows follow `asyncio` cancellation rules exactly which can cause confusion among Python developers. Cancelling a
task doesn't always cancel the thing it created. For example, given
`task = asyncio.create_task(workflow.start_child_workflow(...`, calling `task.cancel` does not cancel the child
workflow, it only cancels the starting of it, which has no effect if it has already started. However, cancelling the
result of `handle = await workflow.start_child_workflow(...` or
`task = asyncio.create_task(workflow.execute_child_workflow(...` _does_ cancel the child workflow.
Also, due to Temporal rules, a cancellation request is a state, not an event. Therefore, repeated cancellation requests
are not delivered, only the first. If the workflow chooses to swallow a cancellation, it cannot be requested again.
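A hedged sketch of catching a workflow cancellation to run cleanup (the cleanup activity name is hypothetical):

```python
import asyncio
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class CleanupOnCancelWorkflow:
    @workflow.run
    async def run(self) -> str:
        try:
            # Wait indefinitely; only a cancellation will interrupt this
            await workflow.wait_condition(lambda: False)
            return "unreachable"
        except asyncio.CancelledError:
            # Perform cleanup, then re-raise to let the cancellation proceed
            await workflow.execute_activity(
                "cleanup-activity", start_to_close_timeout=timedelta(seconds=30)
            )
            raise
```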
#### Workflow Utilities
While running in a workflow, in addition to features documented elsewhere, the following items are available from the
`temporalio.workflow` package:
* `continue_as_new()` - Async function to stop the workflow immediately and continue as new
* `info()` - Returns information about the current workflow
* `logger` - A logger for use in a workflow (properly skips logging on replay)
* `now()` - Returns the "current time" from the workflow's perspective
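A small hedged sketch using a few of these utilities:

```python
from temporalio import workflow


@workflow.defn
class InfoWorkflow:
    @workflow.run
    async def run(self) -> str:
        # Replay-safe logging with the workflow-perspective current time
        workflow.logger.info("Started at %s", workflow.now())
        return workflow.info().workflow_id
```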
#### Exceptions
* Workflows/updates can raise exceptions to fail the workflow or the "workflow task" (i.e. suspend the workflow
in a retrying state).
* Exceptions that are instances of `temporalio.exceptions.FailureError` will fail the workflow with that exception
* For failing the workflow explicitly with a user exception, use `temporalio.exceptions.ApplicationError`. This can
be marked non-retryable or include details as needed.
* Other exceptions that come from activity execution, child execution, cancellation, etc are already instances of
`FailureError` and will fail the workflow when uncaught.
* Update handlers are special: an instance of `temporalio.exceptions.FailureError` raised in an update handler will fail
the update instead of failing the workflow.
* All other exceptions fail the "workflow task" which means the workflow will continually retry until the workflow is
fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an
`ApplicationError` as mentioned above.
This default can be changed by providing a list of exception types to `workflow_failure_exception_types` when creating a
`Worker` or `failure_exception_types` on the `@workflow.defn` decorator. If a workflow-thrown exception is an instance
of any type in either list, it will fail the workflow (or update) instead of the workflow task. This means a value of
`[Exception]` will cause every exception to fail the workflow instead of the workflow task. Also, as a special case, if
`temporalio.workflow.NondeterminismError` (or any superclass of it) is set, non-deterministic exceptions will fail the
workflow. WARNING: These settings are experimental.
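For example, a hedged sketch of the decorator form (experimental, per the warning above):

```python
from temporalio import workflow


# Any exception raised here fails the workflow instead of retrying the task
@workflow.defn(failure_exception_types=[Exception])
class FailFastWorkflow:
    @workflow.run
    async def run(self) -> None:
        raise RuntimeError("fails the workflow, not just the workflow task")
```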
#### Signal and update handlers
Signal and update handlers are defined using decorated methods as shown in the example [above](#definition). Client code
sends signals and updates using `workflow_handle.signal`, `workflow_handle.execute_update`, or
`workflow_handle.start_update`. When the workflow receives one of these requests, it starts an `asyncio.Task` executing
the corresponding handler method with the argument(s) from the request.
The handler methods may be `async def` and can do all the async operations described above (e.g. invoking activities and
child workflows, and waiting on timers and conditions). Notice that this means that handler tasks will be executing
concurrently with respect to each other and the main workflow task. Use
[asyncio.Lock](https://docs.python.org/3/library/asyncio-sync.html#lock) and
[asyncio.Semaphore](https://docs.python.org/3/library/asyncio-sync.html#semaphore) if necessary.
Your main workflow task may finish as a result of successful completion, cancellation, continue-as-new, or failure. You
should ensure that all in-progress signal and update handler tasks have finished before this happens; if you do not, you
will see a warning (the warning can be disabled via the `workflow.signal`/`workflow.update` decorators). One way to
ensure that handler tasks have finished is to wait on the `workflow.all_handlers_finished` condition:
```python
await workflow.wait_condition(workflow.all_handlers_finished)
```
#### External Workflows
* `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow
  * `workflow.get_external_workflow_handle_for()` can be used instead for a type safe handle
  * `await handle.signal()` can be called on the handle to signal the external workflow
  * `await handle.cancel()` can be called on the handle to send a cancel to the external workflow
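A hedged sketch from inside a workflow (the external workflow ID and signal name are hypothetical):

```python
from temporalio import workflow


@workflow.defn
class NotifierWorkflow:
    @workflow.run
    async def run(self) -> None:
        # Obtain a handle to another workflow by its ID
        handle = workflow.get_external_workflow_handle("other-workflow-id")
        # Signal it by name, then request its cancellation
        await handle.signal("some_signal", "some arg")
        await handle.cancel()
```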
#### Testing
Workflow testing can be done in an integration-test fashion against a real server, however it is hard to simulate
timeouts and other long time-based code. Using the time-skipping workflow test environment can help there.
The time-skipping `temporalio.testing.WorkflowEnvironment` can be created via the static async `start_time_skipping()`.
This internally downloads the Temporal time-skipping test server to a temporary directory if it doesn't already exist,
then starts the test server which has special APIs for skipping time.
**NOTE:** The time-skipping test environment does not work on ARM. The SDK will try to download the x64 binary on macOS
for use with the Intel emulator, but for Linux or Windows ARM there is no proper time-skipping test server at this time.
##### Automatic Time Skipping
Anytime a workflow result is waited on, the time-skipping server automatically advances to the next event it can. To
manually advance time before waiting on the result of a workflow, the `WorkflowEnvironment.sleep` method can be used.
Here's a simple example of a workflow that sleeps for 24 hours:
```python
import asyncio

from temporalio import workflow


@workflow.defn
class WaitADayWorkflow:
    @workflow.run
    async def run(self) -> str:
        await asyncio.sleep(24 * 60 * 60)
        return "all done"
```
An integration test of this workflow would be way too slow. However, the time-skipping server automatically skips to the
next event when we wait on the result. Here's a test for that workflow:
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker


async def test_wait_a_day_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[WaitADayWorkflow]):
            assert "all done" == await env.client.execute_workflow(WaitADayWorkflow.run, id="wf1", task_queue="tq1")
```
That test will run almost instantly. This is because by calling `execute_workflow` on our client, we have asked the
environment to automatically skip time as much as it can (basically until the end of the workflow or until an activity
is run).
To disable automatic time-skipping while waiting for a workflow result, run code inside a
`with env.auto_time_skipping_disabled():` block.
##### Manual Time Skipping
Until a workflow is waited on, all time skipping in the time-skipping environment is done manually via
`WorkflowEnvironment.sleep`.
Here's a workflow that waits for a signal or times out:
```python
import asyncio

from temporalio import workflow


@workflow.defn
class SignalWorkflow:
    def __init__(self) -> None:
        self.signal_received = False

    @workflow.run
    async def run(self) -> str:
        # Wait for signal or timeout in 45 seconds
        try:
            await workflow.wait_condition(lambda: self.signal_received, timeout=45)
            return "got signal"
        except asyncio.TimeoutError:
            return "got timeout"

    @workflow.signal
    def some_signal(self) -> None:
        self.signal_received = True
```
To test a normal signal, you might:
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker


async def test_signal_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
            # Start workflow, send signal, check result
            handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
            await handle.signal(SignalWorkflow.some_signal)
            assert "got signal" == await handle.result()
```
But how would you test the timeout part? Like so:
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker


async def test_signal_workflow_timeout():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
            # Start workflow, advance time past timeout, check result
            handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
            await env.sleep(50)
            assert "got timeout" == await handle.result()
```
Also, the current time of the workflow environment can be obtained via the async `WorkflowEnvironment.get_current_time`
method.
##### Mocking Activities
Activities are just functions decorated with `@activity.defn`. Simply write different ones and pass those to the worker
to have different activities called during the test.
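For example, a hedged sketch that mocks the quick start's `say_hello` activity by registering a replacement with the
same activity name:

```python
from temporalio import activity


# Same registered name as the real activity, so workflows resolve to the mock
@activity.defn(name="say_hello")
def say_hello_mocked(name: str) -> str:
    return f"{name} from mocked activity!"
```

Passing `say_hello_mocked` in the test worker's `activities` list causes workflow calls to `say_hello` to invoke the
mock instead.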
#### Workflow Sandbox
By default workflows are run in a sandbox to help avoid non-deterministic code. If a call that is known to be
non-deterministic is performed, an exception will be thrown in the workflow which will "fail the task" which means the
workflow will not progress until fixed.
The sandbox is not foolproof and non-determinism can still occur. It is simply a best-effort way to catch bad code
early. Users are encouraged to define their workflows in files with no other side effects.
The sandbox offers a mechanism to "pass through" modules from outside the sandbox. By default this already includes all
standard library modules and Temporal modules. **For performance and behavior reasons, users are encouraged to pass
through all modules whose calls will be deterministic.** In particular, this advice extends to modules containing the
activities to be referenced in workflows, and modules containing dataclasses and Pydantic models, which can be
particularly expensive to import. See "Passthrough Modules" below on how to do this.
##### How the Sandbox Works
The sandbox is made up of two components that work closely together:
* Global state isolation
* Restrictions preventing known non-deterministic library calls
Global state isolation is performed by using `exec`. Upon workflow start, and every time that the workflow is replayed,
the file that the workflow is defined in is re-imported into a new sandbox created for that workflow run. In order to
keep the sandbox performant, not all modules are re-imported in this way: instead, a known set of "passthrough modules"
are obtained as references to the already-imported module _outside_ the sandbox. These modules should be side-effect
free on import and, if they make any non-deterministic calls, then these should be restricted by sandbox restriction
rules. By default the entire Python standard library, `temporalio`, and a couple of other modules are "passed through"
in this way from outside of the sandbox. To update this list, see "Customizing the Sandbox".
Restrictions preventing known non-deterministic library calls are achieved using proxy objects on modules wrapped around
the custom importer set in the sandbox. Many restrictions apply at workflow import time and workflow run time, while
some restrictions only apply at workflow run time. A default set of restrictions is included that prevents most
dangerous standard library calls. However it is known in Python that some otherwise-non-deterministic invocations, like
reading a file from disk via `open` or using `os.environ`, are done as part of importing modules. To customize what is
and isn't restricted, see "Customizing the Sandbox".
##### Avoiding the Sandbox
There are three increasingly-scoped ways to avoid the sandbox. Users are discouraged from avoiding the sandbox if
possible, except for passing through safe modules, which is recommended.
To remove restrictions around a particular block of code, use `with temporalio.workflow.unsafe.sandbox_unrestricted():`.
The workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied.
To run an entire workflow outside of a sandbox, set `sandboxed=False` on the `@workflow.defn` decorator when defining
it. This will run the entire workflow outside of the sandbox which means it can share global state and other bad
things.
To disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to
`temporalio.worker.UnsandboxedWorkflowRunner()`. This value is defaulted to
`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()` so by changing it to the unsandboxed runner, the sandbox
will not be used at all.
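Hedged sketches of the latter two options:

```python
from temporalio import workflow
from temporalio.worker import UnsandboxedWorkflowRunner, Worker


# Run this one workflow entirely outside the sandbox
@workflow.defn(sandboxed=False)
class UnsandboxedWorkflow:
    @workflow.run
    async def run(self) -> None:
        ...


# Or disable the sandbox for every workflow on the worker
my_worker = Worker(
    ...,
    workflows=[UnsandboxedWorkflow],
    workflow_runner=UnsandboxedWorkflowRunner(),
)
```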
##### Customizing the Sandbox
⚠️ WARNING: APIs in the `temporalio.worker.workflow_sandbox` module are not yet considered stable and may change in
future releases.
When creating the `Worker`, the `workflow_runner` is defaulted to
`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()`. The `SandboxedWorkflowRunner`'s init accepts a
`restrictions` keyword argument that is defaulted to `SandboxRestrictions.default`. The `SandboxRestrictions` dataclass
is immutable and contains three fields that can be customized, but only two have notable value. See below.
###### Passthrough Modules
By default the sandbox completely reloads non-standard-library and non-Temporal modules for every workflow run. To make
the sandbox quicker and use less memory when importing known-side-effect-free modules, they can be marked
as passthrough modules.
**For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be
deterministic.** In particular, this advice extends to modules containing the activities to be referenced in workflows,
and modules containing dataclasses and Pydantic models, which can be particularly expensive to import.
One way to pass through a module is at import time in the workflow file using the `imports_passed_through` context
manager like so:
```python
# my_workflow_file.py

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic


@workflow.defn
class MyWorkflow:
    ...
```
Alternatively, this can be done at worker creation time by customizing the runner's restrictions. For example:
```python
my_worker = Worker(
    ...,
    workflow_runner=SandboxedWorkflowRunner(
        restrictions=SandboxRestrictions.default.with_passthrough_modules("pydantic")
    )
)
```
In both of these cases, now the `pydantic` module will be passed through from outside of the sandbox instead of
being reloaded for every workflow run.
If users are sure that no imports they use in workflow files will ever need to be sandboxed (meaning all calls within
are deterministic and never mutate shared, global state), the `passthrough_all_modules` option can be set on the
restrictions or the `with_passthrough_all_modules` helper can be used, for example:
```python
my_worker = Worker(
    ...,
    workflow_runner=SandboxedWorkflowRunner(
        restrictions=SandboxRestrictions.default.with_passthrough_all_modules()
    )
)
```
Note that even for passed-through modules, certain builtin calls may still be checked for validity at runtime.
###### Invalid Module Members
`SandboxRestrictions.invalid_module_members` contains a root matcher that applies to all module members. This already
has a default set which includes things like `datetime.date.today()` which should never be called from a workflow. To
remove this restriction:
```python
my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
        "datetime", "date", "today",
    ),
)
my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
```
Restrictions can also be added by `|`'ing together matchers, for example to restrict the `datetime.date` class from
being used altogether:
```python
my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    invalid_module_members=SandboxRestrictions.invalid_module_members_default | SandboxMatcher(
        children={"datetime": SandboxMatcher(use={"date"})},
    ),
)
my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
```
See the API for more details on exact fields and their meaning.
##### Known Sandbox Issues
Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved.
###### Global Import/Builtins
Currently the sandbox references/alters the global `sys.modules` and `builtins` fields while running workflow code. In
order to prevent affecting other sandboxed code, thread locals are leveraged to only intercept these values during the
workflow thread running. Therefore, technically if top-level import code starts a thread, it may lose sandbox
protection.
###### Sandbox is not Secure
The sandbox is built to catch many non-deterministic and state sharing issues, but it is not secure. Some known bad
calls are intercepted, but for performance reasons, every single attribute get/set cannot be checked. Therefore a simple
call like `setattr(temporalio.common, "__my_key", "my value")` will leak across sandbox runs.
The sandbox is only a helper; it does not provide full protection.
###### Sandbox Performance
The sandbox does not add significant CPU or memory overhead for workflows that are in files which only import standard
library modules. This is because they are passed through from outside of the sandbox. However, every
non-standard-library import that is performed at the top of the same file the workflow is in will add CPU overhead (the
module is re-imported every workflow run) and memory overhead (each module is independently cached as part of the workflow
run for isolation reasons). This becomes more apparent for large numbers of workflow runs.
To mitigate this, users should:
* Define workflows in files that have as few non-standard-library imports as possible
* Alter the max workflow cache and/or max concurrent workflows settings if memory grows too large
* Set third-party libraries as passthrough modules if they are known to be side-effect free
###### Extending Restricted Classes
Extending a restricted class causes Python to instantiate the restricted metaclass which is unsupported. Therefore if
you attempt to use a class in the sandbox that extends a restricted class, it will fail. For example, if you have a
`class MyZipFile(zipfile.ZipFile)` and try to use that class inside a workflow, it will fail.
Classes used inside the workflow should not extend restricted classes. For situations where third-party modules need to
extend restricted classes at import time, they should be marked as passthrough modules.
###### Certain Standard Library Calls on Restricted Objects
If an object is restricted, internal C Python validation may fail in some cases. For example, running
`dict.items(os.__dict__)` will fail with:
> descriptor 'items' for 'dict' objects doesn't apply to a '_RestrictedProxy' object
This is a low-level check that cannot be subverted. The solution is to not use restricted objects inside the sandbox.
For situations where third-party modules use restricted objects at import time, they should be marked as passthrough
modules.
###### is_subclass of ABC-based Restricted Classes
Due to [https://bugs.python.org/issue44847](https://bugs.python.org/issue44847), classes that are wrapped and then
checked to see if they are subclasses of another via `is_subclass` may fail (see also
[this wrapt issue](https://github.com/GrahamDumpleton/wrapt/issues/130)).
### Activities
#### Definition
Activities are decorated with `@activity.defn` like so:
```python
from temporalio import activity


@activity.defn
def say_hello_activity(name: str) -> str:
    return f"Hello, {name}!"
```
Some things to note about activity definitions:
* The `say_hello_activity` is synchronous which is the recommended activity type (see "Types of Activities" below), but
it can be `async`
* A custom name for the activity can be set with a decorator argument, e.g. `@activity.defn(name="my activity")`
* Long running activities should regularly heartbeat and handle cancellation
* Activities can only have positional arguments. Best practice is to only take a single argument that is an
object/dataclass of fields that can be added to as needed.
* Activities can be defined on methods instead of top-level functions. This allows the instance to carry state that an
activity may need (e.g. a DB connection). The instance method should be what is registered with the worker.
* Activities can also be defined on callable classes (i.e. classes with `__call__`). An instance of the class should be
what is registered with the worker.
* The `@activity.defn` can have `dynamic=True` set which means all otherwise unhandled activities fall through to this.
If present, cannot have `name` argument, and the activity function must accept a single parameter of
`Sequence[temporalio.common.RawValue]`. The payload of the raw value can be converted via
`activity.payload_converter().from_payload`.
#### Types of Activities
There are 3 types of activity callables accepted and described below: synchronous multithreaded, synchronous
multiprocess/other, and asynchronous. Only positional parameters are allowed in activity callables.
##### Synchronous Activities
Synchronous activities, i.e. functions that do not have `async def`, can be used with workers, but the
`activity_executor` worker parameter must be set with a `concurrent.futures.Executor` instance to use for executing the
activities.
All long running, non-local activities should heartbeat so they can be cancelled. Cancellation in threaded activities
throws an exception, but in multiprocess/other activities it does not. The sections below on each synchronous type explain further. There
are also calls on the context that can check for cancellation. For more information, see "Activity Context" and
"Heartbeating and Cancellation" sections later.
Note, all calls from an activity to functions in the `temporalio.activity` package are powered by
[contextvars](https://docs.python.org/3/library/contextvars.html). Therefore, new threads starting _inside_ of
activities must `copy_context()` and then `.run()` manually to ensure `temporalio.activity` calls like `heartbeat` still
function in the new threads.
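A hedged sketch of propagating the activity context into a thread started inside a synchronous activity:

```python
import contextvars
import threading

from temporalio import activity


@activity.defn
def fan_out_activity() -> None:
    # Copy the current context so temporalio.activity calls work in the thread
    ctx = contextvars.copy_context()

    def in_thread() -> None:
        activity.heartbeat("from thread")

    thread = threading.Thread(target=lambda: ctx.run(in_thread))
    thread.start()
    thread.join()
```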
If any activity ever throws a `concurrent.futures.BrokenExecutor`, the failure is considered unrecoverable and the worker
will fail and shut down.
###### Synchronous Multithreaded Activities
If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities
are considered multithreaded activities. If `max_workers` is not set to at least the worker's
`max_concurrent_activities` setting a warning will be issued. Besides `activity_executor`, no other worker parameters
are required for synchronous multithreaded activities.
By default, cancellation of a synchronous multithreaded activity is done via a `temporalio.exceptions.CancelledError`
thrown into the activity thread. Activities that do not wish to have cancellation thrown can set
`no_thread_cancel_exception=True` in the `@activity.defn` decorator.
Code that wishes to be temporarily shielded from the cancellation exception can run inside
`with activity.shield_thread_cancel_exception():`. But once the last nested form of that block is finished, even if
there is a return statement within, it will throw the cancellation if there was one. A `try` +
`except temporalio.exceptions.CancelledError` would have to surround the `with` to handle the cancellation explicitly.
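As a minimal sketch of that pattern (the work and cleanup functions here are hypothetical):

```python
from temporalio import activity
from temporalio.exceptions import CancelledError

@activity.defn
def activity_with_critical_section() -> None:
    try:
        with activity.shield_thread_cancel_exception():
            # The cancellation exception will not be thrown inside this block
            do_non_interruptible_work()  # hypothetical
    except CancelledError:
        # Thrown once the shielded block exits, if a cancellation arrived
        clean_up()  # hypothetical
        raise
```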
###### Synchronous Multiprocess/Other Activities
If `activity_executor` is set to an instance of `concurrent.futures.Executor` that is _not_
`concurrent.futures.ThreadPoolExecutor`, then the synchronous activities are considered multiprocess/other activities.
Users should prefer threaded activities over multiprocess ones since, among other reasons, threaded activities can raise
on cancellation.
These require special primitives for heartbeating and cancellation. The `shared_state_manager` worker parameter must be
set to an instance of `temporalio.worker.SharedStateManager`. The most common implementation can be created by passing a
`multiprocessing.managers.SyncManager` (i.e. the result of `multiprocessing.Manager()`) to
`temporalio.worker.SharedStateManager.create_from_multiprocessing()`.
Also, all of these activity functions must be
["picklable"](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled).
##### Asynchronous Activities
Asynchronous activities are functions defined with `async def`. Asynchronous activities are often much more performant
than synchronous ones. When using asynchronous activities, no special worker parameters are needed.
**⚠️ WARNING: Do not block the thread in `async def` Python functions. This can stop the processing of the rest of the
Temporal worker.**
Cancellation for asynchronous activities is done via
[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that
`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). A non-local activity must
heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and
"Heartbeating and Cancellation" later).
#### Activity Context
During activity execution, an implicit activity context is set as a
[context variable](https://docs.python.org/3/library/contextvars.html). The context variable itself is not visible, but
calls in the `temporalio.activity` package make use of it. Specifically:
* `in_activity()` - Whether an activity context is present
* `info()` - Returns the immutable info of the currently running activity
* `client()` - Returns the Temporal client used by this worker. Only available in `async def` activities.
* `heartbeat(*details)` - Record a heartbeat
* `is_cancelled()` - Whether a cancellation has been requested on this activity
* `wait_for_cancelled()` - `async` call to wait for cancellation request
* `wait_for_cancelled_sync(timeout)` - Synchronous blocking call to wait for cancellation request
* `shield_thread_cancel_exception()` - Context manager for use in `with` clauses by synchronous multithreaded activities
to prevent cancel exception from being thrown during the block of code
* `is_worker_shutdown()` - Whether the worker has started graceful shutdown
* `wait_for_worker_shutdown()` - `async` call to wait for start of graceful worker shutdown
* `wait_for_worker_shutdown_sync(timeout)` - Synchronous blocking call to wait for start of graceful worker shutdown
* `raise_complete_async()` - Raise an error indicating that this activity will be completed asynchronously (i.e. after
  the activity function returns, in a separate client call)
With the exception of `in_activity()`, if any of the functions are called outside of an activity context, an error
occurs. Synchronous activities cannot call any of the `async` functions.
##### Heartbeating and Cancellation
In order for a non-local activity to be notified of cancellation requests, it must be given a `heartbeat_timeout` at
invocation time and invoke `temporalio.activity.heartbeat()` inside the activity. It is strongly recommended that all
but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation
for synchronous and asynchronous activities.
In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server
for retrieval during activity retry. If an activity calls `temporalio.activity.heartbeat(123, 456)` and then fails and
is retried, `temporalio.activity.info().heartbeat_details` will return an iterable containing `123` and `456` on the
next run.
Heartbeating has no effect on local activities.
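For example, here is a minimal sketch of an activity that heartbeats its progress and, when retried, resumes from the
last heartbeated detail:

```python
import asyncio

from temporalio import activity

@activity.defn
async def count_slowly(total: int) -> int:
    # On retry, resume from the last heartbeated value, if present
    details = activity.info().heartbeat_details
    current = int(next(iter(details), 0))
    while current < total:
        await asyncio.sleep(1)
        current += 1
        # Persists progress and is also how cancellation is delivered
        activity.heartbeat(current)
    return current
```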
##### Worker Shutdown
An activity can react to a worker shutdown. Using `is_worker_shutdown` or one of the `wait_for_worker_shutdown`
functions an activity can react to a shutdown.
When the `graceful_shutdown_timeout` worker parameter is given a `datetime.timedelta`, on shutdown the worker will
notify activities of the graceful shutdown. Once that timeout has passed (or if wasn't set), the worker will perform
cancellation of all outstanding activities.
The `shutdown()` invocation will wait on all activities to complete, so if a long-running activity does not at least
respect cancellation, the shutdown may never complete.
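For example, a minimal sketch of an activity that exits early once the worker begins graceful shutdown:

```python
import asyncio

from temporalio import activity

@activity.defn
async def poll_until_shutdown() -> str:
    while not activity.is_worker_shutdown():
        activity.heartbeat()
        await asyncio.sleep(1)
    return "stopped due to worker shutdown"
```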
#### Testing
Unit testing an activity, or any code that could run in an activity, is done via the
`temporalio.testing.ActivityEnvironment` class. Simply instantiate this, and any callable + params passed to `run` will
be invoked inside the activity context. The following are attributes/methods on the environment that can be used to
affect calls activity code might make to functions in the `temporalio.activity` package:
* `info` property can be set to customize what is returned from `activity.info()`
* `on_heartbeat` property can be set to handle `activity.heartbeat()` calls
* `cancel()` can be invoked to simulate a cancellation of the activity
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity
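For example, a minimal test sketch that runs a heartbeating activity inside the environment and captures its
heartbeats:

```python
from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
def say_hello(name: str) -> str:
    activity.heartbeat("greeting")
    return f"Hello, {name}!"

def test_say_hello() -> None:
    env = ActivityEnvironment()
    heartbeats = []
    # Capture heartbeat calls made by the activity
    env.on_heartbeat = lambda *details: heartbeats.append(details)
    assert env.run(say_hello, "World") == "Hello, World!"
    assert heartbeats == [("greeting",)]
```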
### Interceptors
The behavior of the SDK can be customized in many useful ways by modifying inbound and outbound calls using
interceptors. This is similar to the use of middleware in other frameworks.
There are five categories of inbound and outbound calls that you can modify in this way:
1. Outbound client calls, such as `start_workflow()`, `signal_workflow()`, `list_workflows()`, `update_schedule()`, etc.
2. Inbound workflow calls: `execute_workflow()`, `handle_signal()`, `handle_update_handler()`, etc
3. Outbound workflow calls: `start_activity()`, `start_child_workflow()`, `start_nexus_operation()`, etc
4. Inbound call to execute an activity: `execute_activity()`
5. Outbound activity calls: `info()` and `heartbeat()`
To modify outbound client calls, define a class inheriting from
[`client.Interceptor`](https://python.temporal.io/temporalio.client.Interceptor.html), and implement the method
`intercept_client()` to return an instance of
[`OutboundInterceptor`](https://python.temporal.io/temporalio.client.OutboundInterceptor.html) that implements the
subset of outbound client calls that you wish to modify.
Then, pass a list containing an instance of your `client.Interceptor` class as the
`interceptors` argument of [`Client.connect()`](https://python.temporal.io/temporalio.client.Client.html#connect).
The purpose of the interceptor framework is that the methods you implement on your interceptor classes can perform
arbitrary side effects and/or arbitrary modifications to the data, before it is received by the SDK's "real"
implementation. The `interceptors` list can contain multiple interceptors. In this case they form a chain: a method
implemented on an interceptor instance in the list can perform side effects, and modify the data, before passing it on
to the corresponding method on the next interceptor in the list. Your interceptor classes need not implement every
method; the default implementation is always to pass the data on to the next method in the interceptor chain.
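For example, here is a minimal sketch of a client interceptor that logs each workflow start before passing it on to
the next interceptor in the chain:

```python
from typing import Any

from temporalio import client

class LoggingClientInterceptor(client.Interceptor):
    def intercept_client(
        self, next: client.OutboundInterceptor
    ) -> client.OutboundInterceptor:
        return _LoggingClientOutbound(next)

class _LoggingClientOutbound(client.OutboundInterceptor):
    async def start_workflow(
        self, input: client.StartWorkflowInput
    ) -> client.WorkflowHandle[Any, Any]:
        print(f"Starting workflow: {input.workflow}")
        # Pass the (possibly modified) input on down the chain
        return await super().start_workflow(input)
```

An instance of `LoggingClientInterceptor` would then be passed in the `interceptors` argument of `Client.connect()`.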
The remaining four categories are worker calls. To modify these, define a class inheriting from
[`worker.Interceptor`](https://python.temporal.io/temporalio.worker.Interceptor.html) and implement methods on that
class to define the
[`ActivityInboundInterceptor`](https://python.temporal.io/temporalio.worker.ActivityInboundInterceptor.html),
[`ActivityOutboundInterceptor`](https://python.temporal.io/temporalio.worker.ActivityOutboundInterceptor.html),
[`WorkflowInboundInterceptor`](https://python.temporal.io/temporalio.worker.WorkflowInboundInterceptor.html), and
[`WorkflowOutboundInterceptor`](https://python.temporal.io/temporalio.worker.WorkflowOutboundInterceptor.html) classes
that you wish to use to effect your modifications. Then, pass a list containing an instance of your `worker.Interceptor`
class as the `interceptors` argument of the [`Worker()`](https://python.temporal.io/temporalio.worker.Worker.html)
constructor.
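As a minimal worker-side sketch, an interceptor that logs every activity execution might look like:

```python
from typing import Any

from temporalio import worker

class LoggingWorkerInterceptor(worker.Interceptor):
    def intercept_activity(
        self, next: worker.ActivityInboundInterceptor
    ) -> worker.ActivityInboundInterceptor:
        return _LoggingActivityInbound(next)

class _LoggingActivityInbound(worker.ActivityInboundInterceptor):
    async def execute_activity(
        self, input: worker.ExecuteActivityInput
    ) -> Any:
        print("Executing an activity")
        # Pass on to the next interceptor in the chain
        return await super().execute_activity(input)
```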
It often happens that your worker and client interceptors will share code because they implement closely related logic.
For convenience, you can create an interceptor class that inherits from _both_ `client.Interceptor` and
`worker.Interceptor` (their method sets do not overlap). You can then pass this in the `interceptors` argument of
`Client.connect()` when starting your worker _as well as_ in your client/starter code. If you do this, your worker will
automatically pick up the interceptors from its underlying client (and you should not pass them directly to the
`Worker()` constructor).
This is best explained by example. The [Context Propagation Interceptor
Sample](https://github.com/temporalio/samples-python/tree/main/context_propagation) is a good starting point. In
[context_propagation/interceptor.py](https://github.com/temporalio/samples-python/blob/main/context_propagation/interceptor.py)
a class is defined that inherits from both `client.Interceptor` and `worker.Interceptor`. It implements the various
methods such that the outbound client and workflow calls set a certain key in the outbound `headers` field, and the
inbound workflow and activity calls retrieve the header value from the inbound workflow/activity input data. An instance
of this interceptor class is passed to `Client.connect()` when [starting the
worker](https://github.com/temporalio/samples-python/blob/main/context_propagation/worker.py) and when connecting the
client in the [workflow starter
code](https://github.com/temporalio/samples-python/blob/main/context_propagation/starter.py).
### Nexus
⚠️ **Nexus support is currently at an experimental release stage. Backwards-incompatible changes are anticipated until a stable release is announced.** ⚠️
[Nexus](https://github.com/nexus-rpc/) is a synchronous RPC protocol. Arbitrary-duration operations that can respond
asynchronously are modeled on top of a set of pre-defined synchronous RPCs.
Temporal supports calling Nexus operations **from a workflow**. See https://docs.temporal.io/nexus. There is no support
currently for calling a Nexus operation from non-workflow code.
To get started quickly using Nexus with Temporal, see the Python Nexus sample:
https://github.com/temporalio/samples-python/tree/nexus/hello_nexus.
Two types of Nexus operation are supported, each using a decorator:
- `@temporalio.nexus.workflow_run_operation`: a Nexus operation that is backed by a Temporal workflow. The operation
handler you write will start the handler workflow and then respond with a token indicating that the handler workflow
is in progress. When the handler workflow completes, Temporal server will automatically deliver the result (success or
failure) to the caller workflow.
- `@nexusrpc.handler.sync_operation`: an operation that responds synchronously. It may be `def` or `async def` and it
may do network I/O, but it must respond within 10 seconds.
The following steps are an overview of the [Python Nexus sample](
https://github.com/temporalio/samples-python/tree/nexus/hello_nexus).
1. Create the caller and handler namespaces, and the Nexus endpoint. For example,
```
temporal operator namespace create --namespace my-handler-namespace
temporal operator namespace create --namespace my-caller-namespace
temporal operator nexus endpoint create \
--name my-nexus-endpoint \
--target-namespace my-handler-namespace \
--target-task-queue my-handler-task-queue
```
2. Define your service contract. This specifies the names and input/output types of your operations. You will use this
to refer to the operations when calling them from a workflow.
```python
@nexusrpc.service
class MyNexusService:
    my_sync_operation: nexusrpc.Operation[MyInput, MyOutput]
    my_workflow_run_operation: nexusrpc.Operation[MyInput, MyOutput]
```
3. Implement your operation handlers in a service handler:
```python
@service_handler(service=MyNexusService)
class MyNexusServiceHandler:
    @sync_operation
    async def my_sync_operation(
        self, ctx: StartOperationContext, input: MyInput
    ) -> MyOutput:
        return MyOutput(message=f"Hello {input.name} from sync operation!")

    @workflow_run_operation
    async def my_workflow_run_operation(
        self, ctx: WorkflowRunOperationContext, input: MyInput
    ) -> nexus.WorkflowHandle[MyOutput]:
        return await ctx.start_workflow(
            WorkflowStartedByNexusOperation.run,
            input,
            id=str(uuid.uuid4()),
        )
```
4. Register your service handler with a Temporal worker.
```python
client = await Client.connect("localhost:7233", namespace="my-handler-namespace")
worker = Worker(
    client,
    task_queue="my-handler-task-queue",
    workflows=[WorkflowStartedByNexusOperation],
    nexus_service_handlers=[MyNexusServiceHandler()],
)
await worker.run()
```
5. Call your Nexus operations from your caller workflow.
```python
@workflow.defn
class CallerWorkflow:
    def __init__(self):
        self.nexus_client = workflow.create_nexus_client(
            service=MyNexusService, endpoint="my-nexus-endpoint"
        )

    @workflow.run
    async def run(self, name: str) -> tuple[MyOutput, MyOutput]:
        # Start the Nexus operation and wait for the result in one go,
        # using execute_operation.
        wf_result = await self.nexus_client.execute_operation(
            MyNexusService.my_workflow_run_operation,
            MyInput(name),
        )
        # Or alternatively, obtain the operation handle using
        # start_operation, and then use it to get the result:
        sync_operation_handle = await self.nexus_client.start_operation(
            MyNexusService.my_sync_operation,
            MyInput(name),
        )
        sync_result = await sync_operation_handle
        return sync_result, wf_result
```
### Plugins
Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of
responsibility pattern. They allow you to intercept and modify client creation, service connections, worker
configuration, and worker execution. Common customizations may include but are not limited to:
1. DataConverter
2. Activities
3. Workflows
4. Interceptors
A single plugin class can implement both client and worker plugin interfaces to share common logic between both
contexts. When used with a client, it will automatically be propagated to any workers created with that client.
#### Client Plugins
Client plugins can intercept and modify client configuration and service connections. They are useful for adding
authentication, modifying connection parameters, or adding custom behavior during client creation.
Here's an example of a client plugin that adds custom authentication:
```python
from temporalio.client import Client, ClientConfig, Plugin
import temporalio.service

class AuthenticationPlugin(Plugin):
    def __init__(self, api_key: str):
        self.api_key = api_key

    def configure_client(self, config: ClientConfig) -> ClientConfig:
        # Modify client configuration
        config["namespace"] = "my-secure-namespace"
        return super().configure_client(config)

    async def connect_service_client(
        self, config: temporalio.service.ConnectConfig
    ) -> temporalio.service.ServiceClient:
        # Add authentication to the connection
        config.api_key = self.api_key
        return await super().connect_service_client(config)

# Use the plugin when connecting
client = await Client.connect(
    "my-server.com:7233",
    plugins=[AuthenticationPlugin("my-api-key")]
)
```
#### Worker Plugins
Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
custom lifecycle management, or modifying worker settings.
Here's an example of a worker plugin that adds custom monitoring:
```python
from temporalio.worker import Plugin, WorkerConfig, Worker
import logging

class MonitoringPlugin(Plugin):
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
        # Modify worker configuration
        original_task_queue = config["task_queue"]
        config["task_queue"] = f"monitored-{original_task_queue}"
        self.logger.info(f"Worker created for task queue: {config['task_queue']}")
        return super().configure_worker(config)

    async def run_worker(self, worker: Worker) -> None:
        self.logger.info("Starting worker execution")
        try:
            await super().run_worker(worker)
        finally:
            self.logger.info("Worker execution completed")

# Use the plugin when creating a worker
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    plugins=[MonitoringPlugin()]
)
```
For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:
```python
from temporalio.client import Client, ClientConfig, Plugin as ClientPlugin
from temporalio.worker import Plugin as WorkerPlugin, Worker, WorkerConfig

class UnifiedPlugin(ClientPlugin, WorkerPlugin):
    def configure_client(self, config: ClientConfig) -> ClientConfig:
        # Client-side customization
        config["namespace"] = "unified-namespace"
        return super().configure_client(config)

    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
        # Worker-side customization
        config["max_cached_workflows"] = 500
        return super().configure_worker(config)

    async def run_worker(self, worker: Worker) -> None:
        print("Starting unified worker")
        await super().run_worker(worker)

# Create client with the unified plugin
client = await Client.connect(
    "localhost:7233",
    plugins=[UnifiedPlugin()]
)

# Worker will automatically inherit the plugin from the client
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity]
)
```
**Important Notes:**
- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
- Avoid providing the same plugin to both client and worker to prevent double execution
- Plugin methods should call `super()` to maintain the plugin chain
- Each plugin's `name()` method returns a unique identifier for debugging purposes
### Workflow Replay
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
assuming `history_str` is populated with a JSON string history either exported from the web UI or from `tctl`, the
following function will replay it:
```python
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

async def run_replayer(history_str: str):
    replayer = Replayer(workflows=[SayHello])
    await replayer.replay_workflow(WorkflowHistory.from_json("my-workflow-id", history_str))
```
This will throw an error if any non-determinism is detected.
Replaying from workflow history is a powerful concept that many use to test that workflow alterations won't cause
non-determinisms with past-complete workflows. The following code will make sure that all workflow histories for a
certain workflow type (i.e. workflow class) are safe with the current code.
```python
from temporalio.client import Client, WorkflowHistory
from temporalio.worker import Replayer

async def check_past_histories(my_client: Client):
    replayer = Replayer(workflows=[SayHello])
    await replayer.replay_workflows(
        await my_client.list_workflows("WorkflowType = 'SayHello'").map_histories(),
    )
```
### Observability
See https://github.com/temporalio/samples-python/tree/main/open_telemetry for a sample demonstrating collection of
metrics and tracing data emitted by the SDK.
#### Metrics
The SDK emits various metrics by default: see https://docs.temporal.io/references/sdk-metrics. To configure additional
attributes to be emitted with all metrics, pass
[global_tags](https://python.temporal.io/temporalio.runtime.TelemetryConfig.html#global_tags) when creating the
[TelemetryConfig](https://python.temporal.io/temporalio.runtime.TelemetryConfig.html).
For emitting custom metrics, the SDK makes a metric meter available:
- In Workflow code, use https://python.temporal.io/temporalio.workflow.html#metric_meter
- In Activity code, use https://python.temporal.io/temporalio.activity.html#metric_meter
- In normal application code, use https://python.temporal.io/temporalio.runtime.Runtime.html#metric_meter
The attributes emitted by these default to `namespace`, `task_queue`, and `workflow_type`/`activity_type`; use
`with_additional_attributes` to create a meter emitting additional attributes.
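For example, a minimal sketch of emitting a custom counter from activity code (the metric name and activity are
illustrative):

```python
from temporalio import activity

@activity.defn
async def process_order(order_id: str) -> None:
    # Create a counter on the activity's metric meter and increment it
    counter = activity.metric_meter().create_counter(
        "orders_processed", description="Total orders processed"
    )
    counter.add(1)
```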
#### OpenTelemetry Tracing
Tracing support requires the optional `opentelemetry` dependencies which are part of the `opentelemetry` extra. When
using `pip`, running
    pip install 'temporalio[opentelemetry]'
will install the needed dependencies. Then the `temporalio.contrib.opentelemetry.TracingInterceptor` can be created and
set as an interceptor via the `interceptors` argument of `Client.connect`. When set, spans will be created for all
client calls and for all activity and workflow invocations on the worker, and they are properly serialized through the
server to give one complete trace for a workflow execution.
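For example, a minimal sketch of enabling the tracing interceptor on a client (this assumes an OpenTelemetry tracer
provider has already been configured; workers created from this client will pick up the interceptor as well):

```python
from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor

async def connect_traced_client() -> Client:
    return await Client.connect(
        "localhost:7233",
        interceptors=[TracingInterceptor()],
    )
```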
### Protobuf 3.x vs 4.x
Python currently has two somewhat-incompatible protobuf library versions - the 3.x series and the 4.x series. The
protobuf project currently recommends 4.x, and that is the primary supported version. Some libraries like
[Pulumi](https://github.com/pulumi/pulumi) require 4.x, while others such as [ONNX](https://github.com/onnx/onnx) and
[Streamlit](https://github.com/streamlit/streamlit) have, for one reason or another, not moved off 3.x.
To support these, Temporal Python SDK allows any protobuf library >= 3.19. However, the C extension in older Python
versions can cause issues with the sandbox due to global state sharing. Temporal strongly recommends using the latest
protobuf 4.x library unless you absolutely cannot at which point some proto libraries may have to be marked as
[Passthrough Modules](#passthrough-modules).
### Known Compatibility Issues
Below are known compatibility issues with the Python SDK.
#### gevent Patching
When using `gevent.monkey.patch_all()`, asyncio event loops can get messed up, especially those using custom event loops
like Temporal. See [this gevent issue](https://github.com/gevent/gevent/issues/982). This is a known incompatibility and
users are encouraged to not use gevent in asyncio applications (including Temporal). But if you must, there is
[a sample](https://github.com/temporalio/samples-python/tree/main/gevent_async) showing how it is possible.
# Development
The Python SDK is built to work with Python 3.9 and newer. It is built using
[SDK Core](https://github.com/temporalio/sdk-core/) which is written in Rust.
### Building
#### Prepare
To build the SDK from source for use as a dependency, the following prerequisites are required:
* [uv](https://docs.astral.sh/uv/)
* [Rust](https://www.rust-lang.org/)
* [Protobuf Compiler](https://protobuf.dev/)
Use `uv` to install `poe`:
```bash
uv tool install poethepoet
```
Now clone the SDK repository recursively:
```bash
git clone --recursive https://github.com/temporalio/sdk-python.git
cd sdk-python
```
Install the dependencies:
```bash
uv sync --all-extras
```
#### Build
Now perform the release build:
> This will take a while because Rust will compile the core project in release mode (see [Local SDK development
environment](#local-sdk-development-environment) for the quicker approach to local development).
```bash
uv build
```
The `.whl` wheel file in `dist/` is now ready to use.
#### Use
The wheel can now be installed into any virtual environment.
For example,
[create a virtual environment](https://packaging.python.org/en/latest/tutorials/installing-packages/#creating-virtual-environments)
somewhere and then run the following inside the virtual environment:
```bash
pip install wheel
```
```bash
pip install /path/to/cloned/sdk-python/dist/*.whl
```
Create this Python file at `example.py`:
```python
import asyncio

from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker

@workflow.defn
class SayHello:
    @workflow.run
    async def run(self, name: str) -> str:
        return f"Hello, {name}!"

async def main():
    client = await Client.connect("localhost:7233")
    async with Worker(client, task_queue="my-task-queue", workflows=[SayHello]):
        result = await client.execute_workflow(
            SayHello.run, "Temporal", id="my-workflow-id", task_queue="my-task-queue"
        )
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```
Assuming there is a [local Temporal server](https://docs.temporal.io/docs/server/quick-install/) running, execute the
file with `python` (or `python3` if necessary):
```bash
python example.py
```
It should output:
    Result: Hello, Temporal!
### Local SDK development environment
For local development, it is quicker to use a debug build.
Perform the same steps as the "Prepare" section above by installing the prerequisites, cloning the project, and
installing dependencies:
```bash
git clone --recursive https://github.com/temporalio/sdk-python.git
cd sdk-python
uv sync --all-extras
```
Now compile the Rust extension in develop mode which is quicker than release mode:
```bash
poe build-develop
```
That step can be repeated for any Rust changes made.
The environment is now ready to develop in.
#### Testing
To execute tests:
```bash
poe test
```
This runs against [Temporalite](https://github.com/temporalio/temporalite). To run against the time-skipping test
server, pass `--workflow-environment time-skipping`. To run against the `default` namespace of an already-running
server, pass the `host:port` to `--workflow-environment`. Can also use regular pytest arguments. For example, here's how
to run a single test with debug logs on the console:
```bash
poe test -s --log-cli-level=DEBUG -k test_sync_activity_thread_cancel_caught
```
#### Proto Generation and Testing
To allow for backwards compatibility, protobuf code is generated on the 3.x series of the protobuf library. To generate
protobuf code, you must be on Python <= 3.10, and then run `uv add "protobuf<4"` + `uv sync --all-extras`. Then the
protobuf files can be generated via `poe gen-protos`. Tests can be run for protobuf version 3 by setting the
`TEMPORAL_TEST_PROTO3` env var to `1` prior to running tests.
Do not commit `uv.lock` or `pyproject.toml` changes. To go back from this downgrade, restore both of those files and run
`uv sync --all-extras`. Make sure you `poe format` the results.
For a less system-intrusive approach, you can:
```shell
docker build -f scripts/_proto/Dockerfile .
docker run --rm -v "${PWD}/temporalio/api:/api_new" -v "${PWD}/temporalio/bridge/proto:/bridge_new" <just built image sha>
poe format
```
### Style
* Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions:
* We use [ruff](https://docs.astral.sh/ruff/) for formatting, so that takes precedence
* In tests and example code, individual classes/functions can be imported to improve readability. This can also be
  done, rarely, in library code for some common Python items (e.g. `dataclass` or `partial`), but not for any
  `temporalio` packages (except `temporalio.types`) or any classes/functions that aren't clear when unqualified.
* We allow relative imports for private packages
* We allow `@staticmethod`
Raw data
{
"_id": null,
"home_page": null,
"name": "temporalio",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "temporal, workflow",
"author": null,
"author_email": "Temporal Technologies Inc <sdk@temporal.io>",
"download_url": "https://files.pythonhosted.org/packages/0b/af/1a3619fc62333d0acbdf90cfc5ada97e68e8c0f79610363b2dbb30871d83/temporalio-1.15.0.tar.gz",
"platform": null,
"description": "\n\n[](https://pypi.org/project/temporalio)\n[](https://pypi.org/project/temporalio)\n[](LICENSE)\n\n[Temporal](https://temporal.io/) is a distributed, scalable, durable, and highly available orchestration engine used to\nexecute asynchronous, long-running business logic in a scalable and resilient way.\n\n\"Temporal Python SDK\" is the framework for authoring workflows and activities using the Python programming language.\n\nAlso see:\n* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our\n [Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including\n information around Temporal core concepts.\n* [Python Code Samples](https://github.com/temporalio/samples-python)\n* [API Documentation](https://python.temporal.io) - Complete Temporal Python SDK Package reference.\n\nIn addition to features common across all Temporal SDKs, the Python SDK also has the following interesting features:\n\n**Type Safe**\n\nThis library uses the latest typing and MyPy support with generics to ensure all calls can be typed. For example,\nstarting a workflow with an `int` parameter when it accepts a `str` parameter would cause MyPy to fail.\n\n**Different Activity Types**\n\nThe activity worker has been developed to work with `async def`, threaded, and multiprocess activities. Threaded activities are the initial recommendation, and further guidance can be found in [the docs](https://docs.temporal.io/develop/python/python-sdk-sync-vs-async).\n\n**Custom `asyncio` Event Loop**\n\nThe workflow implementation basically turns `async def` functions into workflows backed by a distributed, fault-tolerant\nevent loop. This means task management, sleep, cancellation, etc have all been developed to seamlessly integrate with\n`asyncio` concepts.\n\nSee the [blog post](https://temporal.io/blog/durable-distributed-asyncio-event-loop) introducing the Python SDK for an\ninformal introduction to the features and their implementation.\n\n---\n\n<!-- START doctoc generated TOC please keep comment here to allow auto update -->\n<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->\n**Contents**\n\n- [Quick Start](#quick-start)\n - [Installation](#installation)\n - [Implementing a Workflow](#implementing-a-workflow)\n - [Running a Workflow](#running-a-workflow)\n - [Next Steps](#next-steps)\n- [Usage](#usage)\n - [Client](#client)\n - [Data Conversion](#data-conversion)\n - [Pydantic Support](#pydantic-support)\n - [Custom Type Data Conversion](#custom-type-data-conversion)\n - [Workers](#workers)\n - [Workflows](#workflows)\n - [Definition](#definition)\n - [Running](#running)\n - [Invoking Activities](#invoking-activities)\n - [Invoking Child Workflows](#invoking-child-workflows)\n - [Timers](#timers)\n - [Conditions](#conditions)\n - [Asyncio and Determinism](#asyncio-and-determinism)\n - [Asyncio Cancellation](#asyncio-cancellation)\n - [Workflow Utilities](#workflow-utilities)\n - [Exceptions](#exceptions)\n - [Signal and update handlers](#signal-and-update-handlers)\n - [External Workflows](#external-workflows)\n - [Testing](#testing)\n - [Automatic Time Skipping](#automatic-time-skipping)\n - [Manual Time Skipping](#manual-time-skipping)\n - [Mocking Activities](#mocking-activities)\n - [Workflow Sandbox](#workflow-sandbox)\n - [How the Sandbox Works](#how-the-sandbox-works)\n - [Avoiding the Sandbox](#avoiding-the-sandbox)\n - [Customizing the Sandbox](#customizing-the-sandbox)\n - 
[Passthrough Modules](#passthrough-modules)\n - [Invalid Module Members](#invalid-module-members)\n - [Known Sandbox Issues](#known-sandbox-issues)\n - [Global Import/Builtins](#global-importbuiltins)\n - [Sandbox is not Secure](#sandbox-is-not-secure)\n - [Sandbox Performance](#sandbox-performance)\n - [Extending Restricted Classes](#extending-restricted-classes)\n - [Certain Standard Library Calls on Restricted Objects](#certain-standard-library-calls-on-restricted-objects)\n - [is_subclass of ABC-based Restricted Classes](#is_subclass-of-abc-based-restricted-classes)\n - [Activities](#activities)\n - [Definition](#definition-1)\n - [Types of Activities](#types-of-activities)\n - [Synchronous Activities](#synchronous-activities)\n - [Synchronous Multithreaded Activities](#synchronous-multithreaded-activities)\n - [Synchronous Multiprocess/Other Activities](#synchronous-multiprocessother-activities)\n - [Asynchronous Activities](#asynchronous-activities)\n - [Activity Context](#activity-context)\n - [Heartbeating and Cancellation](#heartbeating-and-cancellation)\n - [Worker Shutdown](#worker-shutdown)\n - [Testing](#testing-1)\n - [Interceptors](#interceptors)\n - [Nexus](#nexus)\n - [Plugins](#plugins)\n - [Client Plugins](#client-plugins)\n - [Worker Plugins](#worker-plugins)\n - [Workflow Replay](#workflow-replay)\n - [Observability](#observability)\n - [Metrics](#metrics)\n - [OpenTelemetry Tracing](#opentelemetry-tracing)\n - [Protobuf 3.x vs 4.x](#protobuf-3x-vs-4x)\n - [Known Compatibility Issues](#known-compatibility-issues)\n - [gevent Patching](#gevent-patching)\n- [Development](#development)\n - [Building](#building)\n - [Prepare](#prepare)\n - [Build](#build)\n - [Use](#use)\n - [Local SDK development environment](#local-sdk-development-environment)\n - [Testing](#testing-2)\n - [Proto Generation and Testing](#proto-generation-and-testing)\n - [Style](#style)\n\n<!-- END doctoc generated TOC please keep comment here to allow auto update -->\n\n# Quick Start\n\nWe will guide you through the Temporal basics to create a \"hello, world!\" script on your machine. It is not intended as\none of the ways to use Temporal, but in reality it is very simplified and decidedly not \"the only way\" to use Temporal.\nFor more information, check out the docs references in \"Next Steps\" below the quick start.\n\n## Installation\n\nInstall the `temporalio` package from [PyPI](https://pypi.org/project/temporalio).\n\nThese steps can be followed to use with a virtual environment and `pip`:\n\n* [Create a virtual environment](https://packaging.python.org/en/latest/tutorials/installing-packages/#creating-virtual-environments)\n* Update `pip` - `python -m pip install -U pip`\n * Needed because older versions of `pip` may not pick the right wheel\n* Install Temporal SDK - `python -m pip install temporalio`\n\nThe SDK is now ready for use. 
To build from source, see \"Building\" near the end of this documentation.\n\n**NOTE: This README is for the current branch and not necessarily what's released on `PyPI`.**\n\n## Implementing a Workflow\n\nCreate the following in `activities.py`:\n\n```python\nfrom temporalio import activity\n\n@activity.defn\ndef say_hello(name: str) -> str:\n return f\"Hello, {name}!\"\n```\n\nCreate the following in `workflows.py`:\n\n```python\nfrom datetime import timedelta\nfrom temporalio import workflow\n\n# Import our activity, passing it through the sandbox\nwith workflow.unsafe.imports_passed_through():\n from .activities import say_hello\n\n@workflow.defn\nclass SayHello:\n @workflow.run\n async def run(self, name: str) -> str:\n return await workflow.execute_activity(\n say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)\n )\n```\n\nCreate the following in `run_worker.py`:\n\n```python\nimport asyncio\nimport concurrent.futures\nfrom temporalio.client import Client\nfrom temporalio.worker import Worker\n\n# Import the activity and workflow from our other files\nfrom .activities import say_hello\nfrom .workflows import SayHello\n\nasync def main():\n # Create client connected to server at the given address\n client = await Client.connect(\"localhost:7233\")\n\n # Run the worker\n with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:\n worker = Worker(\n client,\n task_queue=\"my-task-queue\",\n workflows=[SayHello],\n activities=[say_hello],\n activity_executor=activity_executor,\n )\n await worker.run()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\nAssuming you have a [Temporal server running on localhost](https://docs.temporal.io/docs/server/quick-install/), this\nwill run the worker:\n\n python run_worker.py\n\n## Running a Workflow\n\nCreate the following script at `run_workflow.py`:\n\n```python\nimport asyncio\nfrom temporalio.client import Client\n\n# Import the workflow from the previous code\nfrom .workflows import SayHello\n\nasync def main():\n # Create client connected to server at the given address\n client = await Client.connect(\"localhost:7233\")\n\n # Execute a workflow\n result = await client.execute_workflow(SayHello.run, \"my name\", id=\"my-workflow-id\", task_queue=\"my-task-queue\")\n\n print(f\"Result: {result}\")\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\nAssuming you have `run_worker.py` running from before, this will run the workflow:\n\n python run_workflow.py\n\nThe output will be:\n\n Result: Hello, my-name!\n\n## Next Steps\n\nTemporal can be implemented in your code in many different ways, to suit your application's needs. The links below will\ngive you much more information about how Temporal works with Python:\n\n* [Code Samples](https://github.com/temporalio/samples-python) - If you want to start with some code, we have provided\n some pre-built samples.\n* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) Our Python specific\n Developer's Guide will give you much more information on how to build with Temporal in your Python applications than\n our SDK README ever could (or should).\n* [API Documentation](https://python.temporal.io) - Full Temporal Python SDK package documentation.\n\n---\n\n# Usage\n\nFrom here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around\nTemporal concepts. 
*This section is not intended as a how-to guide* -- For more how-to oriented information, check out\nthe links in the [Next Steps](#next-steps) section above.\n\n### Client\n\nA client can be created and used to start a workflow like so:\n\n```python\nfrom temporalio.client import Client\n\nasync def main():\n # Create client connected to server at the given address and namespace\n client = await Client.connect(\"localhost:7233\", namespace=\"my-namespace\")\n\n # Start a workflow\n handle = await client.start_workflow(MyWorkflow.run, \"some arg\", id=\"my-workflow-id\", task_queue=\"my-task-queue\")\n\n # Wait for result\n result = await handle.result()\n print(f\"Result: {result}\")\n```\n\nSome things to note about the above code:\n\n* A `Client` does not have an explicit \"close\"\n* To enable TLS, the `tls` argument to `connect` can be set to `True` or a `TLSConfig` object\n* A single positional argument can be passed to `start_workflow`. If there are multiple arguments, only the\n non-type-safe form of `start_workflow` can be used (i.e. the one accepting a string workflow name) and it must be in\n the `args` keyword argument.\n* The `handle` represents the workflow that was started and can be used for more than just getting the result\n* Since we are just getting the handle and waiting on the result, we could have called `client.execute_workflow` which\n does the same thing\n* Clients can have many more options not shown here (e.g. data converters and interceptors)\n* A string can be used instead of the method reference to call a workflow by name (e.g. if defined in another language)\n* Clients do not work across forks\n\nClients also provide a shallow copy of their config for use in making slightly different clients backed by the same\nconnection. For instance, given the `client` above, this is how to have a client in another namespace:\n\n```python\nconfig = client.config()\nconfig[\"namespace\"] = \"my-other-namespace\"\nother_ns_client = Client(**config)\n```\n\n#### Data Conversion\n\nData converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type\n`temporalio.converter.DataConverter` can be set via the `data_converter` parameter of the `Client` constructor. Data\nconverters are a combination of payload converters, payload codecs, and failure converters. Payload converters convert\nPython values to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption).\nFailure converters convert exceptions to/from serialized failures.\n\nThe default data converter supports converting multiple types including:\n\n* `None`\n* `bytes`\n* `google.protobuf.message.Message` - As JSON when encoding, but has ability to decode binary proto from other languages\n* Anything that can be converted to JSON including:\n * Anything that [`json.dump`](https://docs.python.org/3/library/json.html#json.dump) supports natively\n * [dataclasses](https://docs.python.org/3/library/dataclasses.html)\n * Iterables including ones JSON dump may not support by default, e.g. 
`set`\n * [IntEnum, StrEnum](https://docs.python.org/3/library/enum.html) based enumerates\n * [UUID](https://docs.python.org/3/library/uuid.html)\n * `datetime.datetime`\n\nTo use pydantic model instances, see [Pydantic Support](#pydantic-support).\n\n`datetime.date` and `datetime.time` can only be used with the Pydantic data converter.\n\nAlthough workflows, updates, signals, and queries can all be defined with multiple input parameters, users are strongly\nencouraged to use a single `dataclass` or Pydantic model parameter, so that fields with defaults can be easily added\nwithout breaking compatibility. Similar advice applies to return values.\n\nClasses with generics may not have the generics properly resolved. The current implementation does not have generic\ntype resolution. Users should use concrete types.\n\n##### Pydantic Support\n\nTo use Pydantic model instances, install Pydantic and set the Pydantic data converter when creating client instances:\n\n```python\nfrom temporalio.contrib.pydantic import pydantic_data_converter\n\nclient = Client(data_converter=pydantic_data_converter, ...)\n```\n\nThis data converter supports conversion of all types supported by Pydantic to and from JSON.\n\nIn addition to Pydantic models, these include all `json.dump`-able types, various non-`json.dump`-able standard library\ntypes such as dataclasses, types from the datetime module, sets, UUID, etc, and custom types composed of any of these.\n\nPydantic v1 is not supported by this data converter. If you are not yet able to upgrade from Pydantic v1, see\nhttps://github.com/temporalio/samples-python/tree/main/pydantic_converter/v1 for limited v1 support.\n\n\n##### Custom Type Data Conversion\n\nFor converting from JSON, the workflow/activity type hint is taken into account to convert to the proper type. Care has\nbeen taken to support all common typings including `Optional`, `Union`, all forms of iterables and mappings, `NewType`,\netc in addition to the regular JSON values mentioned before.\n\nData converters contain a reference to a payload converter class that is used to convert to/from payloads/values. This\nis a class and not an instance because it is instantiated on every workflow run inside the sandbox. The payload\nconverter is usually a `CompositePayloadConverter` which contains a multiple `EncodingPayloadConverter`s it uses to try\nto serialize/deserialize payloads. Upon serialization, each `EncodingPayloadConverter` is tried until one succeeds. The\n`EncodingPayloadConverter` provides an \"encoding\" string serialized onto the payload so that, upon deserialization, the\nspecific `EncodingPayloadConverter` for the given \"encoding\" is used.\n\nThe default data converter uses the `DefaultPayloadConverter` which is simply a `CompositePayloadConverter` with a known\nset of default `EncodingPayloadConverter`s. To implement a custom encoding for a custom type, a new\n`EncodingPayloadConverter` can be created for the new type. 
For example, to support `IPv4Address` types:\n\n```python\nclass IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter):\n @property\n def encoding(self) -> str:\n return \"text/ipv4-address\"\n\n def to_payload(self, value: Any) -> Optional[Payload]:\n if isinstance(value, ipaddress.IPv4Address):\n return Payload(\n metadata={\"encoding\": self.encoding.encode()},\n data=str(value).encode(),\n )\n else:\n return None\n\n def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:\n assert not type_hint or type_hint is ipaddress.IPv4Address\n return ipaddress.IPv4Address(payload.data.decode())\n\nclass IPv4AddressPayloadConverter(CompositePayloadConverter):\n def __init__(self) -> None:\n # Just add ours as first before the defaults\n super().__init__(\n IPv4AddressEncodingPayloadConverter(),\n *DefaultPayloadConverter.default_encoding_payload_converters,\n )\n\nmy_data_converter = dataclasses.replace(\n DataConverter.default,\n payload_converter_class=IPv4AddressPayloadConverter,\n)\n```\n\nImports are left off for brevity.\n\nThis is good for many custom types. However, sometimes you want to override the behavior of the just the existing JSON\nencoding payload converter to support a new type. It is already the last encoding data converter in the list, so it's\nthe fall-through behavior for any otherwise unknown type. Customizing the existing JSON converter has the benefit of\nmaking the type work in lists, unions, etc.\n\nThe `JSONPlainPayloadConverter` uses the Python [json](https://docs.python.org/3/library/json.html) library with an\nadvanced JSON encoder by default and a custom value conversion method to turn `json.load`ed values to their type hints.\nThe conversion can be customized for serialization with a custom `json.JSONEncoder` and deserialization with a custom\n`JSONTypeConverter`. For example, to support `IPv4Address` types in existing JSON conversion:\n\n```python\nclass IPv4AddressJSONEncoder(AdvancedJSONEncoder):\n def default(self, o: Any) -> Any:\n if isinstance(o, ipaddress.IPv4Address):\n return str(o)\n return super().default(o)\nclass IPv4AddressJSONTypeConverter(JSONTypeConverter):\n def to_typed_value(\n self, hint: Type, value: Any\n ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:\n if issubclass(hint, ipaddress.IPv4Address):\n return ipaddress.IPv4Address(value)\n return JSONTypeConverter.Unhandled\n\nclass IPv4AddressPayloadConverter(CompositePayloadConverter):\n def __init__(self) -> None:\n # Replace default JSON plain with our own that has our encoder and type\n # converter\n json_converter = JSONPlainPayloadConverter(\n encoder=IPv4AddressJSONEncoder,\n custom_type_converters=[IPv4AddressJSONTypeConverter()],\n )\n super().__init__(\n *[\n c if not isinstance(c, JSONPlainPayloadConverter) else json_converter\n for c in DefaultPayloadConverter.default_encoding_payload_converters\n ]\n )\n\nmy_data_converter = dataclasses.replace(\n DataConverter.default,\n payload_converter_class=IPv4AddressPayloadConverter,\n)\n```\n\nNow `IPv4Address` can be used in type hints including collections, optionals, etc.\n\n### Workers\n\nWorkers host workflows and/or activities. 
Here's how to run a worker:\n\n```python\nimport asyncio\nimport logging\nfrom temporalio.client import Client\nfrom temporalio.worker import Worker\n# Import your own workflows and activities\nfrom my_workflow_package import MyWorkflow, my_activity\n\nasync def run_worker(stop_event: asyncio.Event):\n # Create client connected to server at the given address\n client = await Client.connect(\"localhost:7233\", namespace=\"my-namespace\")\n\n # Run the worker until the event is set\n worker = Worker(client, task_queue=\"my-task-queue\", workflows=[MyWorkflow], activities=[my_activity])\n async with worker:\n await stop_event.wait()\n```\n\nSome things to note about the above code:\n\n* This creates/uses the same client that is used for starting workflows\n* While this example accepts a stop event and uses `async with`, `run()` and `shutdown()` may be used instead\n* Workers can have many more options not shown here (e.g. data converters and interceptors)\n\n### Workflows\n\n#### Definition\n\nWorkflows are defined as classes decorated with `@workflow.defn`. The method invoked for the workflow is decorated with\n`@workflow.run`. Methods for signals, queries, and updates are decorated with `@workflow.signal`, `@workflow.query`\nand `@workflow.update` respectively. Here's an example of a workflow:\n\n```python\nimport asyncio\nfrom datetime import timedelta\nfrom temporalio import workflow\n\n# Pass the activities through the sandbox\nwith workflow.unsafe.imports_passed_through():\n from .my_activities import GreetingInfo, create_greeting_activity\n\n@workflow.defn\nclass GreetingWorkflow:\n def __init__(self) -> None:\n self._current_greeting = \"<unset>\"\n self._greeting_info = GreetingInfo()\n self._greeting_info_update = asyncio.Event()\n self._complete = asyncio.Event()\n\n @workflow.run\n async def run(self, name: str) -> str:\n self._greeting_info.name = name\n while True:\n # Store greeting\n self._current_greeting = await workflow.execute_activity(\n create_greeting_activity,\n self._greeting_info,\n start_to_close_timeout=timedelta(seconds=5),\n )\n workflow.logger.debug(\"Greeting set to %s\", self._current_greeting)\n\n # Wait for salutation update or complete signal (this can be\n # cancelled)\n await asyncio.wait(\n [\n asyncio.create_task(self._greeting_info_update.wait()),\n asyncio.create_task(self._complete.wait()),\n ],\n return_when=asyncio.FIRST_COMPLETED,\n )\n if self._complete.is_set():\n return self._current_greeting\n self._greeting_info_update.clear()\n\n @workflow.signal\n async def update_salutation(self, salutation: str) -> None:\n self._greeting_info.salutation = salutation\n self._greeting_info_update.set()\n\n @workflow.signal\n async def complete_with_greeting(self) -> None:\n self._complete.set()\n\n @workflow.query\n def current_greeting(self) -> str:\n return self._current_greeting\n\n @workflow.update\n def set_and_get_greeting(self, greeting: str) -> str:\n old = self._current_greeting\n self._current_greeting = greeting\n return old\n\n```\n\nThis assumes there's an activity in `my_activities.py` like:\n\n```python\nfrom dataclasses import dataclass\nfrom temporalio import workflow\n\n@dataclass\nclass GreetingInfo:\n salutation: str = \"Hello\"\n name: str = \"<unknown>\"\n\n@activity.defn\ndef create_greeting_activity(info: GreetingInfo) -> str:\n return f\"{info.salutation}, {info.name}!\"\n```\n\nSome things to note about the above workflow code:\n\n* Workflows run in a sandbox by default.\n * Users are encouraged to define workflows in files with no 
side effects or other complicated code or unnecessary\n imports to other third party libraries.\n * Non-standard-library, non-`temporalio` imports should usually be \"passed through\" the sandbox. See the\n [Workflow Sandbox](#workflow-sandbox) section for more details.\n* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on\n a different signal\n* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function\n* Workflow code must be deterministic. This means no `set` iteration, threading, no randomness, no external calls to\n processes, no network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be\n deterministic. Also see the [Asyncio and Determinism](#asyncio-and-determinism) section later.\n* `@activity.defn` is explained in a later section. For normal simple string concatenation, this would just be done in\n the workflow. The activity is for demonstration purposes only.\n* `workflow.execute_activity(create_greeting_activity, ...` is actually a typed signature, and MyPy will fail if the\n `self._greeting_info` parameter is not a `GreetingInfo`\n\nHere are the decorators that can be applied:\n\n* `@workflow.defn` - Defines a workflow class\n * Must be defined on the class given to the worker (ignored if present on a base class)\n * Can have a `name` param to customize the workflow name, otherwise it defaults to the unqualified class name\n * Can have `dynamic=True` which means all otherwise unhandled workflows fall through to this. If present, cannot have\n `name` argument, and run method must accept a single parameter of `Sequence[temporalio.common.RawValue]` type. The\n payload of the raw value can be converted via `workflow.payload_converter().from_payload`.\n* `@workflow.run` - Defines the primary workflow run method\n * Must be defined on the same class as `@workflow.defn`, not a base class (but can _also_ be defined on the same\n method of a base class)\n * Exactly one method name must have this decorator, no more or less\n * Must be defined on an `async def` method\n * The method's arguments are the workflow's arguments\n * The first parameter must be `self`, followed by positional arguments. Best practice is to only take a single\n argument that is an object/dataclass of fields that can be added to as needed.\n* `@workflow.init` - Specifies that the `__init__` method accepts the workflow's arguments.\n * If present, may only be applied to the `__init__` method, the parameters of which must then be identical to those of\n the `@workflow.run` method.\n * The purpose of this decorator is to allow operations involving workflow arguments to be performed in the `__init__`\n method, before any signal or update handler has a chance to execute.\n* `@workflow.signal` - Defines a method as a signal\n * Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method\n is overridden, then the override must also be decorated.\n * The method's arguments are the signal's arguments.\n * Return value is ignored.\n * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.\n * Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name.\n * Can have `dynamic=True` which means all otherwise unhandled signals fall through to this. 
If present, cannot have\n `name` argument, and method parameters must be `self`, a string signal name, and a\n `Sequence[temporalio.common.RawValue]`.\n * Non-dynamic method can only have positional arguments. Best practice is to only take a single argument that is an\n object/dataclass of fields that can be added to as needed.\n * See [Signal and update handlers](#signal-and-update-handlers) below\n* `@workflow.update` - Defines a method as an update\n * Can be defined on an `async` or non-`async` method at any point in the class hierarchy, but if the decorated method\n is overridden, then the override must also be decorated.\n * May accept input and return a value\n * The method's arguments are the update's arguments.\n * May be `async` or non-`async`\n * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.\n * Also accepts the `name` and `dynamic` parameters like signal, with the same semantics.\n * Update handlers may optionally define a validator method by decorating it with `@update_handler_method.validator`.\n To reject an update before any events are written to history, throw an exception in a validator. Validators cannot\n be `async`, cannot mutate workflow state, and return nothing.\n * See [Signal and update handlers](#signal-and-update-handlers) below\n* `@workflow.query` - Defines a method as a query\n * Should return a value\n * Should not be `async`\n * Temporal queries should never mutate anything in the workflow or call any calls that would mutate the workflow\n * Also accepts the `name` and `dynamic` parameters like signal and update, with the same semantics.\n\n#### Running\n\nTo start a locally-defined workflow from a client, you can simply reference its method like so:\n\n```python\nfrom temporalio.client import Client\nfrom my_workflow_package import GreetingWorkflow\n\nasync def create_greeting(client: Client) -> str:\n # Start the workflow\n handle = await client.start_workflow(GreetingWorkflow.run, \"my name\", id=\"my-workflow-id\", task_queue=\"my-task-queue\")\n # Change the salutation\n await handle.signal(GreetingWorkflow.update_salutation, \"Aloha\")\n # Tell it to complete\n await handle.signal(GreetingWorkflow.complete_with_greeting)\n # Wait and return result\n return await handle.result()\n```\n\nSome things to note about the above code:\n\n* This uses the `GreetingWorkflow` from the previous section\n* The result of calling this function is `\"Aloha, my name!\"`\n* `id` and `task_queue` are required for running a workflow\n* `client.start_workflow` is typed, so MyPy would fail if `\"my name\"` were something besides a string\n* `handle.signal` is typed, so MyPy would fail if `\"Aloha\"` were something besides a string or if we provided a\n parameter to the parameterless `complete_with_greeting`\n* `handle.result` is typed to the workflow itself, so MyPy would fail if we said this `create_greeting` returned\n something besides a string\n\n#### Invoking Activities\n\n* Activities are started with non-async `workflow.start_activity()` which accepts either an activity function reference\n or a string name.\n* A single argument to the activity is positional. Multiple arguments are not supported in the type-safe form of\n start/execute activity and must be supplied via the `args` keyword argument.\n* Activity options are set as keyword arguments after the activity arguments. 
#### Invoking Child Workflows\n\n* Child workflows are started with async `workflow.start_child_workflow()` which accepts either a workflow run method\n reference or a string name. The arguments to the workflow are positional.\n* A single argument to the child workflow is positional. Multiple arguments are not supported in the type-safe form of\n start/execute child workflow and must be supplied via the `args` keyword argument.\n* Child workflow options are set as keyword arguments after the arguments. At least `id` must be provided.\n* The `await` of the start does not complete until the start has been accepted by the server\n* The result is a child workflow handle which is an `asyncio.Task` and supports basic task features. The handle also has\n some child info and supports signalling the child workflow\n* An async `workflow.execute_child_workflow()` helper is provided which takes the same arguments as\n `workflow.start_child_workflow()` and `await`s on the result. This should be used in most cases unless advanced task\n capabilities are needed.\n\n#### Timers\n\n* A timer is represented by normal `asyncio.sleep()` or a `workflow.sleep()` call\n* Timers are also implicitly started on any `asyncio` calls with timeouts (e.g. `asyncio.wait_for`)\n* Timers are Temporal server timers, not local ones, so sub-second resolution rarely has value\n* Calls that use a specific point in time, e.g. `call_at` or `timeout_at`, should be based on the current loop time\n (i.e. `workflow.time()`) and not an actual point in time. This is because fixed times are translated to relative ones\n by subtracting the current loop time which may not be the actual current time.\n\n#### Conditions\n\n* `workflow.wait_condition` is an async function that doesn't return until a provided callback returns true\n* A `timeout` can optionally be provided which will throw an `asyncio.TimeoutError` if reached (internally backed by\n `asyncio.wait_for` which uses a timer)\n\n#### Asyncio and Determinism\n\nWorkflows must be deterministic. Workflows are backed by a custom\n[asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many of the common `asyncio` calls work\nas normal. 
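For example, standard `asyncio` concurrency works inside a workflow. A minimal sketch, assuming two activities `activity_a` and `activity_b` are defined and registered elsewhere:\n\n```python\nimport asyncio\nfrom datetime import timedelta\n\nfrom temporalio import workflow\n\n@workflow.defn\nclass FanOutWorkflow:\n    @workflow.run\n    async def run(self) -> list[str]:\n        # asyncio.gather works as usual on the durable workflow event loop\n        first, second = await asyncio.gather(\n            workflow.execute_activity(\n                activity_a, start_to_close_timeout=timedelta(seconds=5)\n            ),\n            workflow.execute_activity(\n                activity_b, start_to_close_timeout=timedelta(seconds=5)\n            ),\n        )\n        return [first, second]\n```\n\n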
Some asyncio features are disabled such as:\n\n* Thread-related calls such as `to_thread()`, `run_coroutine_threadsafe()`, `loop.run_in_executor()`, etc\n* Calls that alter the event loop such as `loop.close()`, `loop.stop()`, `loop.run_forever()`,\n `loop.set_task_factory()`, etc\n* Calls that use anything external such as networking, subprocesses, disk IO, etc\n\nAlso, there are some `asyncio` utilities that internally use `set()` which can make them non-deterministic from one\nworker to the next. Therefore, the following `asyncio` functions have `workflow`-module alternatives that are\ndeterministic:\n\n* `asyncio.as_completed()` - use `workflow.as_completed()`\n* `asyncio.wait()` - use `workflow.wait()`\n\n#### Asyncio Cancellation\n\nCancellation is done using `asyncio` [task cancellation](https://docs.python.org/3/library/asyncio-task.html#task-cancellation).\nThis means that tasks are requested to be cancelled but can catch the\n[`asyncio.CancelledError`](https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError), thus\nallowing them to perform some cleanup before allowing the cancellation to proceed (i.e. re-raising the error), or to\ndeny the cancellation entirely. It also means that\n[`asyncio.shield()`](https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation) can be used to\nprotect tasks against cancellation.\n\nThe following tasks, when cancelled, perform a Temporal cancellation:\n\n* Activities - when the task executing an activity is cancelled, a cancellation request is sent to the activity\n* Child workflows - when the task starting or executing a child workflow is cancelled, a cancellation request is sent to\n cancel the child workflow\n* Timers - when the task executing a timer is cancelled (whether started via sleep or timeout), the timer is cancelled\n\nWhen the workflow itself is requested to cancel, `Task.cancel` is called on the main workflow task. Therefore,\n`asyncio.CancelledError` can be caught in order to handle the cancel gracefully.\n\nWorkflows follow `asyncio` cancellation rules exactly, which can cause confusion among Python developers. Cancelling a\ntask doesn't always cancel the thing it created. For example, given\n`task = asyncio.create_task(workflow.start_child_workflow(...`, calling `task.cancel` does not cancel the child\nworkflow, it only cancels the starting of it, which has no effect if it has already started. However, cancelling the\nresult of `handle = await workflow.start_child_workflow(...` or\n`task = asyncio.create_task(workflow.execute_child_workflow(...` _does_ cancel the child workflow.\n\nAlso, due to Temporal rules, a cancellation request is a state, not an event. Therefore, repeated cancellation requests\nare not delivered, only the first. If the workflow chooses to swallow a cancellation, it cannot be requested again.\n\n#### Workflow Utilities\n\nWhile running in a workflow, in addition to features documented elsewhere, the following items are available from the\n`temporalio.workflow` package:\n\n* `continue_as_new()` - Async function to stop the workflow immediately and continue as new\n* `info()` - Returns information about the current workflow\n* `logger` - A logger for use in a workflow (properly skips logging on replay)\n* `now()` - Returns the \"current time\" from the workflow's perspective\n\n
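A short sketch of a few of these utilities in use (`now()` returns the deterministic workflow time, so this is safe on replay):\n\n```python\nfrom temporalio import workflow\n\n@workflow.defn\nclass UtilitiesWorkflow:\n    @workflow.run\n    async def run(self) -> str:\n        # Replay-safe logging plus access to workflow metadata\n        workflow.logger.info(\"Started workflow %s\", workflow.info().workflow_id)\n        return workflow.now().isoformat()\n```\n\n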
#### Exceptions\n\n* Workflows/updates can raise exceptions to fail the workflow or the \"workflow task\" (i.e. suspend the workflow\n in a retrying state).\n* Exceptions that are instances of `temporalio.exceptions.FailureError` will fail the workflow with that exception\n * For failing the workflow explicitly with a user exception, use `temporalio.exceptions.ApplicationError`. This can\n be marked non-retryable or include details as needed.\n * Other exceptions that come from activity execution, child execution, cancellation, etc are already instances of\n `FailureError` and will fail the workflow when uncaught.\n* Update handlers are special: an instance of `temporalio.exceptions.FailureError` raised in an update handler will fail\n the update instead of failing the workflow.\n* All other exceptions fail the \"workflow task\" which means the workflow will continually retry until the workflow code is\n fixed. This is helpful for bad code or other unpredictable exceptions. To actually fail the workflow, use an\n `ApplicationError` as mentioned above.\n\nThis default can be changed by providing a list of exception types to `workflow_failure_exception_types` when creating a\n`Worker` or `failure_exception_types` on the `@workflow.defn` decorator. If a workflow-thrown exception is an instance\nof any type in either list, it will fail the workflow (or update) instead of the workflow task. This means a value of\n`[Exception]` will cause every exception to fail the workflow instead of the workflow task. Also, as a special case, if\n`temporalio.workflow.NondeterminismError` (or any superclass of it) is set, non-deterministic exceptions will fail the\nworkflow. WARNING: These settings are experimental.\n\n#### Signal and update handlers\n\nSignal and update handlers are defined using decorated methods as shown in the example [above](#definition). Client code\nsends signals and updates using `workflow_handle.signal`, `workflow_handle.execute_update`, or\n`workflow_handle.start_update`. When the workflow receives one of these requests, it starts an `asyncio.Task` executing\nthe corresponding handler method with the argument(s) from the request.\n\nThe handler methods may be `async def` and can do all the async operations described above (e.g. invoking activities and\nchild workflows, and waiting on timers and conditions). Notice that this means that handler tasks will be executing\nconcurrently with respect to each other and the main workflow task. Use\n[asyncio.Lock](https://docs.python.org/3/library/asyncio-sync.html#lock) and\n[asyncio.Semaphore](https://docs.python.org/3/library/asyncio-sync.html#semaphore) if necessary, as sketched below.\n\n
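A minimal sketch of serializing update handlers with an `asyncio.Lock` (the workflow, signal, and update names here are illustrative):\n\n```python\nimport asyncio\n\nfrom temporalio import workflow\n\n@workflow.defn\nclass SafeHandlerWorkflow:\n    def __init__(self) -> None:\n        self._lock = asyncio.Lock()\n        self._entries: list[str] = []\n        self._done = False\n\n    @workflow.run\n    async def run(self) -> list[str]:\n        await workflow.wait_condition(lambda: self._done)\n        return self._entries\n\n    @workflow.signal\n    def finish(self) -> None:\n        self._done = True\n\n    @workflow.update\n    async def add_entry(self, entry: str) -> None:\n        # Hold the lock across the await so concurrent handlers cannot interleave\n        async with self._lock:\n            await asyncio.sleep(1)  # stand-in for an activity or other async work\n            self._entries.append(entry)\n```\n\n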
Your main workflow task may finish as a result of successful completion, cancellation, continue-as-new, or failure. You\nshould ensure that all in-progress signal and update handler tasks have finished before this happens; if you do not, you\nwill see a warning (the warning can be disabled via the `workflow.signal`/`workflow.update` decorators). One way to\nensure that handler tasks have finished is to wait on the `workflow.all_handlers_finished` condition:\n\n```python\nawait workflow.wait_condition(workflow.all_handlers_finished)\n```\n\n#### External Workflows\n\n* `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow\n* `workflow.get_external_workflow_handle_for()` can be used instead for a type safe handle\n* `await handle.signal()` can be called on the handle to signal the external workflow\n* `await handle.cancel()` can be called on the handle to send a cancel to the external workflow\n\n#### Testing\n\nWorkflow testing can be done in an integration-test fashion against a real server; however, it is hard to simulate\ntimeouts and other long time-based code. Using the time-skipping workflow test environment can help there.\n\nThe time-skipping `temporalio.testing.WorkflowEnvironment` can be created via the static async `start_time_skipping()`.\nThis internally downloads the Temporal time-skipping test server to a temporary directory if it doesn't already exist,\nthen starts the test server which has special APIs for skipping time.\n\n**NOTE:** The time-skipping test environment does not work on ARM. The SDK will try to download the x64 binary on macOS\nfor use with the Intel emulator, but for Linux or Windows ARM there is no proper time-skipping test server at this time.\n\n##### Automatic Time Skipping\n\nAnytime a workflow result is waited on, the time-skipping server automatically advances to the next event it can. To\nmanually advance time before waiting on the result of a workflow, the `WorkflowEnvironment.sleep` method can be used.\n\nHere's a simple example of a workflow that sleeps for 24 hours:\n\n```python\nimport asyncio\nfrom temporalio import workflow\n\n@workflow.defn\nclass WaitADayWorkflow:\n    @workflow.run\n    async def run(self) -> str:\n        await asyncio.sleep(24 * 60 * 60)\n        return \"all done\"\n```\n\nAn integration test of this workflow would be far too slow. However, the time-skipping server automatically skips to the\nnext event when we wait on the result. Here's a test for that workflow:\n\n```python\nfrom temporalio.testing import WorkflowEnvironment\nfrom temporalio.worker import Worker\n\nasync def test_wait_a_day_workflow():\n    async with await WorkflowEnvironment.start_time_skipping() as env:\n        async with Worker(env.client, task_queue=\"tq1\", workflows=[WaitADayWorkflow]):\n            assert \"all done\" == await env.client.execute_workflow(WaitADayWorkflow.run, id=\"wf1\", task_queue=\"tq1\")\n```\n\nThat test will run almost instantly. 
This is because by calling `execute_workflow` on our client, we have asked the\nenvironment to automatically skip time as much as it can (basically until the end of the workflow or until an activity\nis run).\n\nTo disable automatic time-skipping while waiting for a workflow result, run code inside a\n`with env.auto_time_skipping_disabled():` block.\n\n##### Manual Time Skipping\n\nUntil a workflow is waited on, all time skipping in the time-skipping environment is done manually via\n`WorkflowEnvironment.sleep`.\n\nHere's a workflow that waits for a signal or times out:\n\n```python\nimport asyncio\nfrom temporalio import workflow\n\n@workflow.defn\nclass SignalWorkflow:\n    def __init__(self) -> None:\n        self.signal_received = False\n\n    @workflow.run\n    async def run(self) -> str:\n        # Wait for signal or timeout in 45 seconds\n        try:\n            await workflow.wait_condition(lambda: self.signal_received, timeout=45)\n            return \"got signal\"\n        except asyncio.TimeoutError:\n            return \"got timeout\"\n\n    @workflow.signal\n    def some_signal(self) -> None:\n        self.signal_received = True\n```\n\nTo test a normal signal, you might:\n\n```python\nfrom temporalio.testing import WorkflowEnvironment\nfrom temporalio.worker import Worker\n\nasync def test_signal_workflow():\n    async with await WorkflowEnvironment.start_time_skipping() as env:\n        async with Worker(env.client, task_queue=\"tq1\", workflows=[SignalWorkflow]):\n            # Start workflow, send signal, check result\n            handle = await env.client.start_workflow(SignalWorkflow.run, id=\"wf1\", task_queue=\"tq1\")\n            await handle.signal(SignalWorkflow.some_signal)\n            assert \"got signal\" == await handle.result()\n```\n\nBut how would you test the timeout part? Like so:\n\n```python\nfrom temporalio.testing import WorkflowEnvironment\nfrom temporalio.worker import Worker\n\nasync def test_signal_workflow_timeout():\n    async with await WorkflowEnvironment.start_time_skipping() as env:\n        async with Worker(env.client, task_queue=\"tq1\", workflows=[SignalWorkflow]):\n            # Start workflow, advance time past timeout, check result\n            handle = await env.client.start_workflow(SignalWorkflow.run, id=\"wf1\", task_queue=\"tq1\")\n            await env.sleep(50)\n            assert \"got timeout\" == await handle.result()\n```\n\nAlso, the current time of the workflow environment can be obtained via the async `WorkflowEnvironment.get_current_time`\nmethod.\n\n##### Mocking Activities\n\nActivities are just functions decorated with `@activity.defn`. Simply write different ones and pass those to the worker\nto have different activities called during the test, as sketched below.\n\n
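For example, a minimal sketch of swapping in a mock activity under test, assuming the `create_greeting_activity` and `GreetingInfo` (with `salutation` and `name` fields) from the earlier example; the mock simply registers under the same activity name:\n\n```python\nfrom temporalio import activity\n\n# Mock registered under the same activity name as the real one\n@activity.defn(name=\"create_greeting_activity\")\ndef create_greeting_activity_mocked(info: GreetingInfo) -> str:\n    return f\"{info.salutation}, {info.name} (mocked)!\"\n\n# Pass the mock to the test worker instead of the real activity:\n# Worker(env.client, task_queue=\"tq1\", workflows=[GreetingWorkflow],\n#        activities=[create_greeting_activity_mocked])\n```\n\n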
#### Workflow Sandbox\n\nBy default, workflows are run in a sandbox to help avoid non-deterministic code. If a call that is known to be\nnon-deterministic is performed, an exception will be thrown in the workflow, which will \"fail the task\", meaning the\nworkflow will not progress until fixed.\n\nThe sandbox is not foolproof and non-determinism can still occur. It is simply a best-effort way to catch bad code\nearly. Users are encouraged to define their workflows in files with no other side effects.\n\nThe sandbox offers a mechanism to \"pass through\" modules from outside the sandbox. By default this already includes all\nstandard library modules and Temporal modules. **For performance and behavior reasons, users are encouraged to pass\nthrough all modules whose calls will be deterministic.** In particular, this advice extends to modules containing the\nactivities to be referenced in workflows, and modules containing dataclasses and Pydantic models, which can be\nparticularly expensive to import. See \"Passthrough Modules\" below on how to do this.\n\n##### How the Sandbox Works\n\nThe sandbox is made up of two components that work closely together:\n\n* Global state isolation\n* Restrictions preventing known non-deterministic library calls\n\nGlobal state isolation is performed by using `exec`. Upon workflow start, and every time that the workflow is replayed,\nthe file that the workflow is defined in is re-imported into a new sandbox created for that workflow run. In order to\nkeep the sandbox performant, not all modules are re-imported in this way: instead, a known set of \"passthrough modules\"\nare obtained as references to the already-imported module _outside_ the sandbox. These modules should be side-effect\nfree on import and, if they make any non-deterministic calls, then these should be restricted by sandbox restriction\nrules. By default, the entire Python standard library, `temporalio`, and a couple of other modules are \"passed through\"\nin this way from outside of the sandbox. To update this list, see \"Customizing the Sandbox\".\n\nRestrictions preventing known non-deterministic library calls are achieved using proxy objects on modules wrapped around\nthe custom importer set in the sandbox. Many restrictions apply at workflow import time and workflow run time, while\nsome restrictions only apply at workflow run time. A default set of restrictions is included that prevents most\ndangerous standard library calls. However, it is known in Python that some otherwise-non-deterministic invocations, like\nreading a file from disk via `open` or using `os.environ`, are done as part of importing modules. To customize what is\nand isn't restricted, see \"Customizing the Sandbox\".\n\n##### Avoiding the Sandbox\n\nThere are three increasingly-scoped ways to avoid the sandbox. Users are discouraged from avoiding the sandbox if\npossible, except for passing through safe modules, which is recommended.\n\nTo remove restrictions around a particular block of code, use `with temporalio.workflow.unsafe.sandbox_unrestricted():`.\nThe workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied.\n\nTo run an entire workflow outside of a sandbox, set `sandboxed=False` on the `@workflow.defn` decorator when defining\nit. This will run the entire workflow outside of the sandbox, which means it can share global state and other bad\nthings.\n\nTo disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to\n`temporalio.worker.UnsandboxedWorkflowRunner()`. This value is defaulted to\n`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()`, so by changing it to the unsandboxed runner, the sandbox\nwill not be used at all.\n\n##### Customizing the Sandbox\n\n\u26a0\ufe0f WARNING: APIs in the `temporalio.worker.workflow_sandbox` module are not yet considered stable and may change in\nfuture releases.\n\nWhen creating the `Worker`, the `workflow_runner` is defaulted to\n`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()`. The `SandboxedWorkflowRunner`'s init accepts a\n`restrictions` keyword argument that is defaulted to `SandboxRestrictions.default`. 
The `SandboxRestrictions` dataclass\nis immutable and contains three fields that can be customized, but only two have notable value. See below.\n\n###### Passthrough Modules\n\nBy default the sandbox completely reloads non-standard-library and non-Temporal modules for every workflow run. To make\nthe sandbox quicker and use less memory when importing known-side-effect-free modules, they can be marked\nas passthrough modules.\n\n**For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be\ndeterministic.** In particular, this advice extends to modules containing the activities to be referenced in workflows,\nand modules containing dataclasses and Pydantic models, which can be particularly expensive to import.\n\nOne way to pass through a module is at import time in the workflow file using the `imports_passed_through` context\nmanager like so:\n\n```python\n# my_workflow_file.py\n\nfrom temporalio import workflow\n\nwith workflow.unsafe.imports_passed_through():\n    import pydantic\n\n@workflow.defn\nclass MyWorkflow:\n    ...\n```\n\nAlternatively, this can be done at worker creation time by customizing the runner's restrictions. For example:\n\n```python\nmy_worker = Worker(\n    ...,\n    workflow_runner=SandboxedWorkflowRunner(\n        restrictions=SandboxRestrictions.default.with_passthrough_modules(\"pydantic\")\n    )\n)\n```\n\nIn both of these cases, now the `pydantic` module will be passed through from outside of the sandbox instead of\nbeing reloaded for every workflow run.\n\nIf users are sure that no imports they use in workflow files will ever need to be sandboxed (meaning all calls within\nare deterministic and never mutate shared, global state), the `passthrough_all_modules` option can be set on the\nrestrictions or the `with_passthrough_all_modules` helper can be used, for example:\n\n```python\nmy_worker = Worker(\n    ...,\n    workflow_runner=SandboxedWorkflowRunner(\n        restrictions=SandboxRestrictions.default.with_passthrough_all_modules()\n    )\n)\n```\n\nNote that some calls from these modules may still be checked for invalid calls at runtime for certain builtins.\n\n###### Invalid Module Members\n\n`SandboxRestrictions.invalid_module_members` contains a root matcher that applies to all module members. This already\nhas a default set which includes things like `datetime.date.today()` which should never be called from a workflow. To\nremove this restriction:\n\n```python\nmy_restrictions = dataclasses.replace(\n    SandboxRestrictions.default,\n    invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(\n        \"datetime\", \"date\", \"today\",\n    ),\n)\nmy_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))\n```\n\nRestrictions can also be added by `|`'ing together matchers, for example to restrict the `datetime.date` class from\nbeing used altogether:\n\n```python\nmy_restrictions = dataclasses.replace(\n    SandboxRestrictions.default,\n    invalid_module_members=SandboxRestrictions.invalid_module_members_default | SandboxMatcher(\n        children={\"datetime\": SandboxMatcher(use={\"date\"})},\n    ),\n)\nmy_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))\n```\n\nSee the API for more details on exact fields and their meaning.\n\n##### Known Sandbox Issues\n\nBelow are known sandbox issues. 
As the sandbox is developed and matures, some may be resolved.\n\n###### Global Import/Builtins\n\nCurrently the sandbox references/alters the global `sys.modules` and `builtins` fields while running workflow code. In\norder to prevent affecting other sandboxed code, thread locals are leveraged to only intercept these values while the\nworkflow thread is running. Therefore, technically if top-level import code starts a thread, it may lose sandbox\nprotection.\n\n###### Sandbox is not Secure\n\nThe sandbox is built to catch many non-deterministic and state sharing issues, but it is not secure. Some known bad\ncalls are intercepted, but for performance reasons, every single attribute get/set cannot be checked. Therefore a simple\ncall like `setattr(temporalio.common, \"__my_key\", \"my value\")` will leak across sandbox runs.\n\nThe sandbox is only a helper; it does not provide full protection.\n\n###### Sandbox Performance\n\nThe sandbox does not add significant CPU or memory overhead for workflows that are in files which only import standard\nlibrary modules. This is because they are passed through from outside of the sandbox. However, every\nnon-standard-library import that is performed at the top of the same file the workflow is in will add CPU overhead (the\nmodule is re-imported every workflow run) and memory overhead (each module independently cached as part of the workflow\nrun for isolation reasons). This becomes more apparent for large numbers of workflow runs.\n\nTo mitigate this, users should:\n\n* Define workflows in files that have as few non-standard-library imports as possible\n* Alter the max workflow cache and/or max concurrent workflows settings if memory grows too large\n* Set third-party libraries as passthrough modules if they are known to be side-effect free\n\n###### Extending Restricted Classes\n\nExtending a restricted class causes Python to instantiate the restricted metaclass, which is unsupported. Therefore, if\nyou attempt to use a class in the sandbox that extends a restricted class, it will fail. For example, if you have a\n`class MyZipFile(zipfile.ZipFile)` and try to use that class inside a workflow, it will fail.\n\nClasses used inside the workflow should not extend restricted classes. For situations where third-party modules need to\nextend restricted classes at import time, they should be marked as passthrough modules.\n\n###### Certain Standard Library Calls on Restricted Objects\n\nIf an object is restricted, internal C Python validation may fail in some cases. For example, running\n`dict.items(os.__dict__)` will fail with:\n\n> descriptor 'items' for 'dict' objects doesn't apply to a '_RestrictedProxy' object\n\nThis is a low-level check that cannot be subverted. 
The solution is to not use restricted objects inside the sandbox.\nFor situations where third-party modules need to use such objects at import time, they should be marked as passthrough\nmodules.\n\n###### issubclass of ABC-based Restricted Classes\n\nDue to [https://bugs.python.org/issue44847](https://bugs.python.org/issue44847), classes that are wrapped and then\nchecked to see if they are subclasses of another via `issubclass` may fail (see also\n[this wrapt issue](https://github.com/GrahamDumpleton/wrapt/issues/130)).\n\n\n### Activities\n\n#### Definition\n\nActivities are decorated with `@activity.defn` like so:\n\n```python\nfrom temporalio import activity\n\n@activity.defn\ndef say_hello_activity(name: str) -> str:\n    return f\"Hello, {name}!\"\n```\n\nSome things to note about activity definitions:\n\n* The `say_hello_activity` is synchronous, which is the recommended activity type (see \"Types of Activities\" below), but\n it can be `async`\n* A custom name for the activity can be set with a decorator argument, e.g. `@activity.defn(name=\"my activity\")`\n* Long running activities should regularly heartbeat and handle cancellation\n* Activities can only have positional arguments. Best practice is to only take a single argument that is an\n object/dataclass of fields that can be added to as needed.\n* Activities can be defined on methods instead of top-level functions. This allows the instance to carry state that an\n activity may need (e.g. a DB connection). The instance method should be what is registered with the worker.\n* Activities can also be defined on callable classes (i.e. classes with `__call__`). An instance of the class should be\n what is registered with the worker.\n* The `@activity.defn` can have `dynamic=True` set which means all otherwise unhandled activities fall through to this.\n If present, cannot have `name` argument, and the activity function must accept a single parameter of\n `Sequence[temporalio.common.RawValue]`. The payload of the raw value can be converted via\n `activity.payload_converter().from_payload`.\n\n#### Types of Activities\n\nThere are three types of activity callables accepted and described below: synchronous multithreaded, synchronous\nmultiprocess/other, and asynchronous. Only positional parameters are allowed in activity callables.\n\n##### Synchronous Activities\n\nSynchronous activities, i.e. functions that do not have `async def`, can be used with workers, but the\n`activity_executor` worker parameter must be set with a `concurrent.futures.Executor` instance to use for executing the\nactivities.\n\nAll long running, non-local activities should heartbeat so they can be cancelled. Cancellation in threaded activities\nthrows an exception, but in multiprocess/other activities it does not. The sections below on each synchronous type\nexplain further. There are also calls on the context that can check for cancellation. For more information, see the\n\"Activity Context\" and \"Heartbeating and Cancellation\" sections later.\n\nNote, all calls from an activity to functions in the `temporalio.activity` package are powered by\n[contextvars](https://docs.python.org/3/library/contextvars.html). 
Therefore, new threads starting _inside_ of\nactivities must `copy_context()` and then `.run()` manually to ensure `temporalio.activity` calls like `heartbeat` still\nfunction in the new threads.\n\nIf any activity ever throws a `concurrent.futures.BrokenExecutor`, the failure is considered unrecoverable and the worker\nwill fail and shut down.\n\n###### Synchronous Multithreaded Activities\n\nIf `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities\nare considered multithreaded activities. If `max_workers` is not set to at least the worker's\n`max_concurrent_activities` setting, a warning will be issued. Besides `activity_executor`, no other worker parameters\nare required for synchronous multithreaded activities.\n\nBy default, cancellation of a synchronous multithreaded activity is done via a `temporalio.exceptions.CancelledError`\nthrown into the activity thread. Activities that do not wish to have cancellation thrown can set\n`no_thread_cancel_exception=True` in the `@activity.defn` decorator.\n\nCode that wishes to be temporarily shielded from the cancellation exception can run inside\n`with activity.shield_thread_cancel_exception():`. But once the last nested form of that block is finished, even if\nthere is a return statement within, it will throw the cancellation if there was one. A `try` +\n`except temporalio.exceptions.CancelledError` would have to surround the `with` to handle the cancellation explicitly.\n\n###### Synchronous Multiprocess/Other Activities\n\nIf `activity_executor` is set to an instance of `concurrent.futures.Executor` that is _not_\n`concurrent.futures.ThreadPoolExecutor`, then the synchronous activities are considered multiprocess/other activities.\nUsers should prefer threaded activities over multiprocess ones since, among other reasons, threaded activities can raise\non cancellation.\n\nThese require special primitives for heartbeating and cancellation. The `shared_state_manager` worker parameter must be\nset to an instance of `temporalio.worker.SharedStateManager`. The most common implementation can be created by passing a\n`multiprocessing.managers.SyncManager` (i.e. result of `multiprocessing.managers.Manager()`) to\n`temporalio.worker.SharedStateManager.create_from_multiprocessing()`.\n\nAlso, all of these activity functions must be\n[\"picklable\"](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled).\n\n##### Asynchronous Activities\n\nAsynchronous activities are functions defined with `async def`. Asynchronous activities are often much more performant\nthan synchronous ones. When using asynchronous activities no special worker parameters are needed.\n\n**\u26a0\ufe0f WARNING: Do not block the thread in `async def` Python functions. This can stop the processing of the rest of the\nTemporal worker.**\n\nCancellation for asynchronous activities is done via\n[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that\n`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). A non-local activity must\nheartbeat to receive cancellation and there are other ways to be notified about cancellation (see \"Activity Context\" and\n\"Heartbeating and Cancellation\" later).\n\n
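A minimal sketch of an `async def` activity that heartbeats and reacts to cancellation (`check_done` is a hypothetical helper standing in for real work):\n\n```python\nimport asyncio\n\nfrom temporalio import activity\n\n@activity.defn\nasync def poll_until_done(resource_id: str) -> str:\n    try:\n        while not await check_done(resource_id):  # hypothetical helper\n            # Heartbeat regularly so a cancellation request can be delivered\n            activity.heartbeat(resource_id)\n            await asyncio.sleep(1)\n        return \"done\"\n    except asyncio.CancelledError:\n        # Perform any cleanup here, then re-raise to let cancellation proceed\n        raise\n```\n\n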
#### Activity Context\n\nDuring activity execution, an implicit activity context is set as a\n[context variable](https://docs.python.org/3/library/contextvars.html). The context variable itself is not visible, but\ncalls in the `temporalio.activity` package make use of it. Specifically:\n\n* `in_activity()` - Whether an activity context is present\n* `info()` - Returns the immutable info of the currently running activity\n* `client()` - Returns the Temporal client used by this worker. Only available in `async def` activities.\n* `heartbeat(*details)` - Record a heartbeat\n* `is_cancelled()` - Whether a cancellation has been requested on this activity\n* `wait_for_cancelled()` - `async` call to wait for cancellation request\n* `wait_for_cancelled_sync(timeout)` - Synchronous blocking call to wait for cancellation request\n* `shield_thread_cancel_exception()` - Context manager for use in `with` clauses by synchronous multithreaded activities\n to prevent cancel exception from being thrown during the block of code\n* `is_worker_shutdown()` - Whether the worker has started graceful shutdown\n* `wait_for_worker_shutdown()` - `async` call to wait for start of graceful worker shutdown\n* `wait_for_worker_shutdown_sync(timeout)` - Synchronous blocking call to wait for start of graceful worker shutdown\n* `raise_complete_async()` - Raise an error that this activity will be completed asynchronously (i.e. after return of\n the activity function in a separate client call)\n\nWith the exception of `in_activity()`, if any of the functions are called outside of an activity context, an error\noccurs. Synchronous activities cannot call any of the `async` functions.\n\n##### Heartbeating and Cancellation\n\nIn order for a non-local activity to be notified of cancellation requests, it must be given a `heartbeat_timeout` at\ninvocation time and invoke `temporalio.activity.heartbeat()` inside the activity. It is strongly recommended that all\nbut the fastest executing activities call this function regularly. \"Types of Activities\" has specifics on cancellation\nfor synchronous and asynchronous activities.\n\nIn addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server\nfor retrieval during activity retry. If an activity calls `temporalio.activity.heartbeat(123, 456)` and then fails and\nis retried, `temporalio.activity.info().heartbeat_details` will return an iterable containing `123` and `456` on the\nnext run.\n\nHeartbeating has no effect on local activities.\n\n##### Worker Shutdown\n\nAn activity can react to a worker shutdown using `is_worker_shutdown` or one of the `wait_for_worker_shutdown`\nfunctions.\n\nWhen the `graceful_shutdown_timeout` worker parameter is given a `datetime.timedelta`, on shutdown the worker will\nnotify activities of the graceful shutdown. Once that timeout has passed (or if it wasn't set), the worker will perform\ncancellation of all outstanding activities.\n\nThe `shutdown()` invocation will wait on all activities to complete, so if a long-running activity does not at least\nrespect cancellation, the shutdown may never complete.\n\n#### Testing\n\nUnit testing an activity or any code that could run in an activity is done via the\n`temporalio.testing.ActivityEnvironment` class. Simply instantiate this and any callable + params passed to `run` will\nbe invoked inside the activity context. 
The following are attributes/methods on the environment that can be used to\naffect calls activity code might make to functions in the `temporalio.activity` package.\n\n* `info` property can be set to customize what is returned from `activity.info()`\n* `on_heartbeat` property can be set to handle `activity.heartbeat()` calls\n* `cancel()` can be invoked to simulate a cancellation of the activity\n* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity\n\n
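For example, a minimal sketch of unit testing the earlier `say_hello_activity` (the heartbeat capture is shown for illustration; this particular activity does not heartbeat):\n\n```python\nfrom temporalio.testing import ActivityEnvironment\n\ndef test_say_hello_activity():\n    env = ActivityEnvironment()\n    # Capture any heartbeats the activity code might emit\n    heartbeats = []\n    env.on_heartbeat = lambda *details: heartbeats.append(details)\n    assert env.run(say_hello_activity, \"Temporal\") == \"Hello, Temporal!\"\n```\n\n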
### Interceptors\n\nThe behavior of the SDK can be customized in many useful ways by modifying inbound and outbound calls using\ninterceptors. This is similar to the use of middleware in other frameworks.\n\nThere are five categories of inbound and outbound calls that you can modify in this way:\n\n1. Outbound client calls, such as `start_workflow()`, `signal_workflow()`, `list_workflows()`, `update_schedule()`, etc.\n\n2. Inbound workflow calls: `execute_workflow()`, `handle_signal()`, `handle_update_handler()`, etc\n\n3. Outbound workflow calls: `start_activity()`, `start_child_workflow()`, `start_nexus_operation()`, etc\n\n4. Inbound call to execute an activity: `execute_activity()`\n\n5. Outbound activity calls: `info()` and `heartbeat()`\n\nTo modify outbound client calls, define a class inheriting from\n[`client.Interceptor`](https://python.temporal.io/temporalio.client.Interceptor.html), and implement the method\n`intercept_client()` to return an instance of\n[`OutboundInterceptor`](https://python.temporal.io/temporalio.client.OutboundInterceptor.html) that implements the\nsubset of outbound client calls that you wish to modify.\n\nThen, pass a list containing an instance of your `client.Interceptor` class as the\n`interceptors` argument of [`Client.connect()`](https://python.temporal.io/temporalio.client.Client.html#connect).\n\nThe purpose of the interceptor framework is that the methods you implement on your interceptor classes can perform\narbitrary side effects and/or arbitrary modifications to the data, before it is received by the SDK's \"real\"\nimplementation. The `interceptors` list can contain multiple interceptors. In this case they form a chain: a method\nimplemented on an interceptor instance in the list can perform side effects, and modify the data, before passing it on\nto the corresponding method on the next interceptor in the list. Your interceptor classes need not implement every\nmethod; the default implementation is always to pass the data on to the next method in the interceptor chain.\n\nThe remaining four categories are worker calls. To modify these, define a class inheriting from\n[`worker.Interceptor`](https://python.temporal.io/temporalio.worker.Interceptor.html) and implement methods on that\nclass to define the\n[`ActivityInboundInterceptor`](https://python.temporal.io/temporalio.worker.ActivityInboundInterceptor.html),\n[`ActivityOutboundInterceptor`](https://python.temporal.io/temporalio.worker.ActivityOutboundInterceptor.html),\n[`WorkflowInboundInterceptor`](https://python.temporal.io/temporalio.worker.WorkflowInboundInterceptor.html), and\n[`WorkflowOutboundInterceptor`](https://python.temporal.io/temporalio.worker.WorkflowOutboundInterceptor.html) classes\nthat you wish to use to effect your modifications. Then, pass a list containing an instance of your `worker.Interceptor`\nclass as the `interceptors` argument of the [`Worker()`](https://python.temporal.io/temporalio.worker.Worker.html)\nconstructor.\n\nIt often happens that your worker and client interceptors will share code because they implement closely related logic.\nFor convenience, you can create an interceptor class that inherits from _both_ `client.Interceptor` and\n`worker.Interceptor` (their method sets do not overlap). You can then pass this in the `interceptors` argument of\n`Client.connect()` when starting your worker _as well as_ in your client/starter code. If you do this, your worker will\nautomatically pick up the interceptors from its underlying client (and you should not pass them directly to the\n`Worker()` constructor).\n\nThis is best explained by example. The [Context Propagation Interceptor\nSample](https://github.com/temporalio/samples-python/tree/main/context_propagation) is a good starting point. In\n[context_propagation/interceptor.py](https://github.com/temporalio/samples-python/blob/main/context_propagation/interceptor.py)\na class is defined that inherits from both `client.Interceptor` and `worker.Interceptor`. It implements the various\nmethods such that the outbound client and workflow calls set a certain key in the outbound `headers` field, and the\ninbound workflow and activity calls retrieve the header value from the inbound workflow/activity input data. An instance\nof this interceptor class is passed to `Client.connect()` when [starting the\nworker](https://github.com/temporalio/samples-python/blob/main/context_propagation/worker.py) and when connecting the\nclient in the [workflow starter\ncode](https://github.com/temporalio/samples-python/blob/main/context_propagation/starter.py).\n\n\n### Nexus\n\n\u26a0\ufe0f **Nexus support is currently at an experimental release stage. Backwards-incompatible changes are anticipated until a stable release is announced.** \u26a0\ufe0f\n\n[Nexus](https://github.com/nexus-rpc/) is a synchronous RPC protocol. Arbitrary-duration operations that can respond\nasynchronously are modeled on top of a set of pre-defined synchronous RPCs.\n\nTemporal supports calling Nexus operations **from a workflow**. See https://docs.temporal.io/nexus. There is currently no\nsupport for calling a Nexus operation from non-workflow code.\n\nTo get started quickly using Nexus with Temporal, see the Python Nexus sample:\nhttps://github.com/temporalio/samples-python/tree/nexus/hello_nexus.\n\n\nTwo types of Nexus operation are supported, each using a decorator:\n\n- `@temporalio.nexus.workflow_run_operation`: a Nexus operation that is backed by a Temporal workflow. The operation\n handler you write will start the handler workflow and then respond with a token indicating that the handler workflow\n is in progress. When the handler workflow completes, Temporal server will automatically deliver the result (success or\n failure) to the caller workflow.\n- `@nexusrpc.handler.sync_operation`: an operation that responds synchronously. It may be `def` or `async def` and it\nmay do network I/O, but it must respond within 10 seconds.\n\nThe following steps are an overview of the [Python Nexus sample](\nhttps://github.com/temporalio/samples-python/tree/nexus/hello_nexus).\n\n1. Create the caller and handler namespaces, and the Nexus endpoint. 
For example,\n   ```\n   temporal operator namespace create --namespace my-handler-namespace\n   temporal operator namespace create --namespace my-caller-namespace\n\n   temporal operator nexus endpoint create \\\n     --name my-nexus-endpoint \\\n     --target-namespace my-handler-namespace \\\n     --target-task-queue my-handler-task-queue\n   ```\n\n2. Define your service contract. This specifies the names and input/output types of your operations. You will use this\n to refer to the operations when calling them from a workflow.\n   ```python\n   @nexusrpc.service\n   class MyNexusService:\n       my_sync_operation: nexusrpc.Operation[MyInput, MyOutput]\n       my_workflow_run_operation: nexusrpc.Operation[MyInput, MyOutput]\n   ```\n\n3. Implement your operation handlers in a service handler:\n   ```python\n   @service_handler(service=MyNexusService)\n   class MyNexusServiceHandler:\n       @sync_operation\n       async def my_sync_operation(\n           self, ctx: StartOperationContext, input: MyInput\n       ) -> MyOutput:\n           return MyOutput(message=f\"Hello {input.name} from sync operation!\")\n\n       @workflow_run_operation\n       async def my_workflow_run_operation(\n           self, ctx: WorkflowRunOperationContext, input: MyInput\n       ) -> nexus.WorkflowHandle[MyOutput]:\n           return await ctx.start_workflow(\n               WorkflowStartedByNexusOperation.run,\n               input,\n               id=str(uuid.uuid4()),\n           )\n   ```\n\n4. Register your service handler with a Temporal worker.\n   ```python\n   client = await Client.connect(\"localhost:7233\", namespace=\"my-handler-namespace\")\n   worker = Worker(\n       client,\n       task_queue=\"my-handler-task-queue\",\n       workflows=[WorkflowStartedByNexusOperation],\n       nexus_service_handlers=[MyNexusServiceHandler()],\n   )\n   await worker.run()\n   ```\n\n5. Call your Nexus operations from your caller workflow.\n   ```python\n   @workflow.defn\n   class CallerWorkflow:\n       def __init__(self):\n           self.nexus_client = workflow.create_nexus_client(\n               service=MyNexusService, endpoint=\"my-nexus-endpoint\"\n           )\n\n       @workflow.run\n       async def run(self, name: str) -> tuple[MyOutput, MyOutput]:\n           # Start the Nexus operation and wait for the result in one go, using execute_operation.\n           wf_result = await self.nexus_client.execute_operation(\n               MyNexusService.my_workflow_run_operation,\n               MyInput(name),\n           )\n           # Or alternatively, obtain the operation handle using start_operation,\n           # and then use it to get the result:\n           sync_operation_handle = await self.nexus_client.start_operation(\n               MyNexusService.my_sync_operation,\n               MyInput(name),\n           )\n           sync_result = await sync_operation_handle\n           return sync_result, wf_result\n   ```\n\n\n### Plugins\n\nPlugins provide a way to extend and customize the behavior of Temporal clients and workers through a\nchain-of-responsibility pattern. They allow you to intercept and modify client creation, service connections, worker\nconfiguration, and worker execution. Common customizations may include but are not limited to:\n\n1. DataConverter\n2. Activities\n3. Workflows\n4. Interceptors\n\nA single plugin class can implement both client and worker plugin interfaces to share common logic between both\ncontexts. When used with a client, it will automatically be propagated to any workers created with that client.\n\n#### Client Plugins\n\nClient plugins can intercept and modify client configuration and service connections. 
They are useful for adding\nauthentication, modifying connection parameters, or adding custom behavior during client creation.\n\nHere's an example of a client plugin that adds custom authentication:\n\n```python\nfrom temporalio.client import Client, ClientConfig, Plugin\nimport temporalio.service\n\nclass AuthenticationPlugin(Plugin):\n    def __init__(self, api_key: str):\n        self.api_key = api_key\n\n    def configure_client(self, config: ClientConfig) -> ClientConfig:\n        # Modify client configuration\n        config[\"namespace\"] = \"my-secure-namespace\"\n        return super().configure_client(config)\n\n    async def connect_service_client(\n        self, config: temporalio.service.ConnectConfig\n    ) -> temporalio.service.ServiceClient:\n        # Add authentication to the connection\n        config.api_key = self.api_key\n        return await super().connect_service_client(config)\n\n# Use the plugin when connecting\nclient = await Client.connect(\n    \"my-server.com:7233\",\n    plugins=[AuthenticationPlugin(\"my-api-key\")]\n)\n```\n\n#### Worker Plugins\n\nWorker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,\ncustom lifecycle management, or modifying worker settings.\n\nHere's an example of a worker plugin that adds custom monitoring:\n\n```python\nfrom temporalio.worker import Plugin, WorkerConfig, Worker\nimport logging\n\nclass MonitoringPlugin(Plugin):\n    def __init__(self):\n        self.logger = logging.getLogger(__name__)\n\n    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:\n        # Modify worker configuration\n        original_task_queue = config[\"task_queue\"]\n        config[\"task_queue\"] = f\"monitored-{original_task_queue}\"\n        self.logger.info(f\"Worker created for task queue: {config['task_queue']}\")\n        return super().configure_worker(config)\n\n    async def run_worker(self, worker: Worker) -> None:\n        self.logger.info(\"Starting worker execution\")\n        try:\n            await super().run_worker(worker)\n        finally:\n            self.logger.info(\"Worker execution completed\")\n\n# Use the plugin when creating a worker\nworker = Worker(\n    client,\n    task_queue=\"my-task-queue\",\n    workflows=[MyWorkflow],\n    activities=[my_activity],\n    plugins=[MonitoringPlugin()]\n)\n```\n\nFor plugins that need to work with both clients and workers, you can implement both interfaces in a single class:\n\n```python\nfrom temporalio.client import Client, ClientConfig, Plugin as ClientPlugin\nfrom temporalio.worker import Plugin as WorkerPlugin, Worker, WorkerConfig\n\n\nclass UnifiedPlugin(ClientPlugin, WorkerPlugin):\n    def configure_client(self, config: ClientConfig) -> ClientConfig:\n        # Client-side customization\n        config[\"namespace\"] = \"unified-namespace\"\n        return super().configure_client(config)\n\n    def configure_worker(self, config: WorkerConfig) -> WorkerConfig:\n        # Worker-side customization\n        config[\"max_cached_workflows\"] = 500\n        return super().configure_worker(config)\n\n    async def run_worker(self, worker: Worker) -> None:\n        print(\"Starting unified worker\")\n        await super().run_worker(worker)\n\n\n# Create client with the unified plugin\nclient = await Client.connect(\n    \"localhost:7233\",\n    plugins=[UnifiedPlugin()]\n)\n\n# Worker will automatically inherit the plugin from the client\nworker = Worker(\n    client,\n    task_queue=\"my-task-queue\",\n    workflows=[MyWorkflow],\n    activities=[my_activity]\n)\n```\n\n**Important Notes:**\n\n- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility\n- Client plugins that also implement worker plugin interfaces are automatically 
propagated to workers\n- Avoid providing the same plugin to both client and worker to prevent double execution\n- Plugin methods should call `super()` to maintain the plugin chain\n- Each plugin's `name()` method returns a unique identifier for debugging purposes\n\n\n### Workflow Replay\n\nGiven a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,\nassuming `history_str` is populated with a JSON string history either exported from the web UI or from `tctl`, the\nfollowing function will replay it:\n\n```python\nfrom temporalio.client import WorkflowHistory\nfrom temporalio.worker import Replayer\n\nasync def run_replayer(history_str: str):\n    replayer = Replayer(workflows=[SayHello])\n    await replayer.replay_workflow(WorkflowHistory.from_json(history_str))\n```\n\nThis will throw an error if any non-determinism is detected.\n\nReplaying from workflow history is a powerful concept that many use to test that workflow alterations won't cause\nnon-determinisms with past-complete workflows. The following code will make sure that all workflow histories for a\ncertain workflow type (i.e. workflow class) are safe with the current code.\n\n```python\nfrom temporalio.client import Client, WorkflowHistory\nfrom temporalio.worker import Replayer\n\nasync def check_past_histories(my_client: Client):\n    replayer = Replayer(workflows=[SayHello])\n    await replayer.replay_workflows(\n        await my_client.list_workflows(\"WorkflowType = 'SayHello'\").map_histories(),\n    )\n```\n\n### Observability\n\nSee https://github.com/temporalio/samples-python/tree/main/open_telemetry for a sample demonstrating collection of\nmetrics and tracing data emitted by the SDK.\n\n#### Metrics\n\nThe SDK emits various metrics by default: see https://docs.temporal.io/references/sdk-metrics. To configure additional\nattributes to be emitted with all metrics, pass\n[global_tags](https://python.temporal.io/temporalio.runtime.TelemetryConfig.html#global_tags) when creating the\n[TelemetryConfig](https://python.temporal.io/temporalio.runtime.TelemetryConfig.html).\n\nFor emitting custom metrics, the SDK makes a metric meter available:\n- In Workflow code, use https://python.temporal.io/temporalio.workflow.html#metric_meter\n- In Activity code, use https://python.temporal.io/temporalio.activity.html#metric_meter\n- In normal application code, use https://python.temporal.io/temporalio.runtime.Runtime.html#metric_meter\n\nThe attributes emitted by these default to `namespace`, `task_queue`, and `workflow_type`/`activity_type`; use\n`with_additional_attributes` to create a meter emitting additional attributes.\n\n#### OpenTelemetry Tracing\n\nTracing support requires the optional `opentelemetry` dependencies which are part of the `opentelemetry` extra. When\nusing `pip`, running\n\n    pip install 'temporalio[opentelemetry]'\n\nwill install needed dependencies. Then the `temporalio.contrib.opentelemetry.TracingInterceptor` can be created and set\nas an interceptor on the `interceptors` argument of `Client.connect`. When set, spans will be created for all client\ncalls and for all activity and workflow invocations on the worker, and they will be properly serialized through the\nserver to give one proper trace for a workflow execution.\n\n### Protobuf 3.x vs 4.x\n\nPython currently has two somewhat-incompatible protobuf library versions - the 3.x series and the 4.x series. Protobuf\n4.x is currently recommended and is the primary supported version. 
Some libraries like\n[Pulumi](https://github.com/pulumi/pulumi) require 4.x. Other libraries such as [ONNX](https://github.com/onnx/onnx) and\n[Streamlit](https://github.com/streamlit/streamlit), for one reason or another, have not or will not leave 3.x.\n\nTo support these, Temporal Python SDK allows any protobuf library >= 3.19. However, the C extension in older Python\nversions can cause issues with the sandbox due to global state sharing. Temporal strongly recommends using the latest\nprotobuf 4.x library unless you absolutely cannot, at which point some proto libraries may have to be marked as\n[Passthrough Modules](#passthrough-modules).\n\n### Known Compatibility Issues\n\nBelow are known compatibility issues with the Python SDK.\n\n#### gevent Patching\n\nWhen using `gevent.monkey.patch_all()`, asyncio event loops can break, especially custom event loops like Temporal's.\nSee [this gevent issue](https://github.com/gevent/gevent/issues/982). This is a known incompatibility and\nusers are encouraged to not use gevent in asyncio applications (including Temporal). But if you must, there is\n[a sample](https://github.com/temporalio/samples-python/tree/main/gevent_async) showing how it is possible.\n\n# Development\n\nThe Python SDK is built to work with Python 3.9 and newer. It is built using\n[SDK Core](https://github.com/temporalio/sdk-core/) which is written in Rust.\n\n### Building\n\n#### Prepare\n\nTo build the SDK from source for use as a dependency, the following prerequisites are required:\n\n* [uv](https://docs.astral.sh/uv/)\n* [Rust](https://www.rust-lang.org/)\n* [Protobuf Compiler](https://protobuf.dev/)\n\nUse `uv` to install `poe`:\n\n```bash\nuv tool install poethepoet\n```\n\nNow clone the SDK repository recursively:\n\n```bash\ngit clone --recursive https://github.com/temporalio/sdk-python.git\ncd sdk-python\n```\n\nInstall the dependencies:\n\n```bash\nuv sync --all-extras\n```\n\n#### Build\n\nNow perform the release build:\n\n> This will take a while because Rust will compile the core project in release mode (see [Local SDK development\nenvironment](#local-sdk-development-environment) for the quicker approach to local development).\n\n```bash\nuv build\n```\n\nThe `.whl` wheel file in `dist/` is now ready to use.\n\n#### Use\n\nThe wheel can now be installed into any virtual environment.\n\nFor example,\n[create a virtual environment](https://packaging.python.org/en/latest/tutorials/installing-packages/#creating-virtual-environments)\nsomewhere and then run the following inside the virtual environment:\n\n```bash\npip install wheel\n```\n\n```bash\npip install /path/to/cloned/sdk-python/dist/*.whl\n```\n\nCreate this Python file at `example.py`:\n\n```python\nimport asyncio\nfrom temporalio import workflow, activity\nfrom temporalio.client import Client\nfrom temporalio.worker import Worker\n\n@workflow.defn\nclass SayHello:\n    @workflow.run\n    async def run(self, name: str) -> str:\n        return f\"Hello, {name}!\"\n\nasync def main():\n    client = await Client.connect(\"localhost:7233\")\n    async with Worker(client, task_queue=\"my-task-queue\", workflows=[SayHello]):\n        result = await client.execute_workflow(SayHello.run, \"Temporal\",\n            id=\"my-workflow-id\", task_queue=\"my-task-queue\")\n        print(f\"Result: {result}\")\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\nAssuming there is a [local Temporal server](https://docs.temporal.io/docs/server/quick-install/) running, execute the\nfile with `python` (or `python3` if 
necessary):\n\n```bash\npython example.py\n```\n\nIt should output:\n\n    Result: Hello, Temporal!\n\n### Local SDK development environment\n\nFor local development, it is quicker to use a debug build.\n\nPerform the same steps as the \"Prepare\" section above by installing the prerequisites, cloning the project, and\ninstalling dependencies:\n\n```bash\ngit clone --recursive https://github.com/temporalio/sdk-python.git\ncd sdk-python\nuv sync --all-extras\n```\n\nNow compile the Rust extension in develop mode, which is quicker than release mode:\n\n```bash\npoe build-develop\n```\n\nThat step can be repeated for any Rust changes made.\n\nThe environment is now ready to develop in.\n\n#### Testing\n\nTo execute tests:\n\n```bash\npoe test\n```\n\nThis runs against [Temporalite](https://github.com/temporalio/temporalite). To run against the time-skipping test\nserver, pass `--workflow-environment time-skipping`. To run against the `default` namespace of an already-running\nserver, pass the `host:port` to `--workflow-environment`. Regular pytest arguments can also be used. For example, here's\nhow to run a single test with debug logs on the console:\n\n```bash\npoe test -s --log-cli-level=DEBUG -k test_sync_activity_thread_cancel_caught\n```\n\n#### Proto Generation and Testing\n\nTo allow for backwards compatibility, protobuf code is generated on the 3.x series of the protobuf library. To generate\nprotobuf code, you must be on Python <= 3.10, and then run `uv add \"protobuf<4\"` + `uv sync --all-extras`. Then the\nprotobuf files can be generated via `poe gen-protos`. Tests can be run for protobuf version 3 by setting the\n`TEMPORAL_TEST_PROTO3` env var to `1` prior to running tests.\n\nDo not commit `uv.lock` or `pyproject.toml` changes. To go back from this downgrade, restore both of those files and run\n`uv sync --all-extras`. Make sure you `poe format` the results.\n\nFor a less system-intrusive approach, you can:\n```shell\ndocker build -f scripts/_proto/Dockerfile .\ndocker run --rm -v \"${PWD}/temporalio/api:/api_new\" -v \"${PWD}/temporalio/bridge/proto:/bridge_new\" <just built image sha>\npoe format\n```\n\n### Style\n\n* Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions:\n * We use [ruff](https://docs.astral.sh/ruff/) for formatting, so that takes precedence\n * In tests and example code, can import individual classes/functions to make it more readable. Can also do this, rarely,\n in library code for some common Python items (e.g. `dataclass` or `partial`), but not allowed to do this for\n any `temporalio` packages (except `temporalio.types`) or any classes/functions that aren't clear when unqualified.\n * We allow relative imports for private packages\n * We allow `@staticmethod`\n\n",
"bugtrack_url": null,
"license": null,
"summary": "Temporal.io Python SDK",
"version": "1.15.0",
"project_urls": {
"Bug Tracker": "https://github.com/temporalio/sdk-python/issues",
"Documentation": "https://docs.temporal.io/docs/python",
"Homepage": "https://github.com/temporalio/sdk-python",
"Repository": "https://github.com/temporalio/sdk-python"
},
"split_keywords": [
"temporal",
" workflow"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "0e2d0153f2bc459e0cb59d41d4dd71da46bf9a98ca98bc37237576c258d6696b",
"md5": "b74ef3027ecb01c9649dc945860914b3",
"sha256": "74bc5cc0e6bdc161a43015538b0821b8713f5faa716c4209971c274b528e0d47"
},
"downloads": -1,
"filename": "temporalio-1.15.0-cp39-abi3-macosx_10_12_x86_64.whl",
"has_sig": false,
"md5_digest": "b74ef3027ecb01c9649dc945860914b3",
"packagetype": "bdist_wheel",
"python_version": "cp39",
"requires_python": ">=3.9",
"size": 12703607,
"upload_time": "2025-07-29T03:43:30",
"upload_time_iso_8601": "2025-07-29T03:43:30.083054Z",
"url": "https://files.pythonhosted.org/packages/0e/2d/0153f2bc459e0cb59d41d4dd71da46bf9a98ca98bc37237576c258d6696b/temporalio-1.15.0-cp39-abi3-macosx_10_12_x86_64.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "e4391b867ec698c8987aef3b7a7024b5c0c732841112fa88d021303d0fc69bea",
"md5": "d47cbc81503aebfd5ad116aceef68e8b",
"sha256": "ee8001304dae5723d79797516cfeebe04b966fdbdf348e658fce3b43afdda3cd"
},
"downloads": -1,
"filename": "temporalio-1.15.0-cp39-abi3-macosx_11_0_arm64.whl",
"has_sig": false,
"md5_digest": "d47cbc81503aebfd5ad116aceef68e8b",
"packagetype": "bdist_wheel",
"python_version": "cp39",
"requires_python": ">=3.9",
"size": 12232853,
"upload_time": "2025-07-29T03:43:38",
"upload_time_iso_8601": "2025-07-29T03:43:38.909071Z",
"url": "https://files.pythonhosted.org/packages/e4/39/1b867ec698c8987aef3b7a7024b5c0c732841112fa88d021303d0fc69bea/temporalio-1.15.0-cp39-abi3-macosx_11_0_arm64.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "5e3e647d9a7c8b2f638f639717404c0bcbdd7d54fddd7844fdb802e3f40dc55f",
"md5": "6780e15713e59fd19b45da5fd28d6ffe",
"sha256": "8febd1ac36720817e69c2176aa4aca14a97fe0b83f0d2449c0c730b8f0174d02"
},
"downloads": -1,
"filename": "temporalio-1.15.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
"has_sig": false,
"md5_digest": "6780e15713e59fd19b45da5fd28d6ffe",
"packagetype": "bdist_wheel",
"python_version": "cp39",
"requires_python": ">=3.9",
"size": 12636700,
"upload_time": "2025-07-29T03:43:49",
"upload_time_iso_8601": "2025-07-29T03:43:49.066261Z",
"url": "https://files.pythonhosted.org/packages/5e/3e/647d9a7c8b2f638f639717404c0bcbdd7d54fddd7844fdb802e3f40dc55f/temporalio-1.15.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "9a137aa9ec694fec9fba39efdbf61d892bccf7d2b1aa3d9bd359544534c1d309",
"md5": "e576821987192fac9e1f7af85878a16d",
"sha256": "202d81a42cafaed9ccc7ccbea0898838e3b8bf92fee65394f8790f37eafbaa63"
},
"downloads": -1,
"filename": "temporalio-1.15.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
"has_sig": false,
"md5_digest": "e576821987192fac9e1f7af85878a16d",
"packagetype": "bdist_wheel",
"python_version": "cp39",
"requires_python": ">=3.9",
"size": 12860186,
"upload_time": "2025-07-29T03:43:57",
"upload_time_iso_8601": "2025-07-29T03:43:57.644644Z",
"url": "https://files.pythonhosted.org/packages/9a/13/7aa9ec694fec9fba39efdbf61d892bccf7d2b1aa3d9bd359544534c1d309/temporalio-1.15.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "9f2bba962401324892236148046dbffd805d4443d6df7a7dc33cc7964b566bf9",
"md5": "a16daaea4af4241da7601795bba35046",
"sha256": "aae5b18d7c9960238af0f3ebf6b7e5959e05f452106fc0d21a8278d78724f780"
},
"downloads": -1,
"filename": "temporalio-1.15.0-cp39-abi3-win_amd64.whl",
"has_sig": false,
"md5_digest": "a16daaea4af4241da7601795bba35046",
"packagetype": "bdist_wheel",
"python_version": "cp39",
"requires_python": ">=3.9",
"size": 12932800,
"upload_time": "2025-07-29T03:44:06",
"upload_time_iso_8601": "2025-07-29T03:44:06.271783Z",
"url": "https://files.pythonhosted.org/packages/9f/2b/ba962401324892236148046dbffd805d4443d6df7a7dc33cc7964b566bf9/temporalio-1.15.0-cp39-abi3-win_amd64.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "0baf1a3619fc62333d0acbdf90cfc5ada97e68e8c0f79610363b2dbb30871d83",
"md5": "b4bf8128418bdc09afff5e07ff70168c",
"sha256": "a4bc6ca01717880112caab75d041713aacc8263dc66e41f5019caef68b344fa0"
},
"downloads": -1,
"filename": "temporalio-1.15.0.tar.gz",
"has_sig": false,
"md5_digest": "b4bf8128418bdc09afff5e07ff70168c",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 1684485,
"upload_time": "2025-07-29T03:44:09",
"upload_time_iso_8601": "2025-07-29T03:44:09.071870Z",
"url": "https://files.pythonhosted.org/packages/0b/af/1a3619fc62333d0acbdf90cfc5ada97e68e8c0f79610363b2dbb30871d83/temporalio-1.15.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-29 03:44:09",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "temporalio",
"github_project": "sdk-python",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "temporalio"
}