aioreactive


Nameaioreactive JSON
Version 0.20.0 PyPI version JSON
download
home_pagehttps://github.com/dbrattli/aioreactive
Summarysync/await Reactive Tools for Python 3.10+
upload_time2024-09-28 08:27:04
maintainerNone
docs_urlNone
authorDag Brattli
requires_python<4,>=3.10
licenseMIT
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage
            
<img src="logo/logo.jpg" alt="drawing" width="200"/>

# aioreactive - ReactiveX for asyncio using async and await

[![PyPI](https://img.shields.io/pypi/v/aioreactive.svg)](https://pypi.python.org/pypi/aioreactive)
![Python package](https://github.com/dbrattli/aioreactive/workflows/Python%20package/badge.svg)
![Publish Package](https://github.com/dbrattli/aioreactive/actions/workflows/python-publish.yml/badge.svg)
[![codecov](https://codecov.io/gh/dbrattli/aioreactive/branch/master/graph/badge.svg)](https://codecov.io/gh/dbrattli/aioreactive)

> *NEWS: Project rebooted Nov. 2020. Rebuilt using [Expression](https://github.com/dbrattli/Expression).*

Aioreactive is [RxPY](https://github.com/ReactiveX/RxPY) for asyncio.
It's an asynchronous and reactive Python library for asyncio using async
and await. Aioreactive is built on the
[Expression](https://github.com/dbrattli/Expression) functional library
and, integrates naturally with the Python language.

> aioreactive is the unification of RxPY and reactive programming with
> asyncio using async and await.

## The design goals for aioreactive

* Python 3.10+ only. We have a hard dependency [Expression v5]([https://www.python.org/dev/peps/pep-0585/](https://github.com/dbrattli/Expression)).
* All operators and tools are implemented as plain old functions.
* Everything is `async`. Sending values is async, subscribing to
  observables is async. Disposing subscriptions is async.
* One scheduler to rule them all. Everything runs on the asyncio base
  event-loop.
* No multi-threading. Only async and await with concurrency using
  asyncio. Threads are hard, and in many cases it doesn’t make sense to
  use multi-threading in Python applications. If you need to use threads
  you may wrap them with
  [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)
  and compose them into the chain with `flat_map()` or similar. See
  [`parallel.py`](https://github.com/dbrattli/aioreactive/blob/master/examples/parallel/parallel.py)
  for an example.
* Simple, clean and use few abstractions. Try to align with the
  itertools package, and reuse as much from the Python standard library
  as possible.
* Support type hints and static type checking using [Pylance](https://devblogs.microsoft.com/python/announcing-pylance-fast-feature-rich-language-support-for-python-in-visual-studio-code/).
* Implicit synchronous back-pressure &trade;. Producers of events will
  simply be awaited until the event can be processed by the down-stream
  consumers.

## AsyncObservable and AsyncObserver

With aioreactive you subscribe observers to observables, and the key
abstractions of aioreactive can be seen in this single line of code:

```python
subscription = await observable.subscribe_async(observer)
```

The difference from RxPY can be seen with the `await` expression.
Aioreactive is built around the asynchronous duals, or opposites of the
AsyncIterable and AsyncIterator abstract base classes. These async
classes are called AsyncObservable and AsyncObserver.

AsyncObservable is a producer of events. It may be seen as the dual or
opposite of AsyncIterable and provides a single setter method called
`subscribe_async()` that is the dual of the `__aiter__()` getter method:

```python
from abc import ABC, abstractmethod

class AsyncObservable(ABC):
    @abstractmethod
    async def subscribe_async(self, observer):
        return NotImplemented
```

AsyncObserver is a consumer of events and is modeled after the
so-called [consumer interface](http://effbot.org/zone/consumer.htm), the
enhanced generator interface in
[PEP-342](https://www.python.org/dev/peps/pep-0342/) and async
generators in [PEP-525](https://www.python.org/dev/peps/pep-0525/). It
is the dual of the AsyncIterator `__anext__()` method, and expands to
three async methods `asend()`, that is the opposite of `__anext__()`,
`athrow()` that is the opposite of an `raise Exception()` and `aclose()`
that is the opposite of `raise StopAsyncIteration`:

```python
from abc import ABC, abstractmethod

class AsyncObserver(ABC):
    @abstractmethod
    async def asend(self, value):
        return NotImplemented

    @abstractmethod
    async def athrow(self, error):
        return NotImplemented

    @abstractmethod
    async def aclose(self):
        return NotImplemented
```

## Subscribing to observables

An observable becomes hot and starts streaming items by using the
`subscribe_async()` method. The `subscribe_async()` method takes an
observable and returns a disposable subscription. So the
`subscribe_async()` method is used to attach a observer to the
observable.

```python
async def asend(value):
    print(value)

disposable = await subscribe_async(source, AsyncAnonymousObserver(asend))
```

`AsyncAnonymousObserver` is an anonymous observer that constructs an
`AsyncObserver` out of plain async functions, so you don't have to
implement a new named observer every time you need one.

The subscription returned by `subscribe_async()` is disposable, so to
unsubscribe you need to await the `dispose_async()` method on the
subscription.

```python
await subscription.dispose_async()
```

## Asynchronous iteration

Even more interesting, with `to_async_iterable` you can flip around from
`AsyncObservable` to an `AsyncIterable` and use `async-for` to consume
the stream of events.

```python
import aioreactive as rx

xs = rx.from_iterable([1, 2, 3])
async for x in xs:
    print(x)
```

They effectively transform us from an async push model to an async pull
model, and lets us use the awesome new language features such as `async
for` and `async-with`. We do this without any queueing, as a push by the
`AsyncObservable` will await the pull by the `AsyncIterator.  This
effectively applies so-called "back-pressure" up the subscription as the
producer will await the iterator to pick up the item send.

The for-loop may be wrapped with async-with to control the lifetime of
the subscription:

```python
import aioreactive as rx

xs = rx.from_iterable([1, 2, 3])
result = []

obv = rx.AsyncIteratorObserver(xs)
async with await xs.subscribe_async(obv) as subscription:
    async for x in obv:
        result.append(x)

assert result == [1, 2, 3]
```

## Async streams

An async stream is both an async observer and an async observable.
Aioreactive lets you create streams explicitly.

```python
import aioreactive as rx

stream = AsyncSubject()  # Alias for AsyncMultiStream

sink = rx.AsyncAnonymousObserver()
await stream.subscribe_async(sink)
await stream.asend(42)
```

You can create streams directly from `AsyncMultiStream` or
`AsyncSingleStream`. `AsyncMultiStream` supports multiple observers, and
is hot in the sense that it will drop any event that is sent if there
are currently no observers attached. `AsyncSingleStream` on the other
hand supports a single observer, and is cold in the sense that it will
await any producer until there is an observer attached.

## Operators

The Rx operators in aioreactive are all plain old functions. You can
apply them to an observable and compose it into a transformed, filtered,
aggregated or combined observable. This transformed observable can be
streamed into an observer.

    Observable -> Operator -> Operator -> Operator -> Observer

Aioreactive contains many of the same operators as you know from RxPY.
Our goal is not to implement them all, but to provide the most essential
ones.

* **concat** -- Concatenates two or more observables.
* **choose** -- Filters and/or transforms the observable.
* **choose_asnc** -- Asynchronously filters and/or transforms the observable.
* **debounce** -- Throttles an observable.
* **delay** -- delays the items within an observable.
* **distinct_until_changed** -- an observable with continuously distinct values.
* **filter** -- filters an observable.
* **filteri** -- filters an observable with index.
* **flat_map** -- transforms an observable into a stream of observables and flattens the resulting observable.
* **flat_map_latest** -- transforms an observable into a stream of
  observables and flattens the resulting observable by producing values
  from the latest observable.
* **from_iterable** -- Create an observable from an (async) iterable.
* **subscribe** -- Subscribes an observer to an observable. Returns a subscription.
* **map** -- transforms an observable.
* **mapi** -- transforms an observable with index.
* **map_async** -- transforms an observable asynchronously.
* **mapi_async** -- transforms an observable asynchronously with index.
* **merge_inner** -- Merges an observable of observables.
* **merge** -- Merge one observable with another observable.
* **merge_seq** -- Merge a sequence of observables.
* **run** -- Awaits the future returned by subscribe. Returns when the subscription closes.
* **slice** -- Slices an observable.
* **skip** -- Skip items from the start of the observable stream.
* **skip_last** -- Skip items from the end of the observable stream.
* **starfilter** -- Filters an observable with a predicate and spreads the arguments.
* **starmap** -- Transforms and async observable and spreads the arguments to the mapper.
* **switch_latest** -- Merges the latest stream in an observable of streams.
* **take** -- Take a number of items from the start of the observable stream.
* **take_last** -- Take a number of items from the end of the observable stream.
* **unit** -- Converts a value or future to an observable.
* **with_latest_from** -- Combines two observables into one.

# Functional or object-oriented, reactive or interactive

With aioreactive you can choose to program functionally with plain old
functions, or object-oriented with classes and methods. Aioreactive
supports both method chaining or forward pipe programming styles.

## Pipe forward programming style

`AsyncObservable` may compose operators using forward pipelining with
the `pipe` operator provided by the amazing
[Expression](https://github.com/dbrattli/Expression) library. This works
by having the operators partially applied with their arguments before
being given the source stream as the last curried argument.

```python
ys = pipe(xs, filter(predicate), map(mapper), flat_map(request))
```

Longer pipelines may break lines as for binary operators:

```python
import aioreactve as rx

async def main():
    stream = rx.AsyncSubject()
    obv = rx.AsyncIteratorObserver()

    xs = pipe(
        stream,
        rx.map(lambda x: x["term"]),
        rx.filter(lambda text: len(text) > 2),
        rx.debounce(0.75),
        rx.distinct_until_changed(),
        rx.map(search_wikipedia),
        rx.switch_latest(),
    )

    async with xs.subscribe_async(obv) as ys
        async for value in obv:
            print(value)
```

AsyncObservable also supports slicing using the Python slice notation.

```python
@pytest.mark.asyncio
async def test_slice_special():
    xs = rx.from_iterable([1, 2, 3, 4, 5])
    values = []

    async def asend(value):
        values.append(value)

    ys = xs[1:-1]

    result = await run(ys, AsyncAnonymousObserver(asend))

    assert result == 4
    assert values == [2, 3, 4]
```

# Fluent and chained programming style

An alternative to pipelining is to use the classic and fluent method
chaining as we know from [ReactiveX](http://reactivex.io).

An `AsyncObservable` created from class methods such as
`AsyncRx.from_iterable()` returns a `AsyncChainedObservable`.
where we may use methods such as `.filter()` and `.map()`.

```python
from aioreactive import AsyncRx

@pytest.mark.asyncio
async def test_observable_simple_pipe():
    xs = AsyncRx.from_iterable([1, 2, 3])
    result = []

    async def mapper(value):
        await asyncio.sleep(0.1)
        return value * 10

    async def predicate(value):
        await asyncio.sleep(0.1)
        return value > 1

    ys = xs.filter(predicate).map(mapper)

    async def on_next(value):
        result.append(value)

    subscription = await ys.subscribe_async(AsyncAnonymousObserver(on_next))
    await subsubscription
    assert result == [20, 30]
```

# Virtual time testing

Aioreactive also provides a virtual time event loop
(`VirtualTimeEventLoop`) that enables you to write asyncio unit-tests
that run in virtual time. Virtual time means that time is emulated, so
tests run as quickly as possible even if they sleep or awaits long-lived
operations. A test using virtual time still gives the same result as it
would have done if it had been run in real-time.

For example the following test still gives the correct result even if it
takes 0 seconds to run:

```python
@pytest.fixture()
def event_loop():
    loop = VirtualTimeEventLoop()
    yield loop
    loop.close()

@pytest.mark.asyncio
async def test_call_later():
    result = []

    def action(value):
        result.append(value)

    loop = asyncio.get_event_loop()
    loop.call_later(10, partial(action, 1))
    loop.call_later(1, partial(action, 2))
    loop.call_later(5, partial(action, 3))
    await asyncio.sleep(10)
    assert result == [2, 3, 1]
```

The aioreactive testing module provides a test `AsyncSubject` that may
delay sending values, and a test `AsyncTestObserver` that records all
events. These two classes helps you with testing in virtual time.

```python
@pytest.fixture()
def event_loop():
    loop = VirtualTimeEventLoop()
    yield loop
    loop.close()

@pytest.mark.asyncio
async def test_delay_done():
    xs = AsyncTestSubject()  # Test stream

    ys = pipe(xs, rx.delay(1.0))
    obv = AsyncTestObserver()  # Test AsyncAnonymousObserver
    async with await ys.subscribe_async(obv):
        await xs.asend_later(0, 10)
        await xs.asend_later(1.0, 20)
        await xs.aclose_later(1.0)
        await obv

    assert obv.values == [
        (ca(1), OnNext(10)),
        (ca(2), OnNext(20)),
        (ca(3), OnCompleted()),
    ]
```

# Why not use AsyncIterable for everything?

`AsyncIterable` and `AsyncObservable` are closely related (in fact they
are duals). `AsyncIterable` is an async iterable (pull) world, while
`AsyncObservable` is an async reactive (push) based world. There are
many operations such as `map()` and `filter()` that may be simpler to
implement using `AsyncIterable`, but once we start to include time, then
`AsyncObservable` really starts to shine. Operators such as `delay()`
makes much more sense for `AsyncObservable` than for `AsyncIterable`.

However, aioreactive makes it easy for you to flip-around to async
iterable just before you need to consume the stream, thus giving you the
best of both worlds.

# Will aioreactive replace RxPY?

Aioreactive will not replace [RxPY](https://github.com/ReactiveX/RxPY).
RxPY is an implementation of `Observable`. Aioreactive is an
implementation of `AsyncObservable`.

Rx and RxPY has hundreds of different query operators, and we currently
have no plans to implementing all of them for aioreactive.

Many ideas from aioreactive have already been ported back into "classic" RxPY.

# References

Aioreactive was inspired by:

* [AsyncRx](https://github.com/dbrattli/asyncrx) - Aioreactive is a direct port of AsyncRx from F#.
* [Expression](https://github.com/dbrattli/Expression) - Functional programming for Python.
* [Is it really Pythonic to continue using LINQ operators instead of plain old functions?](https://github.com/ReactiveX/RxPY/issues/94)
* [Reactive Extensions (Rx)](http://reactivex.io) and [RxPY](https://github.com/ReactiveX/RxPY).
* [Dart Streams](https://www.dartlang.org/tutorials/language/streams)
* [Underscore.js](http://underscorejs.org).
* [itertools](https://docs.python.org/3/library/itertools.html) and [functools](https://docs.python.org/3/library/functools.html).
* [dbrattli/OSlash](https://github.com/dbrattli/OSlash)
* [kriskowal/q](https://github.com/kriskowal/q).

# License

The MIT License (MIT)
Copyright (c) 2016 Børge Lanes, Dag Brattli.

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/dbrattli/aioreactive",
    "name": "aioreactive",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4,>=3.10",
    "maintainer_email": null,
    "keywords": null,
    "author": "Dag Brattli",
    "author_email": "dag@brattli.net",
    "download_url": "https://files.pythonhosted.org/packages/ba/f1/b191aa532f0724b0803f5b978cafd85a23b6d81899d2c2884ad2fd8d08e4/aioreactive-0.20.0.tar.gz",
    "platform": null,
    "description": "\n<img src=\"logo/logo.jpg\" alt=\"drawing\" width=\"200\"/>\n\n# aioreactive - ReactiveX for asyncio using async and await\n\n[![PyPI](https://img.shields.io/pypi/v/aioreactive.svg)](https://pypi.python.org/pypi/aioreactive)\n![Python package](https://github.com/dbrattli/aioreactive/workflows/Python%20package/badge.svg)\n![Publish Package](https://github.com/dbrattli/aioreactive/actions/workflows/python-publish.yml/badge.svg)\n[![codecov](https://codecov.io/gh/dbrattli/aioreactive/branch/master/graph/badge.svg)](https://codecov.io/gh/dbrattli/aioreactive)\n\n> *NEWS: Project rebooted Nov. 2020. Rebuilt using [Expression](https://github.com/dbrattli/Expression).*\n\nAioreactive is [RxPY](https://github.com/ReactiveX/RxPY) for asyncio.\nIt's an asynchronous and reactive Python library for asyncio using async\nand await. Aioreactive is built on the\n[Expression](https://github.com/dbrattli/Expression) functional library\nand, integrates naturally with the Python language.\n\n> aioreactive is the unification of RxPY and reactive programming with\n> asyncio using async and await.\n\n## The design goals for aioreactive\n\n* Python 3.10+ only. We have a hard dependency [Expression v5]([https://www.python.org/dev/peps/pep-0585/](https://github.com/dbrattli/Expression)).\n* All operators and tools are implemented as plain old functions.\n* Everything is `async`. Sending values is async, subscribing to\n  observables is async. Disposing subscriptions is async.\n* One scheduler to rule them all. Everything runs on the asyncio base\n  event-loop.\n* No multi-threading. Only async and await with concurrency using\n  asyncio. Threads are hard, and in many cases it doesn\u2019t make sense to\n  use multi-threading in Python applications. If you need to use threads\n  you may wrap them with\n  [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n  and compose them into the chain with `flat_map()` or similar. See\n  [`parallel.py`](https://github.com/dbrattli/aioreactive/blob/master/examples/parallel/parallel.py)\n  for an example.\n* Simple, clean and use few abstractions. Try to align with the\n  itertools package, and reuse as much from the Python standard library\n  as possible.\n* Support type hints and static type checking using [Pylance](https://devblogs.microsoft.com/python/announcing-pylance-fast-feature-rich-language-support-for-python-in-visual-studio-code/).\n* Implicit synchronous back-pressure &trade;. Producers of events will\n  simply be awaited until the event can be processed by the down-stream\n  consumers.\n\n## AsyncObservable and AsyncObserver\n\nWith aioreactive you subscribe observers to observables, and the key\nabstractions of aioreactive can be seen in this single line of code:\n\n```python\nsubscription = await observable.subscribe_async(observer)\n```\n\nThe difference from RxPY can be seen with the `await` expression.\nAioreactive is built around the asynchronous duals, or opposites of the\nAsyncIterable and AsyncIterator abstract base classes. These async\nclasses are called AsyncObservable and AsyncObserver.\n\nAsyncObservable is a producer of events. It may be seen as the dual or\nopposite of AsyncIterable and provides a single setter method called\n`subscribe_async()` that is the dual of the `__aiter__()` getter method:\n\n```python\nfrom abc import ABC, abstractmethod\n\nclass AsyncObservable(ABC):\n    @abstractmethod\n    async def subscribe_async(self, observer):\n        return NotImplemented\n```\n\nAsyncObserver is a consumer of events and is modeled after the\nso-called [consumer interface](http://effbot.org/zone/consumer.htm), the\nenhanced generator interface in\n[PEP-342](https://www.python.org/dev/peps/pep-0342/) and async\ngenerators in [PEP-525](https://www.python.org/dev/peps/pep-0525/). It\nis the dual of the AsyncIterator `__anext__()` method, and expands to\nthree async methods `asend()`, that is the opposite of `__anext__()`,\n`athrow()` that is the opposite of an `raise Exception()` and `aclose()`\nthat is the opposite of `raise StopAsyncIteration`:\n\n```python\nfrom abc import ABC, abstractmethod\n\nclass AsyncObserver(ABC):\n    @abstractmethod\n    async def asend(self, value):\n        return NotImplemented\n\n    @abstractmethod\n    async def athrow(self, error):\n        return NotImplemented\n\n    @abstractmethod\n    async def aclose(self):\n        return NotImplemented\n```\n\n## Subscribing to observables\n\nAn observable becomes hot and starts streaming items by using the\n`subscribe_async()` method. The `subscribe_async()` method takes an\nobservable and returns a disposable subscription. So the\n`subscribe_async()` method is used to attach a observer to the\nobservable.\n\n```python\nasync def asend(value):\n    print(value)\n\ndisposable = await subscribe_async(source, AsyncAnonymousObserver(asend))\n```\n\n`AsyncAnonymousObserver` is an anonymous observer that constructs an\n`AsyncObserver` out of plain async functions, so you don't have to\nimplement a new named observer every time you need one.\n\nThe subscription returned by `subscribe_async()` is disposable, so to\nunsubscribe you need to await the `dispose_async()` method on the\nsubscription.\n\n```python\nawait subscription.dispose_async()\n```\n\n## Asynchronous iteration\n\nEven more interesting, with `to_async_iterable` you can flip around from\n`AsyncObservable` to an `AsyncIterable` and use `async-for` to consume\nthe stream of events.\n\n```python\nimport aioreactive as rx\n\nxs = rx.from_iterable([1, 2, 3])\nasync for x in xs:\n    print(x)\n```\n\nThey effectively transform us from an async push model to an async pull\nmodel, and lets us use the awesome new language features such as `async\nfor` and `async-with`. We do this without any queueing, as a push by the\n`AsyncObservable` will await the pull by the `AsyncIterator.  This\neffectively applies so-called \"back-pressure\" up the subscription as the\nproducer will await the iterator to pick up the item send.\n\nThe for-loop may be wrapped with async-with to control the lifetime of\nthe subscription:\n\n```python\nimport aioreactive as rx\n\nxs = rx.from_iterable([1, 2, 3])\nresult = []\n\nobv = rx.AsyncIteratorObserver(xs)\nasync with await xs.subscribe_async(obv) as subscription:\n    async for x in obv:\n        result.append(x)\n\nassert result == [1, 2, 3]\n```\n\n## Async streams\n\nAn async stream is both an async observer and an async observable.\nAioreactive lets you create streams explicitly.\n\n```python\nimport aioreactive as rx\n\nstream = AsyncSubject()  # Alias for AsyncMultiStream\n\nsink = rx.AsyncAnonymousObserver()\nawait stream.subscribe_async(sink)\nawait stream.asend(42)\n```\n\nYou can create streams directly from `AsyncMultiStream` or\n`AsyncSingleStream`. `AsyncMultiStream` supports multiple observers, and\nis hot in the sense that it will drop any event that is sent if there\nare currently no observers attached. `AsyncSingleStream` on the other\nhand supports a single observer, and is cold in the sense that it will\nawait any producer until there is an observer attached.\n\n## Operators\n\nThe Rx operators in aioreactive are all plain old functions. You can\napply them to an observable and compose it into a transformed, filtered,\naggregated or combined observable. This transformed observable can be\nstreamed into an observer.\n\n    Observable -> Operator -> Operator -> Operator -> Observer\n\nAioreactive contains many of the same operators as you know from RxPY.\nOur goal is not to implement them all, but to provide the most essential\nones.\n\n* **concat** -- Concatenates two or more observables.\n* **choose** -- Filters and/or transforms the observable.\n* **choose_asnc** -- Asynchronously filters and/or transforms the observable.\n* **debounce** -- Throttles an observable.\n* **delay** -- delays the items within an observable.\n* **distinct_until_changed** -- an observable with continuously distinct values.\n* **filter** -- filters an observable.\n* **filteri** -- filters an observable with index.\n* **flat_map** -- transforms an observable into a stream of observables and flattens the resulting observable.\n* **flat_map_latest** -- transforms an observable into a stream of\n  observables and flattens the resulting observable by producing values\n  from the latest observable.\n* **from_iterable** -- Create an observable from an (async) iterable.\n* **subscribe** -- Subscribes an observer to an observable. Returns a subscription.\n* **map** -- transforms an observable.\n* **mapi** -- transforms an observable with index.\n* **map_async** -- transforms an observable asynchronously.\n* **mapi_async** -- transforms an observable asynchronously with index.\n* **merge_inner** -- Merges an observable of observables.\n* **merge** -- Merge one observable with another observable.\n* **merge_seq** -- Merge a sequence of observables.\n* **run** -- Awaits the future returned by subscribe. Returns when the subscription closes.\n* **slice** -- Slices an observable.\n* **skip** -- Skip items from the start of the observable stream.\n* **skip_last** -- Skip items from the end of the observable stream.\n* **starfilter** -- Filters an observable with a predicate and spreads the arguments.\n* **starmap** -- Transforms and async observable and spreads the arguments to the mapper.\n* **switch_latest** -- Merges the latest stream in an observable of streams.\n* **take** -- Take a number of items from the start of the observable stream.\n* **take_last** -- Take a number of items from the end of the observable stream.\n* **unit** -- Converts a value or future to an observable.\n* **with_latest_from** -- Combines two observables into one.\n\n# Functional or object-oriented, reactive or interactive\n\nWith aioreactive you can choose to program functionally with plain old\nfunctions, or object-oriented with classes and methods. Aioreactive\nsupports both method chaining or forward pipe programming styles.\n\n## Pipe forward programming style\n\n`AsyncObservable` may compose operators using forward pipelining with\nthe `pipe` operator provided by the amazing\n[Expression](https://github.com/dbrattli/Expression) library. This works\nby having the operators partially applied with their arguments before\nbeing given the source stream as the last curried argument.\n\n```python\nys = pipe(xs, filter(predicate), map(mapper), flat_map(request))\n```\n\nLonger pipelines may break lines as for binary operators:\n\n```python\nimport aioreactve as rx\n\nasync def main():\n    stream = rx.AsyncSubject()\n    obv = rx.AsyncIteratorObserver()\n\n    xs = pipe(\n        stream,\n        rx.map(lambda x: x[\"term\"]),\n        rx.filter(lambda text: len(text) > 2),\n        rx.debounce(0.75),\n        rx.distinct_until_changed(),\n        rx.map(search_wikipedia),\n        rx.switch_latest(),\n    )\n\n    async with xs.subscribe_async(obv) as ys\n        async for value in obv:\n            print(value)\n```\n\nAsyncObservable also supports slicing using the Python slice notation.\n\n```python\n@pytest.mark.asyncio\nasync def test_slice_special():\n    xs = rx.from_iterable([1, 2, 3, 4, 5])\n    values = []\n\n    async def asend(value):\n        values.append(value)\n\n    ys = xs[1:-1]\n\n    result = await run(ys, AsyncAnonymousObserver(asend))\n\n    assert result == 4\n    assert values == [2, 3, 4]\n```\n\n# Fluent and chained programming style\n\nAn alternative to pipelining is to use the classic and fluent method\nchaining as we know from [ReactiveX](http://reactivex.io).\n\nAn `AsyncObservable` created from class methods such as\n`AsyncRx.from_iterable()` returns a `AsyncChainedObservable`.\nwhere we may use methods such as `.filter()` and `.map()`.\n\n```python\nfrom aioreactive import AsyncRx\n\n@pytest.mark.asyncio\nasync def test_observable_simple_pipe():\n    xs = AsyncRx.from_iterable([1, 2, 3])\n    result = []\n\n    async def mapper(value):\n        await asyncio.sleep(0.1)\n        return value * 10\n\n    async def predicate(value):\n        await asyncio.sleep(0.1)\n        return value > 1\n\n    ys = xs.filter(predicate).map(mapper)\n\n    async def on_next(value):\n        result.append(value)\n\n    subscription = await ys.subscribe_async(AsyncAnonymousObserver(on_next))\n    await subsubscription\n    assert result == [20, 30]\n```\n\n# Virtual time testing\n\nAioreactive also provides a virtual time event loop\n(`VirtualTimeEventLoop`) that enables you to write asyncio unit-tests\nthat run in virtual time. Virtual time means that time is emulated, so\ntests run as quickly as possible even if they sleep or awaits long-lived\noperations. A test using virtual time still gives the same result as it\nwould have done if it had been run in real-time.\n\nFor example the following test still gives the correct result even if it\ntakes 0 seconds to run:\n\n```python\n@pytest.fixture()\ndef event_loop():\n    loop = VirtualTimeEventLoop()\n    yield loop\n    loop.close()\n\n@pytest.mark.asyncio\nasync def test_call_later():\n    result = []\n\n    def action(value):\n        result.append(value)\n\n    loop = asyncio.get_event_loop()\n    loop.call_later(10, partial(action, 1))\n    loop.call_later(1, partial(action, 2))\n    loop.call_later(5, partial(action, 3))\n    await asyncio.sleep(10)\n    assert result == [2, 3, 1]\n```\n\nThe aioreactive testing module provides a test `AsyncSubject` that may\ndelay sending values, and a test `AsyncTestObserver` that records all\nevents. These two classes helps you with testing in virtual time.\n\n```python\n@pytest.fixture()\ndef event_loop():\n    loop = VirtualTimeEventLoop()\n    yield loop\n    loop.close()\n\n@pytest.mark.asyncio\nasync def test_delay_done():\n    xs = AsyncTestSubject()  # Test stream\n\n    ys = pipe(xs, rx.delay(1.0))\n    obv = AsyncTestObserver()  # Test AsyncAnonymousObserver\n    async with await ys.subscribe_async(obv):\n        await xs.asend_later(0, 10)\n        await xs.asend_later(1.0, 20)\n        await xs.aclose_later(1.0)\n        await obv\n\n    assert obv.values == [\n        (ca(1), OnNext(10)),\n        (ca(2), OnNext(20)),\n        (ca(3), OnCompleted()),\n    ]\n```\n\n# Why not use AsyncIterable for everything?\n\n`AsyncIterable` and `AsyncObservable` are closely related (in fact they\nare duals). `AsyncIterable` is an async iterable (pull) world, while\n`AsyncObservable` is an async reactive (push) based world. There are\nmany operations such as `map()` and `filter()` that may be simpler to\nimplement using `AsyncIterable`, but once we start to include time, then\n`AsyncObservable` really starts to shine. Operators such as `delay()`\nmakes much more sense for `AsyncObservable` than for `AsyncIterable`.\n\nHowever, aioreactive makes it easy for you to flip-around to async\niterable just before you need to consume the stream, thus giving you the\nbest of both worlds.\n\n# Will aioreactive replace RxPY?\n\nAioreactive will not replace [RxPY](https://github.com/ReactiveX/RxPY).\nRxPY is an implementation of `Observable`. Aioreactive is an\nimplementation of `AsyncObservable`.\n\nRx and RxPY has hundreds of different query operators, and we currently\nhave no plans to implementing all of them for aioreactive.\n\nMany ideas from aioreactive have already been ported back into \"classic\" RxPY.\n\n# References\n\nAioreactive was inspired by:\n\n* [AsyncRx](https://github.com/dbrattli/asyncrx) - Aioreactive is a direct port of AsyncRx from F#.\n* [Expression](https://github.com/dbrattli/Expression) - Functional programming for Python.\n* [Is it really Pythonic to continue using LINQ operators instead of plain old functions?](https://github.com/ReactiveX/RxPY/issues/94)\n* [Reactive Extensions (Rx)](http://reactivex.io) and [RxPY](https://github.com/ReactiveX/RxPY).\n* [Dart Streams](https://www.dartlang.org/tutorials/language/streams)\n* [Underscore.js](http://underscorejs.org).\n* [itertools](https://docs.python.org/3/library/itertools.html) and [functools](https://docs.python.org/3/library/functools.html).\n* [dbrattli/OSlash](https://github.com/dbrattli/OSlash)\n* [kriskowal/q](https://github.com/kriskowal/q).\n\n# License\n\nThe MIT License (MIT)\nCopyright (c) 2016 B\u00f8rge Lanes, Dag Brattli.\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "sync/await Reactive Tools for Python 3.10+",
    "version": "0.20.0",
    "project_urls": {
        "Homepage": "https://github.com/dbrattli/aioreactive",
        "Repository": "https://github.com/dbrattli/aioreactive"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "61a41c509efdaf51a29cc4bc74d0c0b6eff8f812f38721486c9bdfad465729bd",
                "md5": "0b164f45dccf689f49747b725217516b",
                "sha256": "8887a6cd5ff8494a953b580e22940dc52d265b2555f4c652afefb14d21ac64f9"
            },
            "downloads": -1,
            "filename": "aioreactive-0.20.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "0b164f45dccf689f49747b725217516b",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4,>=3.10",
            "size": 39739,
            "upload_time": "2024-09-28T08:27:02",
            "upload_time_iso_8601": "2024-09-28T08:27:02.486707Z",
            "url": "https://files.pythonhosted.org/packages/61/a4/1c509efdaf51a29cc4bc74d0c0b6eff8f812f38721486c9bdfad465729bd/aioreactive-0.20.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "baf1b191aa532f0724b0803f5b978cafd85a23b6d81899d2c2884ad2fd8d08e4",
                "md5": "42b4b07fd28ee02f196cd2ecb461a58d",
                "sha256": "2da08e5a7a722350fab082ee1426720b4e650adbcac4ff3839e29e676741d70d"
            },
            "downloads": -1,
            "filename": "aioreactive-0.20.0.tar.gz",
            "has_sig": false,
            "md5_digest": "42b4b07fd28ee02f196cd2ecb461a58d",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4,>=3.10",
            "size": 35512,
            "upload_time": "2024-09-28T08:27:04",
            "upload_time_iso_8601": "2024-09-28T08:27:04.064057Z",
            "url": "https://files.pythonhosted.org/packages/ba/f1/b191aa532f0724b0803f5b978cafd85a23b6d81899d2c2884ad2fd8d08e4/aioreactive-0.20.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-28 08:27:04",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "dbrattli",
    "github_project": "aioreactive",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": true,
    "lcname": "aioreactive"
}
        
Elapsed time: 0.90840s