# temporal-activity-cache
Prefect-style activity caching for Temporal workflows using Redis.
> **⚠️ Early Release Notice**
> This is an early-stage release. While functional, the API may change and there may be bugs. Use with caution in production environments. This software is provided "as is" without warranty of any kind. See the LICENSE file for details.
## Overview
`temporal-activity-cache` brings Prefect-style caching to Temporal activities. It enables distributed caching across workers by storing activity results in Redis, allowing results to be reused across different workflow executions and worker instances.
### Key Features
- 🚀 **Cross-workflow caching** - Reuse activity results across different workflow executions
- 🔄 **Distributed workers** - Cache shared via Redis across multiple worker instances
- ⚡ **Multiple cache policies** - Cache by inputs, source code, or disable caching
- ⏱️ **Configurable TTL** - Set expiration times for cached results
- 🛡️ **Graceful degradation** - Activities still work if Redis is unavailable
- 🎯 **Type-safe** - Full type hints and mypy support
## Installation
```bash
pip install temporal-activity-cache
```
Or with uv:
```bash
uv add temporal-activity-cache
```
## Quick Start
### 1. Set up cache backend (once at startup)
```python
from temporal_activity_cache import set_cache_backend, RedisCacheBackend
# Configure Redis backend
backend = RedisCacheBackend(host="localhost", port=6379)
set_cache_backend(backend)
```
### 2. Add caching to activities
```python
from datetime import timedelta
from temporalio import activity
from temporal_activity_cache import cached_activity, CachePolicy
@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(hours=1))
@activity.defn(name="fetch_user")
async def fetch_user(user_id: int) -> dict:
"""This activity will be cached for 1 hour based on user_id."""
return await expensive_database_call(user_id)
```
### 3. Use in workflows (no changes needed!)
```python
from temporalio import workflow
from datetime import timedelta
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self, user_id: int) -> dict:
# Activity results are automatically cached
user = await workflow.execute_activity(
fetch_user,
user_id,
start_to_close_timeout=timedelta(seconds=30)
)
return user
```
## How It Works
### Traditional Temporal Event History
Temporal's Event History provides replay capability **within a single workflow execution**, but doesn't cache across workflows:
```python
# Workflow execution 1
result1 = await client.execute_workflow(
MyWorkflow.run,
user_id=123,
id="workflow-1",
task_queue="my-queue"
)
# Activity executes → Result stored in workflow-1's Event History
# Workflow execution 2 (different workflow!)
result2 = await client.execute_workflow(
MyWorkflow.run,
user_id=123, # ← Same input!
id="workflow-2",
task_queue="my-queue"
)
# ❌ Activity executes AGAIN (separate Event History)
```
### With temporal-activity-cache
```python
# Workflow execution 1
result1 = await client.execute_workflow(
MyWorkflow.run,
user_id=123,
id="workflow-1",
task_queue="my-queue"
)
# Activity executes → Result cached in Redis
# Workflow execution 2
result2 = await client.execute_workflow(
MyWorkflow.run,
user_id=123, # ← Same input!
id="workflow-2",
task_queue="my-queue"
)
# ✅ Cache HIT! Activity skipped, result from Redis
```
## Cache Policies
### `CachePolicy.INPUTS` (Default)
Cache based on function inputs only:
```python
@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(hours=1))
@activity.defn
async def fetch_data(user_id: int) -> dict:
return await db.query(user_id)
# Same user_id = cache hit
await fetch_data(123) # Cache MISS - executes
await fetch_data(123) # Cache HIT - returns cached result
await fetch_data(456) # Cache MISS - different input
```
### `CachePolicy.TASK_SOURCE`
Cache based on function source code AND inputs:
```python
@cached_activity(policy=CachePolicy.TASK_SOURCE, ttl=timedelta(hours=1))
@activity.defn
async def calculate(x: int) -> int:
return x * 2
# If you change the function code, cache is invalidated
```
### `CachePolicy.NO_CACHE`
Disable caching entirely:
```python
@cached_activity(policy=CachePolicy.NO_CACHE)
@activity.defn
async def send_email(to: str) -> None:
# Always executes, never cached
await email_service.send(to)
```
## Advanced Usage
### Custom Cache Backend
```python
from datetime import timedelta
from typing import Any
from temporal_activity_cache import CacheBackend
class MyCustomBackend(CacheBackend):
async def get(self, key: str):
# Your implementation
pass
async def set(self, key: str, value: Any, ttl: timedelta | None = None):
# Your implementation
pass
# ... implement other methods
# Use custom backend
set_cache_backend(MyCustomBackend())
```
### Manual Cache Invalidation
```python
from temporal_activity_cache import invalidate_cache, CachePolicy
# Invalidate specific cached result
await invalidate_cache(
fetch_user,
CachePolicy.INPUTS,
user_id=123 # Same args used when caching
)
```
### Per-Activity Backend
```python
# Use different cache backend for specific activity
redis_backend = RedisCacheBackend(host="localhost", port=6379)
@cached_activity(
policy=CachePolicy.INPUTS,
ttl=timedelta(hours=1),
cache_backend=redis_backend # Override global backend
)
@activity.defn
async def special_activity(data: str) -> str:
return process(data)
```
## Complete Example
```python
import asyncio
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporal_activity_cache import (
cached_activity,
CachePolicy,
set_cache_backend,
RedisCacheBackend,
)
# 1. Configure cache backend
def setup_cache():
backend = RedisCacheBackend(host="localhost", port=6379)
set_cache_backend(backend)
# 2. Define cached activities
@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(hours=1))
@activity.defn(name="fetch_user")
async def fetch_user(user_id: int) -> dict:
"""Expensive database call - cached for 1 hour."""
await asyncio.sleep(2) # Simulate slow query
return {"user_id": user_id, "name": f"User {user_id}"}
@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(minutes=30))
@activity.defn(name="process_data")
async def process_data(data: dict) -> dict:
"""Data processing - cached for 30 minutes."""
await asyncio.sleep(1)
return {"processed": True, "user": data["name"]}
# 3. Define workflow
@workflow.defn
class UserWorkflow:
@workflow.run
async def run(self, user_id: int) -> dict:
# Both activities use caching automatically
user = await workflow.execute_activity(
fetch_user,
user_id,
start_to_close_timeout=timedelta(seconds=10)
)
result = await workflow.execute_activity(
process_data,
user,
start_to_close_timeout=timedelta(seconds=10)
)
return result
# 4. Run worker
async def run_worker():
setup_cache()
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="my-queue",
workflows=[UserWorkflow],
activities=[fetch_user, process_data]
)
await worker.run()
# 5. Execute workflow
async def execute_workflow():
client = await Client.connect("localhost:7233")
# First execution - cache miss (slow)
result1 = await client.execute_workflow(
UserWorkflow.run,
123,
id="workflow-1",
task_queue="my-queue"
)
# Second execution - cache hit (fast!)
result2 = await client.execute_workflow(
UserWorkflow.run,
123,
id="workflow-2",
task_queue="my-queue"
)
if __name__ == "__main__":
# Starts the worker only; run execute_workflow() from a separate process/script
asyncio.run(run_worker())
```
## Configuration
### Redis Connection
```python
from temporal_activity_cache import RedisCacheBackend
# Basic connection
backend = RedisCacheBackend(
host="localhost",
port=6379,
db=0
)
# With authentication
backend = RedisCacheBackend(
host="redis.example.com",
port=6379,
password="secret",
db=0
)
# With custom connection pool
from redis.asyncio.connection import ConnectionPool
pool = ConnectionPool(
host="localhost",
port=6379,
max_connections=50,
decode_responses=False
)
backend = RedisCacheBackend(pool=pool)
```
## Requirements
- Python >= 3.10
- Temporal Python SDK >= 1.8.0
- Redis server
- redis[hiredis] >= 5.0.0
## Comparison: Event History vs Caching
| Feature | Event History | temporal-activity-cache |
|---------|--------------|------------------------|
| **Scope** | Per workflow execution | Cross-workflow, cross-worker |
| **Purpose** | Reliability & replay | Performance optimization |
| **Reuse** | Only within same workflow | Across different workflows |
| **Storage** | Temporal server | Redis (external) |
| **Automatic** | Yes (always on) | Opt-in per activity |
| **Expiration** | Workflow retention | Configurable TTL |
## Best Practices
### 1. Cache Read-Heavy Operations
✅ **Good candidates for caching:**
- Database queries
- External API calls
- File I/O operations
- Expensive computations
❌ **Don't cache:**
- Operations with side effects (emails, payments, etc.)
- Non-deterministic operations
- Operations that must always run
### 2. Set Appropriate TTLs
```python
# Short TTL for frequently changing data
@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(minutes=5))
async def get_stock_price(symbol: str) -> float:
pass
# Long TTL for stable data
@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(days=1))
async def get_user_profile(user_id: int) -> dict:
pass
# No expiration for immutable data
@cached_activity(policy=CachePolicy.TASK_SOURCE, ttl=None)
async def calculate_hash(data: str) -> str:
pass
```
### 3. Handle Cache Failures Gracefully
The library automatically falls back to executing activities if Redis is unavailable. Your workflows will continue to work without caching.
### 4. Monitor Cache Effectiveness
```python
import logging
# Enable logging to see cache hit/miss messages
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("temporal_activity_cache")
```
### 5. Use Appropriate Cache Policies
- **INPUTS**: For pure functions where output depends only on inputs
- **TASK_SOURCE**: When you want cache invalidation on code changes
- **NO_CACHE**: For operations that shouldn't be cached
## Limitations
- Only async activities are currently supported
- Activity results must be JSON serializable
- Cache invalidation is manual (no automatic invalidation on data changes)
## Testing
The library includes comprehensive tests using pytest, fakeredis, and Temporal's WorkflowEnvironment:
```bash
# Install dev dependencies
uv sync --extra dev
# Run all tests
pytest
# Run only unit tests (fast)
pytest -m unit
# Run with coverage
pytest --cov=src/temporal_activity_cache --cov-report=html
# Run integration tests
pytest -m integration
```
## Contributing
Contributions welcome! Please feel free to submit a Pull Request.
## License
MIT License - see LICENSE file for details
## Related Resources
- [Temporal Documentation](https://docs.temporal.io)
- [Temporal Python SDK](https://github.com/temporalio/sdk-python)
- [Prefect Caching Documentation](https://docs.prefect.io/concepts/tasks/#caching)
## Changelog
### 0.1.0 (2025-01-04)
- Initial release
- Redis cache backend
- Support for INPUTS and TASK_SOURCE cache policies
- Configurable TTL
- Async activity support
- Comprehensive test suite with pytest, fakeredis, and Temporal testing
- Complete example and documentation
Raw data
{
"_id": null,
"home_page": null,
"name": "temporal-activity-cache",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": null,
"keywords": "temporal, cache, redis, workflow, activity",
"author": null,
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/89/ab/3518abc9e3f67efbfebcee922ccf52a10684a6e1b6d8067a9bd7a7735078/temporal_activity_cache-0.1.0.tar.gz",
"platform": null,
"description": "# temporal-activity-cache\n\nPrefect-style activity caching for Temporal workflows using Redis.\n\n> **\u26a0\ufe0f Early Release Notice**\n> This is an early-stage release. While functional, the API may change and there may be bugs. Use with caution in production environments. This software is provided \"as is\" without warranty of any kind. See the LICENSE file for details.\n\n## Overview\n\n`temporal-activity-cache` brings Prefect-style caching to Temporal activities. It enables distributed caching across workers by storing activity results in Redis, allowing results to be reused across different workflow executions and worker instances.\n\n### Key Features\n\n- \ud83d\ude80 **Cross-workflow caching** - Reuse activity results across different workflow executions\n- \ud83d\udd04 **Distributed workers** - Cache shared via Redis across multiple worker instances\n- \u26a1 **Multiple cache policies** - Cache by inputs, source code, or disable caching\n- \u23f1\ufe0f **Configurable TTL** - Set expiration times for cached results\n- \ud83d\udee1\ufe0f **Graceful degradation** - Activities still work if Redis is unavailable\n- \ud83c\udfaf **Type-safe** - Full type hints and mypy support\n\n## Installation\n\n```bash\npip install temporal-activity-cache\n```\n\nOr with uv:\n\n```bash\nuv add temporal-activity-cache\n```\n\n## Quick Start\n\n### 1. Set up cache backend (once at startup)\n\n```python\nfrom temporal_activity_cache import set_cache_backend, RedisCacheBackend\n\n# Configure Redis backend\nbackend = RedisCacheBackend(host=\"localhost\", port=6379)\nset_cache_backend(backend)\n```\n\n### 2. 
Add caching to activities\n\n```python\nfrom datetime import timedelta\nfrom temporalio import activity\nfrom temporal_activity_cache import cached_activity, CachePolicy\n\n@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(hours=1))\n@activity.defn(name=\"fetch_user\")\nasync def fetch_user(user_id: int) -> dict:\n \"\"\"This activity will be cached for 1 hour based on user_id.\"\"\"\n return await expensive_database_call(user_id)\n```\n\n### 3. Use in workflows (no changes needed!)\n\n```python\nfrom temporalio import workflow\nfrom datetime import timedelta\n\n@workflow.defn\nclass MyWorkflow:\n @workflow.run\n async def run(self, user_id: int) -> dict:\n # Activity results are automatically cached\n user = await workflow.execute_activity(\n fetch_user,\n user_id,\n start_to_close_timeout=timedelta(seconds=30)\n )\n return user\n```\n\n## How It Works\n\n### Traditional Temporal Event History\n\nTemporal's Event History provides replay capability **within a single workflow execution**, but doesn't cache across workflows:\n\n```python\n# Workflow execution 1\nresult1 = await client.execute_workflow(\n MyWorkflow.run,\n user_id=123,\n id=\"workflow-1\",\n task_queue=\"my-queue\"\n)\n# Activity executes \u2192 Result stored in workflow-1's Event History\n\n# Workflow execution 2 (different workflow!)\nresult2 = await client.execute_workflow(\n MyWorkflow.run,\n user_id=123, # \u2190 Same input!\n id=\"workflow-2\",\n task_queue=\"my-queue\"\n)\n# \u274c Activity executes AGAIN (separate Event History)\n```\n\n### With temporal-activity-cache\n\n```python\n# Workflow execution 1\nresult1 = await client.execute_workflow(\n MyWorkflow.run,\n user_id=123,\n id=\"workflow-1\",\n task_queue=\"my-queue\"\n)\n# Activity executes \u2192 Result cached in Redis\n\n# Workflow execution 2\nresult2 = await client.execute_workflow(\n MyWorkflow.run,\n user_id=123, # \u2190 Same input!\n id=\"workflow-2\",\n task_queue=\"my-queue\"\n)\n# \u2705 Cache HIT! 
Activity skipped, result from Redis\n```\n\n## Cache Policies\n\n### `CachePolicy.INPUTS` (Default)\n\nCache based on function inputs only:\n\n```python\n@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(hours=1))\n@activity.defn\nasync def fetch_data(user_id: int) -> dict:\n return await db.query(user_id)\n\n# Same user_id = cache hit\nawait fetch_data(123) # Cache MISS - executes\nawait fetch_data(123) # Cache HIT - returns cached result\nawait fetch_data(456) # Cache MISS - different input\n```\n\n### `CachePolicy.TASK_SOURCE`\n\nCache based on function source code AND inputs:\n\n```python\n@cached_activity(policy=CachePolicy.TASK_SOURCE, ttl=timedelta(hours=1))\n@activity.defn\nasync def calculate(x: int) -> int:\n return x * 2\n\n# If you change the function code, cache is invalidated\n```\n\n### `CachePolicy.NO_CACHE`\n\nDisable caching entirely:\n\n```python\n@cached_activity(policy=CachePolicy.NO_CACHE)\n@activity.defn\nasync def send_email(to: str) -> None:\n # Always executes, never cached\n await email_service.send(to)\n```\n\n## Advanced Usage\n\n### Custom Cache Backend\n\n```python\nfrom temporal_activity_cache import CacheBackend\n\nclass MyCustomBackend(CacheBackend):\n async def get(self, key: str):\n # Your implementation\n pass\n\n async def set(self, key: str, value: Any, ttl: timedelta = None):\n # Your implementation\n pass\n\n # ... 
implement other methods\n\n# Use custom backend\nset_cache_backend(MyCustomBackend())\n```\n\n### Manual Cache Invalidation\n\n```python\nfrom temporal_activity_cache import invalidate_cache, CachePolicy\n\n# Invalidate specific cached result\nawait invalidate_cache(\n fetch_user,\n CachePolicy.INPUTS,\n user_id=123 # Same args used when caching\n)\n```\n\n### Per-Activity Backend\n\n```python\n# Use different cache backend for specific activity\nredis_backend = RedisCacheBackend(host=\"localhost\", port=6379)\n\n@cached_activity(\n policy=CachePolicy.INPUTS,\n ttl=timedelta(hours=1),\n cache_backend=redis_backend # Override global backend\n)\n@activity.defn\nasync def special_activity(data: str) -> str:\n return process(data)\n```\n\n## Complete Example\n\n```python\nimport asyncio\nfrom datetime import timedelta\nfrom temporalio import activity, workflow\nfrom temporalio.client import Client\nfrom temporalio.worker import Worker\n\nfrom temporal_activity_cache import (\n cached_activity,\n CachePolicy,\n set_cache_backend,\n RedisCacheBackend,\n)\n\n# 1. Configure cache backend\ndef setup_cache():\n backend = RedisCacheBackend(host=\"localhost\", port=6379)\n set_cache_backend(backend)\n\n# 2. Define cached activities\n@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(hours=1))\n@activity.defn(name=\"fetch_user\")\nasync def fetch_user(user_id: int) -> dict:\n \"\"\"Expensive database call - cached for 1 hour.\"\"\"\n await asyncio.sleep(2) # Simulate slow query\n return {\"user_id\": user_id, \"name\": f\"User {user_id}\"}\n\n@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(minutes=30))\n@activity.defn(name=\"process_data\")\nasync def process_data(data: dict) -> dict:\n \"\"\"Data processing - cached for 30 minutes.\"\"\"\n await asyncio.sleep(1)\n return {\"processed\": True, \"user\": data[\"name\"]}\n\n# 3. 
Define workflow\n@workflow.defn\nclass UserWorkflow:\n @workflow.run\n async def run(self, user_id: int) -> dict:\n # Both activities use caching automatically\n user = await workflow.execute_activity(\n fetch_user,\n user_id,\n start_to_close_timeout=timedelta(seconds=10)\n )\n\n result = await workflow.execute_activity(\n process_data,\n user,\n start_to_close_timeout=timedelta(seconds=10)\n )\n\n return result\n\n# 4. Run worker\nasync def run_worker():\n setup_cache()\n\n client = await Client.connect(\"localhost:7233\")\n worker = Worker(\n client,\n task_queue=\"my-queue\",\n workflows=[UserWorkflow],\n activities=[fetch_user, process_data]\n )\n await worker.run()\n\n# 5. Execute workflow\nasync def execute_workflow():\n client = await Client.connect(\"localhost:7233\")\n\n # First execution - cache miss (slow)\n result1 = await client.execute_workflow(\n UserWorkflow.run,\n 123,\n id=\"workflow-1\",\n task_queue=\"my-queue\"\n )\n\n # Second execution - cache hit (fast!)\n result2 = await client.execute_workflow(\n UserWorkflow.run,\n 123,\n id=\"workflow-2\",\n task_queue=\"my-queue\"\n )\n\nif __name__ == \"__main__\":\n asyncio.run(run_worker())\n```\n\n## Configuration\n\n### Redis Connection\n\n```python\nfrom temporal_activity_cache import RedisCacheBackend\n\n# Basic connection\nbackend = RedisCacheBackend(\n host=\"localhost\",\n port=6379,\n db=0\n)\n\n# With authentication\nbackend = RedisCacheBackend(\n host=\"redis.example.com\",\n port=6379,\n password=\"secret\",\n db=0\n)\n\n# With custom connection pool\nfrom redis.asyncio.connection import ConnectionPool\n\npool = ConnectionPool(\n host=\"localhost\",\n port=6379,\n max_connections=50,\n decode_responses=False\n)\n\nbackend = RedisCacheBackend(pool=pool)\n```\n\n## Requirements\n\n- Python >= 3.10\n- Temporal Python SDK >= 1.8.0\n- Redis server\n- redis[hiredis] >= 5.0.0\n\n## Comparison: Event History vs Caching\n\n| Feature | Event History | temporal-activity-cache 
|\n|---------|--------------|------------------------|\n| **Scope** | Per workflow execution | Cross-workflow, cross-worker |\n| **Purpose** | Reliability & replay | Performance optimization |\n| **Reuse** | Only within same workflow | Across different workflows |\n| **Storage** | Temporal server | Redis (external) |\n| **Automatic** | Yes (always on) | Opt-in per activity |\n| **Expiration** | Workflow retention | Configurable TTL |\n\n## Best Practices\n\n### 1. Cache Read-Heavy Operations\n\n\u2705 **Good candidates for caching:**\n- Database queries\n- External API calls\n- File I/O operations\n- Expensive computations\n\n\u274c **Don't cache:**\n- Operations with side effects (emails, payments, etc.)\n- Non-deterministic operations\n- Operations that must always run\n\n### 2. Set Appropriate TTLs\n\n```python\n# Short TTL for frequently changing data\n@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(minutes=5))\nasync def get_stock_price(symbol: str) -> float:\n pass\n\n# Long TTL for stable data\n@cached_activity(policy=CachePolicy.INPUTS, ttl=timedelta(days=1))\nasync def get_user_profile(user_id: int) -> dict:\n pass\n\n# No expiration for immutable data\n@cached_activity(policy=CachePolicy.TASK_SOURCE, ttl=None)\nasync def calculate_hash(data: str) -> str:\n pass\n```\n\n### 3. Handle Cache Failures Gracefully\n\nThe library automatically falls back to executing activities if Redis is unavailable. Your workflows will continue to work without caching.\n\n### 4. Monitor Cache Effectiveness\n\n```python\nimport logging\n\n# Enable debug logging to see cache hits/misses\nlogging.basicConfig(level=logging.INFO)\nlogger = logging.getLogger(\"temporal_activity_cache\")\n```\n\n### 5. 
Use Appropriate Cache Policies\n\n- **INPUTS**: For pure functions where output depends only on inputs\n- **TASK_SOURCE**: When you want cache invalidation on code changes\n- **NO_CACHE**: For operations that shouldn't be cached\n\n## Limitations\n\n- Only async activities are currently supported\n- Activity results must be JSON serializable\n- Cache invalidation is manual (no automatic invalidation on data changes)\n\n## Testing\n\nThe library includes comprehensive tests using pytest, fakeredis, and Temporal's WorkflowEnvironment:\n\n```bash\n# Install dev dependencies\nuv sync --extra dev\n\n# Run all tests\npytest\n\n# Run only unit tests (fast)\npytest -m unit\n\n# Run with coverage\npytest --cov=src/temporal_activity_cache --cov-report=html\n\n# Run integration tests\npytest -m integration\n```\n\n## Contributing\n\nContributions welcome! Please feel free to submit a Pull Request.\n\n## License\n\nMIT License - see LICENSE file for details\n\n## Related Resources\n\n- [Temporal Documentation](https://docs.temporal.io)\n- [Temporal Python SDK](https://github.com/temporalio/sdk-python)\n- [Prefect Caching Documentation](https://docs.prefect.io/concepts/tasks/#caching)\n\n## Changelog\n\n### 0.1.0 (2025-01-04)\n\n- Initial release\n- Redis cache backend\n- Support for INPUTS and TASK_SOURCE cache policies\n- Configurable TTL\n- Async activity support\n- Comprehensive test suite with pytest, fakeredis, and Temporal testing\n- Complete example and documentation\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Prefect-style activity caching for Temporal workflows using Redis",
"version": "0.1.0",
"project_urls": {
"Documentation": "https://github.com/huscarldev/temporal-activity-cache#readme",
"Homepage": "https://github.com/huscarldev/temporal-activity-cache",
"Issues": "https://github.com/huscarldev/temporal-activity-cache/issues",
"Repository": "https://github.com/huscarldev/temporal-activity-cache"
},
"split_keywords": [
"temporal",
" cache",
" redis",
" workflow",
" activity"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "b48b4ca5cf982af5effdbd0baa73469d0da5350b62ebd8a3ba2f1d4abdc13f7c",
"md5": "4d9a377e11601f460ffb62c0b0b736f2",
"sha256": "2414ecd6fd9bba0f1c965c35872f86b4601136f64e357c33ba80cb2f6f042fd9"
},
"downloads": -1,
"filename": "temporal_activity_cache-0.1.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "4d9a377e11601f460ffb62c0b0b736f2",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10",
"size": 12710,
"upload_time": "2025-10-06T07:37:14",
"upload_time_iso_8601": "2025-10-06T07:37:14.418106Z",
"url": "https://files.pythonhosted.org/packages/b4/8b/4ca5cf982af5effdbd0baa73469d0da5350b62ebd8a3ba2f1d4abdc13f7c/temporal_activity_cache-0.1.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "89ab3518abc9e3f67efbfebcee922ccf52a10684a6e1b6d8067a9bd7a7735078",
"md5": "16f4a5088a277ffe5327e3ac74b44a47",
"sha256": "99266be5feff64ada85d194f0c4192a71c8670d1a2ff4be418a0080d6a623e01"
},
"downloads": -1,
"filename": "temporal_activity_cache-0.1.0.tar.gz",
"has_sig": false,
"md5_digest": "16f4a5088a277ffe5327e3ac74b44a47",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 9246,
"upload_time": "2025-10-06T07:37:15",
"upload_time_iso_8601": "2025-10-06T07:37:15.519899Z",
"url": "https://files.pythonhosted.org/packages/89/ab/3518abc9e3f67efbfebcee922ccf52a10684a6e1b6d8067a9bd7a7735078/temporal_activity_cache-0.1.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-06 07:37:15",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "huscarldev",
"github_project": "temporal-activity-cache#readme",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "temporal-activity-cache"
}