kombu-pyamqp-threadsafe


- Name: kombu-pyamqp-threadsafe
- Version: 0.5.1
- Home page: https://github.com/spumer/kombu-pyamqp-threadsafe
- Summary: Threadsafe implementation of pyamqp transport for kombu
- Upload time: 2024-11-01 11:50:56
- Author: spumer
- Requires Python: <4.0,>=3.8

[![Open in Dev Containers](https://img.shields.io/static/v1?label=Dev%20Containers&message=Open&color=blue&logo=visualstudiocode)](https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/spumer/kombu-pyamqp-threadsafe)

# kombu-pyamqp-threadsafe

Threadsafe implementation of pyamqp transport for kombu

## TL;DR

kombu (pyamqp) is designed as "1 thread = 1 connection": connections cannot be shared between threads.

This package makes a "1 thread = 1 channel" design possible and **allows** connection sharing between threads.

```python
import kombu
import kombu_pyamqp_threadsafe

# Use the thread-safe drop-in replacement for kombu.Connection:
connection = kombu_pyamqp_threadsafe.KombuConnection(...)

# or construct one from an existing kombu.Connection
kombu_connection = kombu.Connection(...)
connection = kombu_pyamqp_threadsafe.KombuConnection.from_kombu_connection(kombu_connection)
```

## Motivation

The best practice for working with RabbitMQ is to use 2 connections: one for consuming and one for publishing.

https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html#connections-and-channels

But this is not possible with kombu (pyamqp): see https://github.com/celery/py-amqp/issues/420

Without it, we cannot efficiently consume from many queues at the same time.

And when we publish messages in a multithreaded application, we need to create a connection for each publisher (producer) thread.

## Other solutions

Connection pool.
- This is the approach used by celery. You no longer create a lot of connections when publishing messages, but you still need the same number of connections to consume.

Use the same channel to consume from all queues.
- This is bad practice, because any exception in the channel will close it and break all consumers.

## Usage

**Q:** OK, I installed it. Now what?

**A:** You can use ConnectionHolder from this snippet: https://github.com/celery/py-amqp/issues/420#issuecomment-1858429922

**Q:** Is it production ready? How did you test it?

**A:** Yes, it's production ready. We also ran a stress test with 900 threads: 900 dramatiq actors, each consuming a message and sending a new one to the queue. Only 2 connections were used.

**Q:** Dramatiq?

**A:** Yes, just use [dramatiq-kombu-broker](https://github.com/spumer/dramatiq-kombu-broker/)

### Rules

**Rule 1:** Do not share a channel between threads.
- Do not use `default_channel` from different threads, because **responses are channel-bound**: you can receive an error caused by another thread, or cause an error in a different thread.
- Use `default_channel_pool` and acquire a channel for each thread.

**Rule 2:** A channel is bound to the thread where it was created.
- This is required because dispatching a frame can raise an error that is expected in the caller thread (see **Rule 1**), e.g. from the `queue_declare` method.
- If you declare a queue in passive mode and the queue does not exist, RabbitMQ will close the channel, and the exception MAY be raised in a different thread while that thread drains events.
- To ensure all exceptions are raised in the expected context, we bind channels to threads and dispatch received frames only in their own threads.
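
The two rules can be pictured with a small toy model. This is only a sketch using the standard library: `ToyConnection`, `ToyChannel`, `channel_for_current_thread` and `run_workers` are hypothetical names invented for illustration and are not part of this package or of kombu. The ownership check that raises `RuntimeError` is also an assumption of the sketch; the package itself is described above as dispatching frames only in the owning thread.

```python
import threading
from typing import List


class ToyChannel:
    """Hypothetical stand-in for an AMQP channel (not the package's class)."""

    def __init__(self, channel_id: int) -> None:
        self.channel_id = channel_id
        # Rule 2: remember the thread that created the channel.
        self.owner = threading.get_ident()

    def queue_declare(self, name: str) -> str:
        # Rule 1: in this toy model, calls from any other thread are refused.
        if threading.get_ident() != self.owner:
            raise RuntimeError("channel used outside the thread that created it")
        return f"channel {self.channel_id} declared {name}"


class ToyConnection:
    """Hypothetical stand-in for one connection shared by all threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._next_id = 0
        self._local = threading.local()

    def channel_for_current_thread(self) -> ToyChannel:
        # Lazily create one channel per thread and reuse it afterwards.
        channel = getattr(self._local, "channel", None)
        if channel is None:
            with self._lock:
                self._next_id += 1
                channel = ToyChannel(self._next_id)
            self._local.channel = channel
        return channel


def run_workers(connection: ToyConnection, count: int) -> List[int]:
    """Start `count` threads and return the sorted channel ids they used."""
    ids: List[int] = []
    ids_lock = threading.Lock()

    def worker() -> None:
        channel = connection.channel_for_current_thread()
        channel.queue_declare("demo")
        with ids_lock:
            ids.append(channel.channel_id)

    threads = [threading.Thread(target=worker) for _ in range(count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(ids)
```

In this model, every worker thread shares the single `ToyConnection` but gets its own `ToyChannel`, so an error raised on one channel stays in the thread that owns it.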


## Install

To add and install this package as a dependency of your project, run `poetry add kombu-pyamqp-threadsafe`.


<details>
<summary>Developing</summary>

- This project follows the [Conventional Commits](https://www.conventionalcommits.org/) standard to automate [Semantic Versioning](https://semver.org/) and [Keep A Changelog](https://keepachangelog.com/) with [Commitizen](https://github.com/commitizen-tools/commitizen).
- Run `poe` from within the development environment to print a list of [Poe the Poet](https://github.com/nat-n/poethepoet) tasks available to run on this project.
- Run `poetry add {package}` from within the development environment to install a run time dependency and add it to `pyproject.toml` and `poetry.lock`. Add `--group test` or `--group dev` to install a CI or development dependency, respectively.
- Run `poetry update` from within the development environment to upgrade all dependencies to the latest versions allowed by `pyproject.toml`.
- Run `cz bump` to bump the package's version, update the `CHANGELOG.md`, and create a git tag.

</details>

            
