concurry 0.8.3

- Summary: A unified, delightful Python concurrency library
- Requires Python: >=3.10
- Keywords: async, concurrency, multiprocessing, parallel, ray, threading
- Uploaded: 2025-10-25 14:07:39

---

# Concurry

<p align="center">
  <img src="docs/concurry-landscape.png" alt="Concurry" width="800">
</p>

<p align="center">
  <a href="https://amazon-science.github.io/concurry/"><img src="https://img.shields.io/badge/docs-latest-blue.svg" alt="Documentation"></a>
  <a href="https://pypi.org/project/concurry/"><img src="https://img.shields.io/pypi/v/concurry.svg" alt="PyPI Version"></a>
  <a href="https://pypi.org/project/concurry/"><img src="https://img.shields.io/pypi/pyversions/concurry.svg" alt="Python Versions"></a>
  <a href="LICENSE"><img src="https://img.shields.io/badge/license-Apache%202.0-blue.svg" alt="License"></a>
  <a href="https://github.com/amazon-science/concurry/actions"><img src="https://img.shields.io/github/actions/workflow/status/amazon-science/concurry/tests.yml?branch=main" alt="Build Status"></a>
</p>

**A unified, delightful Python concurrency library** that makes parallel and distributed computing feel like writing sequential code. Built on the actor model, concurry provides workers, pools, rate limiting, retries, and seamless integration with Ray for distributed execution.

---

## πŸš€ Quick Example: 50x Speedup for Batch LLM Calls

Calling LLMs in a loop is painfully slow. With concurry, you get 50x faster batch processing by changing just three lines of code:

```python
from pydantic import BaseModel
from concurry import Worker
from tqdm import tqdm
import litellm

# Define your LLM worker
class LLM(Worker, BaseModel):
    temperature: float
    top_p: float
    model: str

    def call_llm(self, prompt: str) -> str:
        response = litellm.completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature,
            top_p=self.top_p,
        )
        return response.choices[0].message.content

# Load prompts (e.g., evaluating AI-generated responses for harmfulness)
prompts = [...] # 1000 prompts

# ❌ Sequential: ~775 seconds
llm = LLM(temperature=0.1, top_p=0.9, model="meta-llama/llama-3.1-8b-instruct")
responses = [llm.call_llm(prompt) for prompt in tqdm(prompts)]

# βœ… Concurrent with concurry: ~16 seconds (50x faster!)
llm = LLM.options(
    mode='thread',
    max_workers=100
).init(temperature=0.1, top_p=0.9, model="meta-llama/llama-3.1-8b-instruct")

futures = [llm.call_llm(prompt) for prompt in tqdm(prompts, desc="Submitting")]
responses = [f.result() for f in tqdm(futures, desc="Collecting results")]
```

**What changed?** Just added `.options(mode='thread', max_workers=100).init(...)` and called `.result()` on futures. That's it.

---

## Why Concurry?

Python's concurrency landscape is fragmented. Threading, multiprocessing, asyncio, and Ray all have different APIs, behaviors, and gotchas. **Concurry unifies them** with a consistent, elegant interface that works the same way everywhere.

### The Problem

```python
# Different APIs for different backends
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
import ray

# Thread pool - one API
with ThreadPoolExecutor() as executor:
    future = executor.submit(task, arg)
    result = future.result()

# Process pool - same API, different behavior
with ProcessPoolExecutor() as executor:
    future = executor.submit(task, arg)
    result = future.result()

# Asyncio - completely different API
async def main():
    result = await asyncio.create_task(async_task(arg))
asyncio.run(main())

# Ray - yet another API
@ray.remote
def ray_task(arg):
    return task(arg)
future = ray_task.remote(arg)
result = ray.get(future)
```

### The Solution

```python
from concurry import Worker

class DataProcessor(Worker):
    def __init__(self, multiplier: int):
        self.multiplier = multiplier
    
    def process(self, value: int) -> int:
        return value * self.multiplier

# Same code, different backends - just change one parameter!
worker = DataProcessor.options(mode="thread").init(10)      # Thread
# worker = DataProcessor.options(mode="process").init(10)   # Process
# worker = DataProcessor.options(mode="asyncio").init(10)   # Asyncio
# worker = DataProcessor.options(mode="ray").init(10)       # Ray (distributed!)

result = worker.process(42).result()  # 420
worker.stop()
```

**One interface. Five execution modes. Zero headaches.**

---

## ✨ Key Features

### 🎭 Actor-Based Workers
Stateful workers that run across sync, thread, process, asyncio, and Ray backends with a unified API.

```python
class Counter(Worker):
    def __init__(self):
        self.count = 0
    
    def increment(self) -> int:
        self.count += 1
        return self.count

# State is isolated per worker
counter = Counter.options(mode="thread").init()
print(counter.increment().result())  # 1
print(counter.increment().result())  # 2
```

### πŸ”„ Worker Pools with Load Balancing
Distribute work across multiple workers with pluggable strategies (round-robin, least-active, random).

```python
# Pool of 10 workers with round-robin load balancing
pool = DataProcessor.options(
    mode="thread",
    max_workers=10,
    load_balancing="round_robin"
).init()

# Work automatically distributed across all workers
futures = [pool.process(i) for i in range(1000)]
results = [f.result() for f in futures]
```

### 🚦 Resource Limits & Rate Limiting
Token bucket, leaky bucket, and sliding window algorithms. Enforce rate limits across workers with atomic multi-resource acquisition.

```python
from concurry import RateLimit, CallLimit

# Limit to 100 API calls and 10k tokens per minute
pool = APIWorker.options(
    mode="thread",
    max_workers=20,
    limits=[
        CallLimit(window_seconds=60, capacity=100),
        RateLimit(key="tokens", window_seconds=60, capacity=10_000)
    ]
).init()

# Limits automatically enforced across all 20 workers
```
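
For intuition, here is a minimal, standalone token-bucket sketch. It is illustrative only (not concurry's internal implementation): tokens refill at a fixed rate up to a capacity, and a request is admitted only if enough tokens are currently available.

```python
import time

class TokenBucket:
    """Illustrative token bucket: `capacity` tokens, refilled at `refill_rate` per second."""

    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def try_acquire(self, requested: float = 1.0) -> bool:
        # Refill proportionally to elapsed time, capped at capacity
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now
        # Admit the request only if enough tokens remain
        if self.tokens >= requested:
            self.tokens -= requested
            return True
        return False

# Roughly "10,000 tokens per 60-second window"
bucket = TokenBucket(capacity=10_000, refill_rate=10_000 / 60)
print(bucket.try_acquire(requested=200))  # True while the bucket has budget
```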

### πŸ” Intelligent Retry Mechanisms
Exponential backoff, exception filtering, output validation, and automatic resource release between retries.

```python
# Retry on transient errors with exponential backoff
worker = APIWorker.options(
    mode="thread",
    num_retries=5,
    retry_algorithm="exponential",
    retry_on=[ConnectionError, TimeoutError],
    retry_until=lambda result: result.get("status") == "ok"
).init()

# Automatically retries up to 5 times on failure
```
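
As a quick illustration of what "exponential" retry backoff means, here is a standalone sketch of how the delay between attempts typically grows (base delay doubling each attempt, capped, with jitter). The base, cap, and jitter strategy below are assumptions for illustration, not concurry's documented defaults.

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Illustrative exponential backoff with full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# Upper bounds for 5 retries: 0.5s, 1s, 2s, 4s, 8s (before the cap kicks in)
print([round(backoff_delay(a), 2) for a in range(5)])
```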

### 🎯 Automatic Future Unwrapping
Pass futures between workers seamlessly. Concurry automatically unwraps them - even with zero-copy optimization for Ray.

```python
# Producer creates futures
producer = DataSource.options(mode="thread").init()
data_future = producer.get_data()

# Consumer automatically unwraps the future
consumer = DataProcessor.options(mode="process").init()
result = consumer.process(data_future).result()  # Auto-unwrapped!
```
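
For comparison, the manual pattern this replaces blocks on the producer's future before handing the plain value to the consumer (a sketch using the same `.result()` API shown above):

```python
# Manual equivalent (unnecessary with concurry's auto-unwrapping)
data = data_future.result()                # block until the producer finishes
result = consumer.process(data).result()   # then submit the plain value
```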

### πŸ“Š Progress Tracking
Beautiful progress bars with state indicators, automatic style detection, and rich customization.

```python
from concurry.utils.progress import ProgressBar

for item in ProgressBar(items, desc="Processing"):
    process(item)
# Shows: Processing: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1000/1000 [00:05<00:00] βœ“ Complete
```
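
Since `ProgressBar` wraps any iterable, a natural use (sketched here under that assumption, reusing the `pool` from the worker-pool example above) is tracking result collection from a pool of workers:

```python
futures = [pool.process(i) for i in range(1000)]
results = [f.result() for f in ProgressBar(futures, desc="Collecting")]
```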

### βœ… Pydantic Integration
Full validation support with both model inheritance and decorators (Ray-compatible `@validate` decorator included).

```python
from morphic import validate

class ValidatedWorker(Worker):
    @validate
    def __init__(self, multiplier: int):
        self.multiplier = multiplier
    
    @validate
    def process(self, x: int) -> int:
        return x * self.multiplier

# Automatic type coercion and validation
worker = ValidatedWorker.options(mode="ray").init(multiplier="5")  # str→int
```

### ⚑ Async First-Class Support
AsyncIO workers route async methods to an event loop and sync methods to a dedicated thread for optimal performance (10-50x speedup for I/O).

```python
class AsyncAPIWorker(Worker):
    async def fetch(self, url: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.json()

    async def fetch_many(self, urls: list) -> list:
        # Fetch all URLs concurrently on the worker's event loop
        return await asyncio.gather(*[self.fetch(url) for url in urls])

worker = AsyncAPIWorker.options(mode="asyncio").init()
# 100 concurrent requests instead of sequential!
result = worker.fetch_many(urls).result()
```

---

## πŸš€ Installation

```bash
# Basic installation
pip install concurry

# With Ray support for distributed computing
pip install concurry[ray]

# Development installation with all extras
pip install concurry[all]
```

**Requirements:** Python 3.10+

---

## πŸ’‘ More Examples

### Worker Pool with Context Manager

```python
from concurry import Worker

class DataProcessor(Worker):
    def process(self, x: int) -> int:
        return x ** 2

# Context manager automatically cleans up all workers
with DataProcessor.options(mode="thread", max_workers=5).init() as pool:
    futures = [pool.process(i) for i in range(100)]
    results = [f.result() for f in futures]
# All workers automatically stopped here
```
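
The context manager simply guarantees that the workers are stopped; a rough manual equivalent, using the same `.init()`/`.stop()` calls shown elsewhere in this README, would be:

```python
pool = DataProcessor.options(mode="thread", max_workers=5).init()
try:
    futures = [pool.process(i) for i in range(100)]
    results = [f.result() for f in futures]
finally:
    pool.stop()  # always stop the workers, even if an exception was raised
```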

### TaskWorker for Arbitrary Functions

```python
from concurry import TaskWorker

worker = TaskWorker.options(mode="process").init()

# Submit any function
future = worker.submit(lambda x: x ** 2, 42)
print(future.result())  # 1764

# Use map() for batch processing
results = list(worker.map(lambda x: x * 2, range(10)))
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

worker.stop()
```

### Distributed Computing with Ray

```python
import ray
from concurry import Worker

ray.init()

class DistributedProcessor(Worker):
    def __init__(self, model_name: str):
        self.model = load_large_model(model_name)
    
    def predict(self, data: list) -> list:
        return self.model.predict(data)

# 50 Ray actors across your cluster
pool = DistributedProcessor.options(
    mode="ray",
    max_workers=50,
    num_cpus=2,
    num_gpus=0.5
).init(model_name="bert-large")

# Distribute work across entire cluster
batches = [data[i:i+32] for i in range(0, len(data), 32)]
futures = [pool.predict(batch) for batch in batches]
results = [f.result() for f in futures]

pool.stop()
ray.shutdown()
```

### Production-Ready LLM with Rate Limits and Retries

```python
from concurry import Worker, RateLimit, CallLimit
from morphic import validate
import openai

class LLMWorker(Worker):
    @validate
    def __init__(self, model: str = "gpt-4", temperature: float = 0.7):
        self.model = model
        self.temperature = temperature
        self.client = openai.OpenAI()
    
    @validate
    def generate(self, prompt: str, max_tokens: int = 500) -> dict:
        # Rate limits automatically enforced
        with self.limits.acquire(requested={"tokens": max_tokens}) as acq:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=self.temperature
            )
            
            result = {
                "text": response.choices[0].message.content,
                "tokens": response.usage.total_tokens,
            }
            
            # Report actual usage for accurate rate limiting
            acq.update(usage={"tokens": result["tokens"]})
            return result

# Pool of 10 workers with shared rate limits and automatic retries
pool = LLMWorker.options(
    mode="thread",
    max_workers=10,
    
    # Shared rate limits across all workers
    limits=[
        RateLimit(key="tokens", window_seconds=60, capacity=10_000),
        CallLimit(window_seconds=60, capacity=100)
    ],
    
    # Automatic retry with exponential backoff
    num_retries=3,
    retry_algorithm="exponential",
    retry_on=[openai.RateLimitError, openai.APIConnectionError],
    retry_until=lambda r: len(r.get("text", "")) > 50
).init(model="gpt-4")

# Process 100 prompts with automatic rate limiting and retries
prompts = [f"Summarize topic {i}" for i in range(100)]
futures = [pool.generate(prompt, max_tokens=200) for prompt in prompts]
results = [f.result() for f in futures]

print(f"Processed {len(results)} prompts")
print(f"Total tokens: {sum(r['tokens'] for r in results)}")

pool.stop()
```

### Async I/O with 10-50x Speedup

```python
from concurry import Worker
import aiohttp
import asyncio

class AsyncAPIWorker(Worker):
    def __init__(self, base_url: str):
        self.base_url = base_url
    
    async def fetch(self, endpoint: str) -> dict:
        """Async method - runs in event loop."""
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/{endpoint}") as resp:
                return await resp.json()
    
    async def fetch_many(self, endpoints: list) -> list:
        """Fetch multiple URLs concurrently."""
        tasks = [self.fetch(ep) for ep in endpoints]
        return await asyncio.gather(*tasks)

worker = AsyncAPIWorker.options(mode="asyncio").init("https://api.example.com")

# All 100 requests execute concurrently (10-50x faster)!
result = worker.fetch_many([f"data/{i}" for i in range(100)]).result()

worker.stop()
```

---

## πŸ“š Documentation

- **[User Guide](https://amazon-science.github.io/concurry/user-guide/getting-started/)** - Comprehensive tutorials and examples
  - [Workers](https://amazon-science.github.io/concurry/user-guide/workers/) - Actor-based workers
  - [Worker Pools](https://amazon-science.github.io/concurry/user-guide/pools/) - Load balancing and pooling
  - [Limits](https://amazon-science.github.io/concurry/user-guide/limits/) - Rate limiting and resource management
  - [Retries](https://amazon-science.github.io/concurry/user-guide/retries/) - Retry mechanisms
  - [Futures](https://amazon-science.github.io/concurry/user-guide/futures/) - Unified future interface
  - [Progress](https://amazon-science.github.io/concurry/user-guide/progress/) - Progress tracking
- **[API Reference](https://amazon-science.github.io/concurry/api/)** - Detailed API documentation
- **[Examples](https://amazon-science.github.io/concurry/examples/)** - Real-world usage patterns
- **[Contributing](CONTRIBUTING.md)** - How to contribute

---

## πŸ—οΈ Design Principles

1. **Unified API**: One interface for all concurrency paradigms
2. **Actor Model**: Stateful workers with isolated state
3. **Production-Ready**: Rate limiting, retries, validation, monitoring
4. **Performance**: Zero-copy optimizations where possible
5. **Developer Experience**: Intuitive API, rich documentation, great error messages

---

## 🀝 Contributing

Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines.

---

## πŸ“„ License

This project is licensed under the Apache 2.0 License - see the [LICENSE](LICENSE) file for details.

---

## πŸ™ Acknowledgments

- Built on top of [morphic](https://github.com/adivekar/morphic) for validation
- Inspired by [Ray](https://ray.io/), [Pydantic](https://pydantic.dev/), and the actor model
- Progress bars powered by [tqdm](https://github.com/tqdm/tqdm)

---

<p align="center">
  <strong>Made with ❀️ by the <a href="https://github.com/amazon-science">Amazon Science</a> team</strong>
</p>

            
