[![Open in Dev Containers](https://img.shields.io/static/v1?label=Dev%20Containers&message=Open&color=blue&logo=visualstudiocode)](https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/spumer/kombu-pyamqp-threadsafe)
# kombu-pyamqp-threadsafe
A thread-safe implementation of the pyamqp transport for kombu.
## TL;DR
kombu (pyamqp) is designed around "1 thread = 1 connection": a connection cannot be shared between threads.

This package makes a "1 thread = 1 channel" design possible and **allows** a connection to be shared between threads.
```python
import kombu
import kombu_pyamqp_threadsafe

# Use the drop-in, thread-safe replacement for kombu.Connection:
connection = kombu_pyamqp_threadsafe.KombuConnection(...)

# or construct one from an existing kombu.Connection:
kombu_connection = kombu.Connection(...)
connection = kombu_pyamqp_threadsafe.KombuConnection.from_kombu_connection(kombu_connection)
```
## Motivation
The recommended practice with RabbitMQ is to use two connections: one for consuming and one for publishing.

https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html#connections-and-channels

This is not possible with kombu (pyamqp); see https://github.com/celery/py-amqp/issues/420.

Without connection sharing, we cannot efficiently consume from many queues at the same time.

And when publishing messages from a multithreaded application, we need to create a connection for each publisher (producer) thread.
## Other solutions
Connection pool.
- This is the approach used by celery. You no longer create many connections when publishing messages, but you still need the same number of connections to consume.

Use the same channel to consume from all queues.
- This is bad practice, because any exception in the channel will close it and break all consumers.
## Usage
**Q:** OK, I installed it. Now what?

**A:** You can use `ConnectionHolder` from this snippet: https://github.com/celery/py-amqp/issues/420#issuecomment-1858429922

**Q:** Is it production ready? How did you test it?

**A:** Yes, it's production ready. We also ran a stress test with 900 threads: 900 dramatiq actors, each consuming a message and sending a new one to the queue. Only 2 connections were used.

**Q:** Dramatiq?

**A:** Yes, just use [dramatiq-kombu-broker](https://github.com/spumer/dramatiq-kombu-broker/)
### Rules
**Rule 1:** Do not share a channel between threads.
- Do not use `default_channel` from different threads, because **responses are channel-bound**: a thread can receive an error caused by another thread's request, or cause an error that surfaces in a different thread.
- Use `default_channel_pool` and acquire a channel for each thread.
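
The pattern behind Rule 1 can be sketched without a broker. The toy model below is an illustration only, not this package's implementation: `ToyChannel`, `ToyChannelPool`, and `run_workers` are hypothetical names, and the real `default_channel_pool` may differ in its API. Each worker thread acquires its own channel from a shared pool, so a reply is only ever read by the thread that made the request.

```python
import queue
import threading
import time
from contextlib import contextmanager


class ToyChannel:
    """Hypothetical stand-in for an AMQP channel (not the real class)."""

    def __init__(self, channel_id):
        self.channel_id = channel_id
        self._lock = threading.Lock()
        self._active = 0
        self.max_active = 0  # highest number of threads inside call() at once

    def call(self, method):
        with self._lock:
            self._active += 1
            self.max_active = max(self.max_active, self._active)
        try:
            time.sleep(0.001)  # pretend to wait for the broker's reply
            return f"{method}-ok"
        finally:
            with self._lock:
                self._active -= 1


class ToyChannelPool:
    """Hypothetical pool: each channel is held by one thread at a time."""

    def __init__(self, limit):
        self.channels = [ToyChannel(i) for i in range(1, limit + 1)]
        self._free = queue.Queue()
        for channel in self.channels:
            self._free.put(channel)

    @contextmanager
    def acquire(self, timeout=None):
        channel = self._free.get(timeout=timeout)  # blocks until one is free
        try:
            yield channel
        finally:
            self._free.put(channel)  # hand the channel back to the pool


def run_workers(pool, n_threads):
    """Start n_threads workers that each acquire a channel and make one call."""
    replies = []
    replies_lock = threading.Lock()

    def worker():
        with pool.acquire() as channel:
            reply = channel.call("queue.declare")
        with replies_lock:
            replies.append(reply)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return replies
```

With `pool = ToyChannelPool(limit=3)`, calling `run_workers(pool, n_threads=20)` returns 20 replies, and no channel's `max_active` ever exceeds 1, because the pool never hands the same channel to two threads at once.
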
**Rule 2:** A channel is bound to the thread where it was created.
- This is required because dispatching a frame can raise an error that is expected in the caller thread (see **Rule 1**), e.g. in the `queue_declare` method.
- If you declare a queue in passive mode and the queue does not exist, RabbitMQ will close the channel, and the exception MAY be raised in a different thread when that thread drains events.
- To ensure that all exceptions are raised in the expected context, we bind channels to threads and dispatch received frames only in their own threads.
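
The idea behind Rule 2 can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the package's actual code: `ThreadBoundChannel`, `ChannelClosed`, and `demo` are hypothetical names, and frames are plain strings. A frame read by a foreign thread is queued instead of handled, so the resulting exception is raised only when the owner thread drains its channel.

```python
import collections
import threading


class ChannelClosed(Exception):
    """Hypothetical error standing in for a broker-initiated channel close."""


class ThreadBoundChannel:
    """Toy channel that handles frames only in the thread that created it."""

    def __init__(self):
        self.owner = threading.get_ident()
        # Frames read by other threads wait here; deque append/popleft are thread-safe.
        self._pending = collections.deque()

    def dispatch(self, frame):
        """Called by whichever thread happened to read the frame."""
        if threading.get_ident() != self.owner:
            self._pending.append(frame)  # defer to the owner thread
            return
        self._handle(frame)

    def drain(self):
        """Called by the owner thread to process deferred frames."""
        if threading.get_ident() != self.owner:
            raise RuntimeError("drain() must run in the owner thread")
        while self._pending:
            self._handle(self._pending.popleft())

    def _handle(self, frame):
        if frame == "channel.close":
            raise ChannelClosed("channel closed by broker")


def demo():
    channel = ThreadBoundChannel()  # owned by the calling thread
    errors_in_reader = []

    def reader():
        # A different thread reads the close frame for our channel.
        try:
            channel.dispatch("channel.close")
        except ChannelClosed as exc:
            errors_in_reader.append(exc)

    thread = threading.Thread(target=reader)
    thread.start()
    thread.join()

    try:
        channel.drain()
    except ChannelClosed:
        raised_in_owner = True
    else:
        raised_in_owner = False
    return errors_in_reader, raised_in_owner
```

Here `demo()` returns `([], True)`: the reader thread sees no exception, and the owner thread receives `ChannelClosed` when it drains.
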
## Install
To add and install this package as a dependency of your project, run `poetry add kombu-pyamqp-threadsafe`.
<details>
<summary>Developing</summary>
- This project follows the [Conventional Commits](https://www.conventionalcommits.org/) standard to automate [Semantic Versioning](https://semver.org/) and [Keep A Changelog](https://keepachangelog.com/) with [Commitizen](https://github.com/commitizen-tools/commitizen).
- Run `poe` from within the development environment to print a list of [Poe the Poet](https://github.com/nat-n/poethepoet) tasks available to run on this project.
- Run `poetry add {package}` from within the development environment to install a runtime dependency and add it to `pyproject.toml` and `poetry.lock`. Add `--group test` or `--group dev` to install a CI or development dependency, respectively.
- Run `poetry update` from within the development environment to upgrade all dependencies to the latest versions allowed by `pyproject.toml`.
- Run `cz bump` to bump the package's version, update the `CHANGELOG.md`, and create a git tag.
</details>