grugstream

Name: grugstream
Version: 0.0.16
Summary: The grugstream library provides an easy way to work with asynchronous / reactive / streaming programming.
Home page: https://github.com/thejaminator/grugstream
Author: James Chua
Requires Python: >=3.9,<4.0
License: MIT
Upload time: 2023-12-04 00:16:29
# Grugstream

[![Build Status](https://github.com/thejaminator/grugstream/actions/workflows/main.yml/badge.svg)](https://github.com/thejaminator/grugstream/actions/workflows/main.yml)
[![python](https://img.shields.io/pypi/pyversions/grugstream.svg)](https://pypi.org/project/grugstream)

```
pip install grugstream
```

See the [documentation](https://thejaminator.github.io/grugstream/)
## Introduction

The grugstream library provides an easy way to work with asynchronous / reactive / streaming programming in Python.

Set up data-processing pipelines that are faster, use less memory, and are easy to understand.

Works with anyio, which means it works with both asyncio and trio.

This library is inspired by
- RxPY, aioreactive
- Monix

Philosophy: when you hit dot on your keyboard, you should have everything you need.

Also, everything is statically typed!
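
Because the pipelines run on anyio, you can pick the backend when you start the event loop. A minimal sketch (assumes `trio` is installed if you want the trio backend):

```python
import anyio
from grugstream import Observable


async def main():
    doubled = await Observable.from_iterable([1, 2, 3]).map(lambda x: x * 2).to_list()
    print(doubled)  # [2, 4, 6]


# anyio defaults to the asyncio backend
anyio.run(main)
# ...but the same code runs unchanged on trio
# anyio.run(main, backend="trio")
```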

## Getting Started

### Basic Example
What we always do is:
1. Create an observable (a stream that is not running yet)
2. Transform it with things like `map` or `filter`
3. Run it.
   - For example, `to_list` will run the observable and collect the results into a list.
   - `run_to_completion` will run the observable until it completes.
   - `to_file_appending` will run the observable and write the results to a file (a sketch follows the example below).
```python
import anyio
from grugstream import Observable


# Mock async function simulating an HTTP call to Google
async def mock_http_call_to_google(item: str) -> str:
    # Simulate the asynchronous delay of an HTTP request
    await anyio.sleep(1)
    return f"Response from Google {item}"


async def main():
    # Create an observable, and call google for each item
    observable = (
        Observable.from_iterable(["one", "two", "three"])
        # this is the same as map, but it's for async functions
        .map_async(lambda item: mock_http_call_to_google(item))
    )

    # Actually start the stream and collect the results into a list
    results = await observable.to_list()

    for response in results:
        print(response)


anyio.run(main)
```
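
The same pipeline can stream its results to disk instead of collecting them into a list. A hedged sketch using `to_file_appending` from the list above, assuming it takes a `Path` the way `to_file` does later in this README:

```python
from pathlib import Path

import anyio
from grugstream import Observable


# Mock async function simulating an HTTP call to Google
async def mock_http_call_to_google(item: str) -> str:
    await anyio.sleep(1)
    return f"Response from Google {item}"


async def main():
    observable = Observable.from_iterable(["one", "two", "three"]).map_async(
        lambda item: mock_http_call_to_google(item)
    )
    # Assumed behaviour: appends one line per emitted item, mirroring to_file(Path(...))
    await observable.to_file_appending(Path("responses.txt"))


anyio.run(main)
```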

### Map operators - Parallel Example

Running things in parallel is as simple as calling `map_async_par` instead of `map_async`:

```python
import anyio
from grugstream import Observable


# Mock async function simulating an HTTP call to Google
async def mock_http_call_to_google(item: str) -> str:
    # Simulate the asynchronous delay of an HTTP request
    await anyio.sleep(1)
    return f"Response from Google {item}"


async def main():
    # Create an observable, and call google for each item
    observable = (
        # repeat every 0.1 seconds
        Observable.from_repeat("one", 0.1)
        # at any given time, there will be at most 50 concurrent calls to google
        .map_async_par(lambda item: mock_http_call_to_google(item), max_par=50)
    )

    # Actually start the stream and collect the results into a list
    # Let's take only 20 results
    results = await observable.take(20).to_list()

    for response in results:
        print(response)


anyio.run(main)
```

## Chaining other API calls
Suppose you have multiple API calls to run.
You want all the API calls to run in parallel with each other -
the items going through the 2nd API call don't need to wait for all the items from the 1st API call to complete.
And maybe you want to stream results to a file as the pipeline runs.
That's when streaming really shines.
```python
import random
from pathlib import Path
from typing import List, Optional

import anyio

from grugstream import Observable


# Mock async function simulating an HTTP call to Google
async def mock_http_call_to_google(item: str) -> str:
    await anyio.sleep(1)
    return f"Google Response for {item}"


# Mock async function simulating an API call that returns a list of items
async def mock_api_call_that_returns_list(item: str) -> List[str]:
    await anyio.sleep(0.5)
    return [f"Item {i} from {item}" for i in range(3)]


# Mock async function simulating an API call that returns an Optional value
async def mock_api_call_that_returns_optional(item: str) -> Optional[str]:
    await anyio.sleep(0.2)
    maybe_yes = random.choice([True, False])
    return item if maybe_yes else None


async def main():
    observable = (
        Observable.from_repeat("query", 0.1)
        .map_async_par(lambda item: mock_http_call_to_google(item))
        .map_async_par(lambda item: mock_api_call_that_returns_list(item))
        .flatten_iterable()  # Flatten the list into individual items
        .map_async_par(lambda item: mock_api_call_that_returns_optional(item))
        .print()
        .flatten_optional()  # Remove None values
    )

    # Write the results to a file
    await observable.take(100).to_file(Path("results.txt"))


anyio.run(main)
```

## Logging and debugging: print and tqdm
We provide `print` and `tqdm` operators to help you debug your streams.

```python
import anyio
from tqdm import tqdm

from grugstream import Observable


# Mock async function simulating an HTTP call to Google
async def mock_http_call_to_google(item: str) -> str:
    await anyio.sleep(0.1)
    return f"Google Response for {item}"


async def main():
    observable = (
        Observable.from_repeat("query", 0.1)
        .throttle(1)  # don't spam google too much!
        .map_async_par(lambda item: mock_http_call_to_google(item))
        # Show a progress bar that should show ~1 it/s
        .tqdm(tqdm_bar=tqdm(desc="Google observable"))
        # Print the elements
        .print()
    )

    await observable.take(1000).run_to_completion()


anyio.run(main)
```


## for_each operator - side effects
Sometimes you want to do something with the elements of the stream, but you don't want to change the stream itself.
For example, you might want to write some intermediate items to a file.

```python
import anyio
from pathlib import Path
from grugstream import Observable


# Mock async function simulating an HTTP call to Google
async def mock_http_call_to_google(item: str) -> str:
    await anyio.sleep(0.1)
    return f"Google Response for {item}"


async def main():
    my_list = []
    observable = (
        Observable.from_repeat("query", 0.1)
        .map_async_par(lambda item: mock_http_call_to_google(item))
        # What's google's response? Let's write it to a file
        .for_each_to_file(
            file_path=Path("results.txt"),
        )
        # Let's also append it to a list to print
        .for_each(lambda item: my_list.append(item))
        .map(lambda item: item.upper())
        .print()
    )

    await observable.take(1000).run_to_completion()
    print(my_list)


anyio.run(main)

```




## Building an Observable

This library provides several utility methods for creating observables:

### From Existing Data

- `from_iterable(iterable)`: Create an observable from a Python iterable like a list or a tuple.
- `from_async_iterable(iterable)`: Create an observable from an asynchronous iterable.
- `from_one(value)`: Create an observable that emits a single value.
- `from_one_option(value)`: Create an observable that emits a single value or nothing if the value is `None`.

Example:

```python
from grugstream import Observable
observable = Observable.from_iterable([1, 2, 3])
```
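
The other constructors work the same way. A minimal sketch of `from_async_iterable` and `from_one_option` (the async generator here is purely illustrative):

```python
from typing import AsyncIterator, Optional

import anyio
from grugstream import Observable


async def generate_numbers() -> AsyncIterator[int]:
    # Illustrative async generator: yields 0, 1, 2 with a small delay
    for i in range(3):
        await anyio.sleep(0.1)
        yield i


async def main():
    numbers = await Observable.from_async_iterable(generate_numbers()).to_list()
    print(numbers)  # [0, 1, 2]

    maybe_value: Optional[str] = None
    # from_one_option emits nothing when the value is None
    empty = await Observable.from_one_option(maybe_value).to_list()
    print(empty)  # []


anyio.run(main)
```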


## Transforming Observables

### `map`

Applies a function to all elements in the source observable.

```python
observable = Observable.from_iterable([1, 2, 3])
new_observable = observable.map(lambda x: x * 2)
```

### `filter`

Filters out elements that do not match a given predicate.

```python
observable = Observable.from_iterable([1, 2, 3])
filtered_observable = observable.filter(lambda x: x > 1)
```

### `flatten_iterable`

Transforms an observable of iterables into an observable of the individual items.

```python
observable = Observable.from_iterable([[1, 2], [3, 4]])
flattened_observable = observable.flatten_iterable()
```

## Back-pressure, Buffering, Throttling

The library supports back-pressure to ensure that the producer and consumer are in sync. There are also methods like `throttle(seconds)` to control the rate of emissions.

```python
throttled_observable = observable.throttle(1.0)  # Emits at most one item per second
```
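
For example, throttling an infinite source keeps downstream work at a predictable rate. A small sketch that only combines operators already shown above:

```python
import anyio
from grugstream import Observable


async def main():
    # from_repeat would emit every 0.01 s, but throttle caps the pipeline at ~2 items/s
    results = await (
        Observable.from_repeat("tick", 0.01)
        .throttle(0.5)
        .take(4)
        .to_list()
    )
    print(results)  # ['tick', 'tick', 'tick', 'tick']


anyio.run(main)
```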

## Subscription and Error Handling

You can use the `Subscriber` class to define custom subscribers. It has three methods:

- `on_next(value)`: Called when a new value is emitted.
- `on_error(error)`: Called when an error occurs.
- `on_completed()`: Called when the observable completes.

In general, I hope you won't have to implement your own subscriber.
Most things you want to do can be done by chaining operators such as `map` and `filter`.
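
If you do need one, here is a hedged sketch of what a custom subscriber might look like. The method names come from the list above, but the exact signatures are assumptions (for example, `on_next` may be expected to return an acknowledgement, and the class may be generic) - check the library's docs before relying on this:

```python
from grugstream import Subscriber


class PrintingSubscriber(Subscriber):
    # Assumed async signatures; the real ones may differ
    async def on_next(self, value) -> None:
        print(f"got {value}")

    async def on_error(self, error: Exception) -> None:
        print(f"stream failed: {error}")

    async def on_completed(self) -> None:
        print("stream completed")
```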
            
