| Field | Value |
| --- | --- |
| Name | redis-message-queue |
| Version | 0.8.0 |
| Summary | Python message queuing with Redis and message deduplication |
| Author | Elijas |
| requires_python | >=3.8,<4.0 |
| upload_time | 2024-01-19 19:46:50 |
| docs_url | None |
| requirements | No requirements were recorded. |

# redis-message-queue
Robust Python queuing with message deduplication.
# Features
* **Exactly-once delivery and publish guarantees:** Our system ensures that messages are both delivered and published no more than once. This is achieved through Redis' atomic transactions, message deduplication, and idempotent processing, which together prevent race conditions, duplicate processing, and multiple publications.
* **Message deduplication and idempotent processing:** By default, messages are deduplicated to prevent multiple sends of the same message. This ensures that each message is processed only once, maintaining idempotency even with producer retries.
* **Automatic message acknowledgement and resilient processing:** Messages are automatically acknowledged post-processing, with a robust mechanism in place to handle consumer crashes. Failed messages are moved to a dedicated log within Redis, preventing loss and allowing for recovery and reprocessing.
* **Efficient and visible message handling:** Success and failure logs provide insight into message processing outcomes. Additionally, Redis' blocking queue commands optimize resource usage by eliminating the need for constant polling, thus conserving CPU resources.
* **Graceful shutdown for idle consumers:** The system includes a mechanism to handle graceful shutdowns, allowing consumers to complete processing of the current message before shutting down. This is particularly useful for handling interrupt signals (e.g., Ctrl+C) without disrupting ongoing tasks.
* **Threadless heartbeats for idle consumers:** The system employs a heartbeat mechanism for consumers awaiting messages, which operates without additional threads or processes, ensuring minimal resource consumption and a simplified consumer architecture.
Please note that these features are optional and can be disabled as needed.
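To make the deduplication idea concrete, here is a minimal sketch of how a publisher could skip messages it has already sent. It is not the library's implementation: `publish_once`, the key layout, and the TTL are assumptions made for illustration. It relies only on the redis-py calls `set(..., nx=True, ex=...)` and `lpush(...)`.

```python
import hashlib


def publish_once(client, queue_name, message, ttl_seconds=3600):
    """Push `message` onto the list `queue_name` unless it was seen recently.

    Hypothetical helper, not part of redis-message-queue.
    Returns True if the message was queued, False if it was a duplicate.
    """
    digest = hashlib.sha256(message.encode("utf-8")).hexdigest()
    dedup_key = f"{queue_name}:dedup:{digest}"  # key layout is an assumption

    # SET ... NX EX succeeds only for the first writer of this key,
    # so concurrent producers cannot both pass this check.
    is_first = client.set(dedup_key, "1", nx=True, ex=ttl_seconds)
    if not is_first:
        return False  # duplicate within the TTL window: skip the push

    client.lpush(queue_name, message)
    return True
```

Called with a `redis.Redis` client, the second `publish_once(client, "my_message_queue", "Hello")` within the TTL window returns `False`. The sketch issues two separate commands, so a crash between `SET` and `LPUSH` would drop the message; a production version would wrap both in a Lua script or a `MULTI`/`EXEC` transaction.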
# Preparation
```bash
pip install redis-message-queue
```
You will also need a running Redis server. You can run one locally with Docker:
```bash
docker run -it --rm -p 6379:6379 redis
```
# Usage
Send messages to a queue:
```python
import time
from random import randint as random_number
from redis import Redis
from redis_message_queue import RedisMessageQueue
client = Redis.from_url("redis://localhost:6379/0")
queue = RedisMessageQueue(
    name="my_message_queue",
    client=client,
    deduplication=True,
)

while True:
    # Sending unique messages
    queue.publish(f"Hello (id={random_number(0, 1_000_000)})")
    time.sleep(1)
```
Receive messages from a queue:
```python
from redis import Redis
from redis_message_queue import RedisMessageQueue
client = Redis.from_url(
    "redis://localhost:6379/0",
    decode_responses=True,
)
queue = RedisMessageQueue("my_message_queue", client=client)

while True:
    with queue.process_message() as message:
        if message:
            print(f"Received Message: {message}")
```
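For readers curious what a context manager like `process_message()` might do behind the scenes, the following is a rough sketch of the common "reliable queue" pattern in Redis. It is an illustration under assumptions, not the library's actual code: `process_one` and the `:processing` and `:failed` list names are hypothetical. It uses the redis-py calls `blmove`, `lpush`, and `lrem`; `BLMOVE` requires Redis 6.2 or newer.

```python
from contextlib import contextmanager


@contextmanager
def process_one(client, queue_name, timeout=5):
    """Yield one message (or None on timeout) and acknowledge it afterwards.

    Hypothetical helper; the list names below are assumptions.
    """
    processing = f"{queue_name}:processing"
    failed = f"{queue_name}:failed"

    # BLMOVE blocks without polling until a message arrives, then moves it
    # atomically into the processing list so a consumer crash cannot lose it.
    message = client.blmove(
        queue_name, processing, timeout, src="RIGHT", dest="LEFT"
    )
    if message is None:
        yield None  # nothing arrived before the timeout
        return

    try:
        yield message
    except Exception:
        # Keep the failed message for later inspection, then re-raise.
        client.lpush(failed, message)
        raise
    finally:
        # Acknowledge: remove the message from the processing list.
        client.lrem(processing, 1, message)
```

With this shape, a message that raises inside the `with` body ends up in the `:failed` list instead of disappearing, and a message left in `:processing` after a crash can be found and requeued by a separate recovery step.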
To see how the message queue operates, you can look at the examples in the [examples](https://github.com/Elijas/redis-message-queue/tree/main/examples) folder.
Run two publishers and three workers by using the commands below. Each command should be run in its own terminal window:
```bash
python -m examples.send_messages
python -m examples.send_messages
python -m examples.receive_messages
python -m examples.receive_messages
python -m examples.receive_messages
```
# Asyncio
To use asyncio, just replace
```from redis_message_queue import RedisMessageQueue```
with
```from redis_message_queue.asyncio import RedisMessageQueue```
All examples are the same for both versions, except that you'll need to manually close the connection as described in the [documentation](https://redis-py.readthedocs.io/en/stable/examples/asyncio_examples.html):
```python
import asyncio
import redis.asyncio as redis

async def main():
    client = redis.Redis()
    # ...all of your other code
    await client.aclose()

asyncio.run(main())
```
Raw data
{
"_id": null,
"home_page": "",
"name": "redis-message-queue",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.8,<4.0",
"maintainer_email": "",
"keywords": "",
"author": "Elijas",
"author_email": "4084885+Elijas@users.noreply.github.com",
"download_url": "https://files.pythonhosted.org/packages/64/b4/bd0a87a83b9d722fa8991f662e6c9eecb53ccd47e5fdb0975c07aeef372a/redis_message_queue-0.8.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "",
"summary": "Python message queuing with Redis and message deduplication",
"version": "0.8.0",
"project_urls": null,
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "6c3fb76d0e558a77a8c20ae9d7fb80fc05e7f240a5b2891f8b19a3f6aac45b53",
"md5": "133139a03b4526c959cbf04820767d5d",
"sha256": "1e3e1b9e37dfb1ce1cbb3bbfb87969813371e0b266843306b1c6a79126ad5188"
},
"downloads": -1,
"filename": "redis_message_queue-0.8.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "133139a03b4526c959cbf04820767d5d",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8,<4.0",
"size": 10109,
"upload_time": "2024-01-19T19:46:49",
"upload_time_iso_8601": "2024-01-19T19:46:49.267577Z",
"url": "https://files.pythonhosted.org/packages/6c/3f/b76d0e558a77a8c20ae9d7fb80fc05e7f240a5b2891f8b19a3f6aac45b53/redis_message_queue-0.8.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "64b4bd0a87a83b9d722fa8991f662e6c9eecb53ccd47e5fdb0975c07aeef372a",
"md5": "095c83d2440bd22b8d0be4a385445901",
"sha256": "bed2816256085d51c151afb615df9e8e6282edec7e325c1a0c68b28022ada490"
},
"downloads": -1,
"filename": "redis_message_queue-0.8.0.tar.gz",
"has_sig": false,
"md5_digest": "095c83d2440bd22b8d0be4a385445901",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8,<4.0",
"size": 7512,
"upload_time": "2024-01-19T19:46:50",
"upload_time_iso_8601": "2024-01-19T19:46:50.356003Z",
"url": "https://files.pythonhosted.org/packages/64/b4/bd0a87a83b9d722fa8991f662e6c9eecb53ccd47e5fdb0975c07aeef372a/redis_message_queue-0.8.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-01-19 19:46:50",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "redis-message-queue"
}