rq-chains


Namerq-chains JSON
Version 0.1.5 PyPI version JSON
download
home_pagehttps://github.com/xaled/rq-chains
SummaryRQ Chains is a Python library that extends RQ (Redis Queue) with a publisher-subscriber model for job chains. By Khalid Grandi (github.com/xaled).
upload_time2023-05-11 10:40:05
maintainer
docs_urlNone
authorKhalid Grandi
requires_python>=3
licenseMIT
keywords library rq redis pubsub
VCS
bugtrack_url
requirements rq redis
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # RQ Chains
RQ Chains is a tiny Python library that extends [RQ (Redis Queue)](https://python-rq.org/) with a publisher-subscriber model for job chains.
It allows jobs to publish results to channels and other jobs to subscribe to these channels to receive the results.
This enables the creation of chains of jobs where the output of one job is the input of the next.

## Installation
```bash
pip install rq-chains
```

## Quick Start
```python
# test_module.py
from rq import Queue
from redis import Redis
from rq_chains import publisher, subscriber

redis_conn = Redis(host='redis')
q = Queue(connection=redis_conn)


# Define a function that will publish results
@publisher(queue=q, channel_ids='add_result_channel')
def add(a, b):
    return a + b


# Define a function that will subscribe to the published results
@subscriber(queue=q, channel_ids='add_result_channel')
def square(n):
    return n * n


# main.py
from test_module import add
from rq_chains import walk_job_chain
from time import sleep

j = add.delay(2, 3)
sleep(1)
walk_job_chain(j)
# Recursively walks through a job chain and prints information about each job in the chain
# output:
# test_module.add(*(2, 3), **{}) = 5
#   test_module.square(*(5,), **{}) = 25 (channel_id='add_result_channel')

```

In this example, when you call `add.delay(2, 3)`, it will compute the result (5) and publish it to 'add_result_channel'.
The `square` function is a subscriber to 'add_result_channel', so it will automatically be called with the published result (5), and it will compute and return 25.

## Advanced Usage
RQ Chains offers extra features to tailor the behavior of publishers and subscribers.
For instance, you can specify custom conditions for when a publisher should share its result or when a subscriber should start.
You can also transform the publisher's result into a different set of arguments for the subscriber.
It's possible to set a limit on the maximum depth of job chains and introduce delays to the execution of subscriber jobs.
Moreover, you can publish results to a Redis PubSub channel or a Redis stream.

This documentation provides information about the 'publisher' and 'subscriber' decorators:

```
A decorator class for RQ jobs to implement the subscriber functionality in a publisher-subscriber model.

Args:
    queue (Union[Queue, str]): The RQ queue instance or queue name to enqueue the job.
    channel_ids (Optional[Union[str, Iterable[str]]], optional): An iterable of channel IDs to subscribe to
        or a single channel ID as a string. Defaults to None.
    result_mapper (Callable[[Any], Any], optional): A function that maps the publisher's result
        to the subscriber's arguments and keyword arguments. Defaults to a function returning the result
        as the first argument.
    subscribe_condition (Callable[[Any], Any], optional): A function that determines whether the subscriber
        should be executed based on the publisher's result. Defaults to always True.
    depth_limit (Optional[int], optional): The maximum depth of job chains that can be created
        by chained publishers and subscribers. Defaults to 1000.
    delay_timedelta (Optional[Union[int, float, timedelta]], optional): The time duration to delay the job
        execution, can be a number of seconds (int or float) or a timedelta object. Defaults to None.
    **job_kwargs: Additional keyword arguments to pass to the RQ job decorator.

Returns:
    A RQ job wrapped function that subscribes to the specified channels.
```


```
A decorator class for RQ jobs to implement the publisher functionality in a publisher-subscriber model.

Args:
    queue (Union[Queue, str]): The RQ queue instance or queue name to enqueue the job.
    channel_ids (Optional[Union[str, Iterable[str]]], optional): An iterable of channel IDs to publish to
        or a single channel ID as a string. Defaults to None.
    redis_pubsub_channels (Optional[Union[str, Iterable[str]]], optional): An iterable of Redis PubSub
        channel IDs to publish to or a single channel ID as a string. Defaults to None.
    redis_streams (Optional[Union[str, Iterable[str]]], optional): An iterable of Redis Stream names to publish
        to or a single stream name as a string. Defaults to None.
    publish_condition (Callable[[Any], Any], optional): A function that determines whether the publisher should
        publish its result. Defaults to a function returning the result as the first argument.
    **job_kwargs: Additional keyword arguments to pass to the RQ job decorator.

Returns:
    A RQ job wrapped function that publishes to the specified channels.
```

## License
RQ Chains is released under the [MIT License](/LICENSE).


            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/xaled/rq-chains",
    "name": "rq-chains",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3",
    "maintainer_email": "",
    "keywords": "library rq redis pubsub",
    "author": "Khalid Grandi",
    "author_email": "kh.grandi@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/52/f2/1a016e569083e863ce04d90fcb4503662dea1591c730b9089f9138144af6/rq-chains-0.1.5.tar.gz",
    "platform": null,
    "description": "# RQ Chains\nRQ Chains is a tiny Python library that extends [RQ (Redis Queue)](https://python-rq.org/) with a publisher-subscriber model for job chains.\nIt allows jobs to publish results to channels and other jobs to subscribe to these channels to receive the results.\nThis enables the creation of chains of jobs where the output of one job is the input of the next.\n\n## Installation\n```bash\npip install rq-chains\n```\n\n## Quick Start\n```python\n# test_module.py\nfrom rq import Queue\nfrom redis import Redis\nfrom rq_chains import publisher, subscriber\n\nredis_conn = Redis(host='redis')\nq = Queue(connection=redis_conn)\n\n\n# Define a function that will publish results\n@publisher(queue=q, channel_ids='add_result_channel')\ndef add(a, b):\n    return a + b\n\n\n# Define a function that will subscribe to the published results\n@subscriber(queue=q, channel_ids='add_result_channel')\ndef square(n):\n    return n * n\n\n\n# main.py\nfrom test_module import add\nfrom rq_chains import walk_job_chain\nfrom time import sleep\n\nj = add.delay(2, 3)\nsleep(1)\nwalk_job_chain(j)\n# Recursively walks through a job chain and prints information about each job in the chain\n# output:\n# test_module.add(*(2, 3), **{}) = 5\n#   test_module.square(*(5,), **{}) = 25 (channel_id='add_result_channel')\n\n```\n\nIn this example, when you call `add.delay(2, 3)`, it will compute the result (5) and publish it to 'add_result_channel'.\nThe `square` function is a subscriber to 'add_result_channel', so it will automatically be called with the published result (5), and it will compute and return 25.\n\n## Advanced Usage\nRQ Chains offers extra features to tailor the behavior of publishers and subscribers.\nFor instance, you can specify custom conditions for when a publisher should share its result or when a subscriber should start.\nYou can also transform the publisher's result into a different set of arguments for the subscriber.\nIt's possible to set a limit on the maximum depth of job chains and introduce delays to the execution of subscriber jobs.\nMoreover, you can publish results to a Redis PubSub channel or a Redis stream.\n\nThis documentation provides information about the 'publisher' and 'subscriber' decorators:\n\n```\nA decorator class for RQ jobs to implement the subscriber functionality in a publisher-subscriber model.\n\nArgs:\n    queue (Union[Queue, str]): The RQ queue instance or queue name to enqueue the job.\n    channel_ids (Optional[Union[str, Iterable[str]]], optional): An iterable of channel IDs to subscribe to\n        or a single channel ID as a string. Defaults to None.\n    result_mapper (Callable[[Any], Any], optional): A function that maps the publisher's result\n        to the subscriber's arguments and keyword arguments. Defaults to a function returning the result\n        as the first argument.\n    subscribe_condition (Callable[[Any], Any], optional): A function that determines whether the subscriber\n        should be executed based on the publisher's result. Defaults to always True.\n    depth_limit (Optional[int], optional): The maximum depth of job chains that can be created\n        by chained publishers and subscribers. Defaults to 1000.\n    delay_timedelta (Optional[Union[int, float, timedelta]], optional): The time duration to delay the job\n        execution, can be a number of seconds (int or float) or a timedelta object. Defaults to None.\n    **job_kwargs: Additional keyword arguments to pass to the RQ job decorator.\n\nReturns:\n    A RQ job wrapped function that subscribes to the specified channels.\n```\n\n\n```\nA decorator class for RQ jobs to implement the publisher functionality in a publisher-subscriber model.\n\nArgs:\n    queue (Union[Queue, str]): The RQ queue instance or queue name to enqueue the job.\n    channel_ids (Optional[Union[str, Iterable[str]]], optional): An iterable of channel IDs to publish to\n        or a single channel ID as a string. Defaults to None.\n    redis_pubsub_channels (Optional[Union[str, Iterable[str]]], optional): An iterable of Redis PubSub\n        channel IDs to publish to or a single channel ID as a string. Defaults to None.\n    redis_streams (Optional[Union[str, Iterable[str]]], optional): An iterable of Redis Stream names to publish\n        to or a single stream name as a string. Defaults to None.\n    publish_condition (Callable[[Any], Any], optional): A function that determines whether the publisher should\n        publish its result. Defaults to a function returning the result as the first argument.\n    **job_kwargs: Additional keyword arguments to pass to the RQ job decorator.\n\nReturns:\n    A RQ job wrapped function that publishes to the specified channels.\n```\n\n## License\nRQ Chains is released under the [MIT License](/LICENSE).\n\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "RQ Chains is a Python library that extends RQ (Redis Queue) with a publisher-subscriber model for job chains. By Khalid Grandi (github.com/xaled).",
    "version": "0.1.5",
    "project_urls": {
        "Homepage": "https://github.com/xaled/rq-chains"
    },
    "split_keywords": [
        "library",
        "rq",
        "redis",
        "pubsub"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "52f21a016e569083e863ce04d90fcb4503662dea1591c730b9089f9138144af6",
                "md5": "2094f25a813ca3913291e96aff5d23a4",
                "sha256": "3f85d37475c1554acd7227c3a2f93e9ba75afe0b177933c9b46d661816e46015"
            },
            "downloads": -1,
            "filename": "rq-chains-0.1.5.tar.gz",
            "has_sig": false,
            "md5_digest": "2094f25a813ca3913291e96aff5d23a4",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3",
            "size": 8125,
            "upload_time": "2023-05-11T10:40:05",
            "upload_time_iso_8601": "2023-05-11T10:40:05.292886Z",
            "url": "https://files.pythonhosted.org/packages/52/f2/1a016e569083e863ce04d90fcb4503662dea1591c730b9089f9138144af6/rq-chains-0.1.5.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-05-11 10:40:05",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "xaled",
    "github_project": "rq-chains",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "requirements": [
        {
            "name": "rq",
            "specs": [
                [
                    ">",
                    "1.10"
                ],
                [
                    "<",
                    "2"
                ]
            ]
        },
        {
            "name": "redis",
            "specs": [
                [
                    ">=",
                    "3.5.0"
                ]
            ]
        }
    ],
    "lcname": "rq-chains"
}
        
Elapsed time: 0.07190s