cashews

Name: cashews
Version: 7.4.0
Home page: https://github.com/Krukov/cashews/
Summary: cache tools with async power
Upload time: 2024-11-16 22:13:46
Author: Dmitry Kryukov
Requires Python: >=3.9
License: MIT
Keywords: cache, aio, async, multicache, aiocache
<h1 align="center">🥔 CASHEWS 🥔</h1>

<p align="center">
    <em>Async cache framework with simple API to build fast and reliable applications</em>
</p>

```bash
pip install cashews
pip install cashews[redis]
pip install cashews[diskcache]
pip install cashews[dill] # to cache more types of objects in redis
pip install cashews[speedup] # for bloom filters
```

---

## Why

Caching plays a significant role in modern applications, and everybody wants to use the full power of async programming together with a cache.
There are a few advanced techniques combining caching and async programming that can help you build simple, fast,
scalable and reliable applications. This library intends to make it easy to implement such techniques.

## Features

- Easy to configure and use
- Decorator-based API, decorate and play
- Different cache strategies out-of-the-box
- Support for multiple storage backends ([In-memory](#in-memory), [Redis](#redis), [DiskCache](#diskcache))
- Set TTL as a string ("2h5m"), as `timedelta` or use a function in case TTL depends on key parameters
- Transactionality
- Middlewares
- Client-side cache (10x faster than simple cache with redis)
- Bloom filters
- Different cache invalidation techniques (time-based or tags)
- Cache any objects securely with pickle (use [secret](#redis))
- 2x faster than `aiocache` (with client side caching)

## Usage Example

```python
from cashews import cache

cache.setup("mem://")  # configure as in-memory cache, but redis/diskcache is also supported

# use a decorator-based API
@cache(ttl="3h", key="user:{request.user.uid}")
async def long_running_function(request):
    ...

# or for fine-grained control, use it directly in a function
async def cache_using_function(request):
    await cache.set(key=request.user.uid, value=request.user, expire="20h")
    ...
```

More examples [here](https://github.com/Krukov/cashews/tree/master/examples)

## Table of Contents

- [Configuration](#configuration)
- [Available Backends](#available-backends)
- [Basic API](#basic-api)
- [Disable Cache](#disable-cache)
- [Strategies](#strategies)
  - [Cache condition](#cache-condition)
  - [Keys templating](#template-keys)
  - [TTL](#ttl)
  - [What can be cached](#what-can-be-cached)
- [Cache Invalidation](#cache-invalidation)
  - [Cache invalidation on code change](#cache-invalidation-on-code-change)
- [Detect the source of a result](#detect-the-source-of-a-result)
- [Middleware](#middleware)
- [Callbacks](#callbacks)
- [Transactional mode](#transactional)
- [Contrib](#contrib)
  - [Fastapi](#fastapi)
  - [Prometheus](#prometheus)

### Configuration

`cashews` provides a default cache that you can set up in two different ways:

```python
from cashews import cache

# via url
cache.setup("redis://0.0.0.0/?db=1&socket_connect_timeout=0.5&suppress=0&secret=my_secret&enable=1")
# or via kwargs
cache.setup("redis://0.0.0.0/", db=1, wait_for_connection_timeout=0.5, suppress=False, secret=b"my_key", enable=True)
```

Alternatively, you can create a cache instance yourself:

```python
from cashews import Cache

cache = Cache()
cache.setup(...)
```

Optionally, you can disable the cache with the `disable`/`enable` parameters (see [Disable Cache](#disable-cache)):

```python
cache.setup("redis://redis/0?enable=1")
cache.setup("mem://?size=500", disable=True)
cache.setup("mem://?size=500", enable=False)
```

You can set up different backends based on a key prefix:

```python
cache.setup("redis://redis/0")
cache.setup("mem://?size=500", prefix="user")

await cache.get("accounts")  # will use the redis backend
await cache.get("user:1")  # will use the memory backend
```

### Available Backends

#### In-memory

The in-memory cache uses a fixed-size LRU dict to store values. It checks expiration on `get`
and periodically purges expired keys.

```python
cache.setup("mem://")
cache.setup("mem://?check_interval=10&size=10000")
```

#### Redis

_Requires the [redis](https://github.com/redis/redis-py) package._

This will use Redis as storage.

This backend uses the [pickle](https://docs.python.org/3/library/pickle.html) module to serialize
values, and cashews can sign stored values with a keyed hash.

Use the `secret` and `digestmod` parameters to protect your application from security vulnerabilities.

`digestmod` is the hashing algorithm used for the signature: `sum`, `md5` (the default), `sha1` or `sha256`.

`secret` is the salt/key for that hash.
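
For example, a minimal sketch of a signed setup (assuming `secret` and `digestmod` are accepted as `setup` keyword arguments, as the surrounding examples suggest for `secret`):

```python
from cashews import cache

# Sketch: sign stored values so tampered cache entries are rejected
cache.setup(
    "redis://0.0.0.0/",
    secret=b"my_secret",  # salt/key for the value signature
    digestmod="sha256",   # hashing algorithm; md5 is the default
)
```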

Pickle can't serialize every type of object. In case you need to store more complex types,
you can use [dill](https://github.com/uqfoundation/dill) by setting `pickle_type="dill"`.
Dill is great, but slower.
If you need a serializer for [sqlalchemy](https://docs.sqlalchemy.org/en/14/core/serializer.html) objects, you can set `pickle_type="sqlalchemy"`.
`json` is also an option for serializing/deserializing objects, but it is very limited (`pickle_type="json"`).

Any connection errors are suppressed by default; to disable this, use `suppress=False`, and a `CacheBackendInteractionError` will be raised instead.

If you would like to use the [client-side cache](https://redis.io/topics/client-side-caching), set `client_side=True`.

The client-side cache adds a `cashews:` prefix to each key; to customize it, use the `client_side_prefix` option.

```python
cache.setup("redis://0.0.0.0/?db=1&minsize=10&suppress=false&secret=my_secret", prefix="func")
cache.setup("redis://0.0.0.0/2", password="my_pass", socket_connect_timeout=0.1, retry_on_timeout=True, secret="my_secret")
cache.setup("redis://0.0.0.0", client_side=True, client_side_prefix="my_prefix:", pickle_type="dill")
```

To use a secure (SSL) connection to redis, the URI should have `rediss` as its scheme:

```python
cache.setup("rediss://0.0.0.0/", ssl_ca_certs="path/to/ca.crt", ssl_keyfile="path/to/client.key",ssl_certfile="path/to/client.crt",)
```

#### DiskCache

_Requires the [diskcache](https://github.com/grantjenks/python-diskcache) package._

This will use local sqlite databases (with shards) as storage.

It is a good choice if you don't want to use redis but need shared storage, or if your cache takes up a lot of local memory.
It is also a good choice for client-side local storage.

You can setup disk cache with [FanoutCache parameters](http://www.grantjenks.com/docs/diskcache/api.html#fanoutcache)

> :warning: **Warning**: `cache.scan` and `cache.get_match` do not work with this storage (they work only if shards are disabled)

```python
cache.setup("disk://")
cache.setup("disk://?directory=/tmp/cache&timeout=1&shards=0")  # disable shards
Gb = 1073741824
cache.setup("disk://", size_limit=3 * Gb, shards=12)
```

### Basic API

There are a few basic methods to work with cache:

```python
from cashews import cache

cache.setup("mem://")  # configure as in-memory cache

await cache.set(key="key", value=90, expire="2h", exist=None)  # -> bool
await cache.set_raw(key="key", value="str")  # -> bool
await cache.set_many({"key1": value, "key2": value})  # -> None

await cache.get("key", default=None)  # -> Any
await cache.get_or_set("key", default=awaitable_or_callable, expire="1h")  # -> Any
await cache.get_raw("key") # -> Any
await cache.get_many("key1", "key2", default=None)  # -> tuple[Any]
async for key, value in cache.get_match("pattern:*", batch_size=100):
    ...

await cache.incr("key") # -> int
await cache.exists("key") # -> bool

await cache.delete("key")
await cache.delete_many("key1", "key2")
await cache.delete_match("pattern:*")

async for key in cache.scan("pattern:*"):
    ...

await cache.expire("key", timeout=10)
await cache.get_expire("key")  # -> int seconds to expire

await cache.ping(message=None)  # -> bytes
await cache.clear()

await cache.is_locked("key", wait=60)  # -> bool
async with cache.lock("key", expire=10):
    ...
await cache.set_lock("key", value="value", expire=60)  # -> bool
await cache.unlock("key", "value")  # -> bool

await cache.get_keys_count()  # -> int - total number of keys in cache
await cache.close()
```

### Disable Cache

Cache can be disabled not only at setup, but also at runtime. Cashews allows you to disable/enable any cache call or specific commands:

```python
from cashews import cache, Command

cache.setup("mem://")  # configure as in-memory cache

cache.disable(Command.DELETE)
cache.disable()
cache.enable(Command.GET, Command.SET)
cache.enable()

with cache.disabling():
  ...
```

### Strategies

- [Simple cache](#simple-cache)
- [Fail cache (Failover cache)](#fail-cache-failover-cache)
- [Hit cache](#hit-cache)
- [Early](#early)
- [Soft](#soft)
- [Async Iterators](#iterators)
- [Locked](#locked)
- [Rate limit](#rate-limit)
- [Circuit breaker](#circuit-breaker)

#### Simple cache

This is a typical cache strategy: execute, store and return from cache until it expires.

```python
from datetime import timedelta
from cashews import cache

cache.setup("mem://")

@cache(ttl=timedelta(hours=3), key="user:{request.user.uid}")
async def long_running_function(request):
    ...
```

#### Fail cache (Failover cache)

Return the cached result if one of the given exceptions is raised (at least one function
call should succeed prior to that).

```python
from cashews import cache

cache.setup("mem://")

# note: the key will be "__module__.get_status:name:{name}"
@cache.failover(ttl="2h", exceptions=(ValueError, MyException))
async def get_status(name):
    value = await api_call()
    return {"status": value}
```

If `exceptions` is not given, the decorator catches all exceptions, or uses the defaults if they are set via:

```python
cache.set_default_fail_exceptions(ValueError, MyException)
```

#### Hit cache

Expire the cache after a given number of calls (`cache_hits`).

```python
from cashews import cache

cache.setup("mem://")

@cache.hit(ttl="2h", cache_hits=100, update_after=2)
async def get(name):
    value = await api_call()
    return {"status": value}
```

#### Early

A cache strategy that tries to solve the [Cache stampede problem](https://en.wikipedia.org/wiki/Cache_stampede)
by recalculating a hot cached result in the background.

```python
from cashews import cache  # or: from cashews import early

cache.setup("mem://")

# if you call this function after 7 min, the cache will be updated in the background
@cache.early(ttl="10m", early_ttl="7m")
async def get(name):
    value = await api_call()
    return {"status": value}
```

#### Soft

Like a simple cache, but with fail protection based on a soft TTL.

```python
from cashews import cache

cache.setup("mem://")

# if you call this function after 7 min, the cache will be updated and a new result returned.
# If recalculation fails, the currently cached value is returned (if it is not more than 10 min old)
@cache.soft(ttl="10m", soft_ttl="7m")
async def get(name):
    value = await api_call()
    return {"status": value}
```

#### Iterators

All of the decorators above can be used only with coroutines. Caching async iterators works differently;
to cache async iterators, use the `iterator` decorator:

```python
from cashews import cache

cache.setup("mem://")


@cache.iterator(ttl="10m", key="get:{name}")
async def get(name):
    async for item in get_pages(name):
        yield ...

```

#### Locked

A decorator that can help you solve the [Cache stampede problem](https://en.wikipedia.org/wiki/Cache_stampede).
It locks the following function calls until the first call finishes.
This guarantees exactly one function call within the given TTL.

> :warning: **Warning**: this decorator will not cache the result.
> To do that, you can combine this decorator with any cache decorator, or use the parameter `lock=True` with `@cache()`.

```python
from cashews import cache

cache.setup("mem://")

@cache.locked(ttl="10s")
async def get(name):
    value = await api_call()
    return {"status": value}
```
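
The warning above mentions the `lock=True` parameter; here is a minimal sketch combining caching with locking based on that note:

```python
from cashews import cache

cache.setup("mem://")

# Sketch: cache the result and lock concurrent first calls,
# so only one call computes the value while the others wait
@cache(ttl="10m", lock=True)
async def get(name):
    value = await api_call()
    return {"status": value}
```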

#### Rate limit

Rate limit for a function call: if the rate limit is reached, a `RateLimitError` exception is raised.

> :warning: **Warning**: this decorator will not cache the result.
> To do that, you can combine this decorator with the failover cache decorator.

```python
from cashews import cache, RateLimitError

cache.setup("mem://")

# no more than 10 calls per minute or ban for 10 minutes - raise RateLimitError
@cache.rate_limit(limit=10, period="1m", ttl="10m")
async def get(name):
    value = await api_call()
    return {"status": value}



# no more than 100 calls in a 10 minute window; if the rate limit is reached -> return from cache
@cache.failover(ttl="10m", exceptions=(RateLimitError, ))
@cache.slice_rate_limit(limit=100, period="10m")
async def get_next(name):
    value = await api_call()
    return {"status": value}

```

#### Circuit breaker

Circuit breaker pattern: counts the number of failed calls and, if the error rate reaches the specified value, raises a `CircuitBreakerOpen` exception.

> :warning: **Warning**: this decorator will not cache the result.
> To do that, you can combine this decorator with the failover cache decorator.

```python
from cashews import cache, CircuitBreakerOpen

cache.setup("mem://")

@cache.circuit_breaker(errors_rate=10, period="1m", ttl="5m")
async def get(name):
    ...


@cache.failover(ttl="10m", exceptions=(CircuitBreakerOpen, ))
@cache.circuit_breaker(errors_rate=10, period="10m", ttl="5m", half_open_ttl="1m")
async def get_next(name):
    ...

```

#### Bloom filter (experimental)

Simple Bloom filter:

```python
from cashews import cache

cache.setup("mem://")

@cache.bloom(capacity=10_000, false_positives=1)
async def email_exists(email: str) -> bool:
    ...

for email in all_users_emails:
    await email_exists.set(email)

await email_exists("example@example.com")
```

### Cache condition

By default, any successful result of the function call is stored, even if it is `None`.
Caching decorators have a `condition` parameter, which can be:

- a callable object that receives the result of the function call (or an exception), args, kwargs and a cache key
- a string: "not_none" or "skip_none" to not cache `None` values

```python
from cashews import cache, NOT_NONE

cache.setup("mem://")

@cache(ttl="1h", condition=NOT_NONE)
async def get():
    ...


def skip_test_result(result, args, kwargs, key=None) -> bool:
    return result and result != "test"

@cache(ttl="1h", condition=skit_test_result)
async def get():
    ...

```

It is also possible to cache exceptions that the function raises; to do so, use the special conditions (available only for the simple, hit and early decorators):

```python
from cashews import cache, with_exceptions, only_exceptions

cache.setup("mem://")

@cache(ttl="1h", condition=with_exceptions(MyException, TimeoutError))
async def get():
    ...


@cache(ttl="1h", condition=only_exceptions(MyException, TimeoutError))
async def get():
    ...

```

Caching decorators also have a `time_condition` parameter - the minimum execution time in seconds (can be set like `ttl`)
that a function call must take for its result to be cached.

```python
from cashews import cache

cache.setup("mem://")

@cache(ttl="1h", time_condition="3s")  # to cache for 1 hour if execution takes more than 3 seconds
async def get():
    ...
```

### Template Keys

Often, to compose a cache key, you need all the parameters of the function call.
By default, Cashews will generate a key using the function name, module name and parameters:

```python
from datetime import timedelta

from cashews import cache

cache.setup("mem://")

@cache(ttl=timedelta(hours=3))
async def get_name(user, *args, version="v1", **kwargs):
    ...

# a key template will be "__module__.get_name:user:{user}:{__args__}:version:{version}:{__kwargs__}"

await get_name("me", version="v2")
# a key will be "__module__.get_name:user:me::version:v2"
await get_name("me", version="v1", foo="bar")
# a key will be "__module__.get_name:user:me::version:v1:foo:bar"
await get_name("me", "opt", "attr", opt="opt", attr="attr")
# a key will be "__module__.get_name:user:me:opt:attr:version:v1:attr:attr:opt:opt"
```

For more advanced usage, it is better to define a cache key manually:

```python
from cashews import cache

cache.setup("mem://")

@cache(ttl="2h", key="user_info:{user_id}")
async def get_info(user_id: str):
    ...

```

You may use objects in a key and access an attribute through the template:

```python

@cache(ttl="2h", key="user_info:{user.uuid}")
async def get_info(user: User):
    ...

```

You may use built-in functions to format template values (`lower`, `upper`, `len`, `jwt`, `hash`)

```python

@cache(ttl="2h", key="user_info:{user.name:lower}:{password:hash(sha1)}")
async def get_info(user: User, password: str):
    ...


@cache(ttl="2h", key="user:{token:jwt(client_id)}")
async def get_user_by_token(token: str) -> User:
    ...

```

Or define your own transformation functions:

```python
from cashews import default_formatter, cache

cache.setup("mem://")

@default_formatter.register("prefix")
def _prefix(value, chars=3):
    return value[:chars].upper()


@cache(ttl="2h", key="servers-user:{user.index:prefix(4)}")  # a key will be "servers-user:DWQS"
async def get_user_servers(user):
    ...

```

or register type formatters:

```python
from decimal import Decimal
from cashews import default_formatter, cache

@default_formatter.type_format(Decimal)
def _decimal(value: Decimal) -> str:
    return str(value.quantize(Decimal("0.00")))


@cache(ttl="2h", key="price-{item.price}:{item.currency:upper}")  # a key will be "price-10.00:USD"
async def convert_price(item):
    ...

```

Not only function arguments can participate in key formation. Cashews has a template context; use `@:get` in a template to paste a variable from the context:

```python
from cashews import cache, key_context

cache.setup("mem://")


@cache(ttl="2h", key="user:{@:get(client_id)}")
async def get_current_user():
  pass

...
with key_context(client_id=135356):
    await get_current_user()

```

#### Template for a class method

```python
from cashews import cache

cache.setup("mem://")

class MyClass:

    @cache(ttl="2h")
    async def get_name(self, user, version="v1"):
         ...

# a key template will be "__module__:MyClass.get_name:self:{self}:user:{user}:version:{version}"

await MyClass().get_name("me", version="v2")
# a key will be "__module__:MyClass.get_name:self:<__module__.MyClass object at 0x105edd6a0>:user:me:version:v2"
```

As you can see, there is an ugly reference to the instance in the key. That is not what we expect to see,
and such a cache will not work properly. There are 3 ways to avoid it:

1. Define the `__str__` magic method in our class

```python

class MyClass:

    def __init__(self, host):
        self._host = host

    @cache(ttl="2h")
    async def get_name(self, user, version="v1"):
        ...

    def __str__(self) -> str:
        return self._host

await MyClass(host="http://example.com").get_name("me", version="v2")
# a key will be "__module__:MyClass.get_name:self:http://example.com:user:me:version:v2"
```

2. Set a key template

```python
class MyClass:

    @cache(ttl="2h", key="{self._host}:name:{user}:{version}")
    async def get_name(self, user, version="v1"):
         ...

await MyClass(host="http://example.com").get_name("me", version="v2")
# a key will be "http://example.com:name:me:v2"
```

3. Use `noself` or `noself_cache` if you want to exclude `self` from a key

```python
from cashews import cache, noself, noself_cache

cache.setup("mem://")

class MyClass:

    @noself(cache)(ttl="2h")
    async def get_name(self, user, version="v1"):
         ...

# a key template will be "__module__:MyClass.get_name:user:{user}:version:{version}"

await MyClass().get_name("me", version="v2")
# a key will be "__module__:MyClass.get_name:user:me:version:v2"
```

### TTL

Cache time to live (`ttl`) is a required parameter for all cache decorators. TTL can be:

- an integer as the number of seconds
- a `timedelta`
- a string, as in golang, e.g. `1d2h3m50s`
- a callable object, e.g. a function, that receives the `args` and `kwargs` of the decorated function and returns any of the previous TTL formats

Examples:

```python
from cashews import cache
from datetime import timedelta

cache.setup("mem://")

@cache(ttl=60 * 10)
async def get(item_id: int) -> Item:
    pass

@cache(ttl=timedelta(minutes=10))
async def get(item_id: int) -> Item:
    pass

@cache(ttl="10m")
async def get(item_id: int) -> Item:
    pass

def _ttl(item_id: int) -> str:
    return "2h" if item_id > 10 else "1h"

@cache(ttl=_ttl)
async def get(item_id: int) -> Item:
    pass
```

### What can be cached

Cashews mostly uses the built-in pickle to store data, but also supports other pickle-like serializers such as dill.
Some types of objects are not picklable; for these, cashews has an API to define custom encoding/decoding:

```python
from cashews.serialize import register_type


async def my_encoder(value: CustomType, *args, **kwargs) -> bytes:
    ...


async def my_decoder(value: bytes, *args, **kwargs) -> CustomType:
    ...


register_type(CustomType, my_encoder, my_decoder)
```
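
For instance, a minimal sketch for a hypothetical `Point` type encoded as JSON (the type and helper names are illustrative, not part of the library):

```python
import json
from dataclasses import dataclass

from cashews.serialize import register_type


@dataclass
class Point:  # hypothetical custom type we want to cache
    x: int
    y: int


async def point_encoder(value: Point, *args, **kwargs) -> bytes:
    # encode the object to bytes; cashews stores the bytes as-is
    return json.dumps({"x": value.x, "y": value.y}).encode()


async def point_decoder(value: bytes, *args, **kwargs) -> Point:
    # decode the stored bytes back into the original type
    return Point(**json.loads(value))


register_type(Point, point_encoder, point_decoder)
```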

### Cache invalidation

Cache invalidation is one of the well-known hard problems in computer science.

Sometimes, you want to invalidate the cache after some action is triggered.
Consider this example:

```python
from cashews import cache

cache.setup("mem://")

@cache(ttl="1h", key="items:page:{page}")
async def items(page=1):
    ...

@cache.invalidate("items:page:*")
async def create_item(item):
   ...
```

Here, the cache for `items` will be invalidated every time `create_item` is called.
There are two problems:

1. with the redis backend, cashews will scan the full database for keys that match the pattern (`items:page:*`) - not good for performance reasons
2. what if we do not specify a key for the cache:

```python
@cache(ttl="1h")
async def items(page=1):
    ...
```

Cashews provides a tag system: you can tag cache keys, so they are stored in a separate [SET](https://redis.io/docs/data-types/sets/)
to avoid a high load on redis storage. To use tags most efficiently, combine them with the client-side cache feature.

> :warning: **Warning**: Tags require setting up the default cache or a cache for the tags prefix
> ```python
> from cashews import cache
> cache.setup(...)
> # or
> cache.setup_tags_backend(...)
> ```


```python
from cashews import cache

cache.setup("redis://", client_side=True)

@cache(ttl="1h", tags=["items", "page:{page}"])
async def items(page=1):
    ...


await cache.delete_tags("page:1")
await cache.delete_tags("items")

# low level api
cache.register_tag("my_tag", key_template="key{i}")

await cache.set("key1", "value", expire="1d", tags=["my_tag"])
```
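
Keys registered this way can then be invalidated with the same `delete_tags` call shown above; a minimal usage sketch:

```python
await cache.delete_tags("my_tag")  # drops keys tagged via register_tag above
```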

You can invalidate the cache for future calls using a context manager:

```python
from cashews import cache, invalidate_further

@cache(ttl="3h")
async def items():
    ...

async def add_item(item: Item) -> List[Item]:
    ...
    with invalidate_further():
        await items()
```

#### Cache invalidation on code change

Often, you may face a problem with an invalid cache after the code is changed. For example:

```python
@cache(ttl=timedelta(days=1), key="user:{user_id}")
async def get_user(user_id):
    return {"name": "Dmitry", "surname": "Krykov"}
```

Then, the returned value was changed to:

```diff
-    return {"name": "Dmitry", "surname": "Krykov"}
+    return {"full_name": "Dmitry Krykov"}
```

Since the function returns a dict, there is no simple way to automatically detect
that kind of cache invalidity.

One way to solve the problem is to add a prefix for this cache:

```python
@cache(ttl=timedelta(days=1), prefix="v2")
async def get_user(user_id):
    return {"full_name": "Dmitry Krykov"}
```

but it is so easy to forget to do it...

The best defense against this problem is to use your own data containers, like
[dataclasses](https://docs.python.org/3/library/dataclasses.html),
with a defined `__repr__` method.
This adds distinctness, and `cashews` can detect changes in such structures automatically
by checking the [object representation](https://docs.python.org/3/reference/datamodel.html#object.__repr__).

```python
from dataclasses import dataclass

from cashews import cache

cache.setup("mem://")

@dataclass
class User:
    name: str
    surname: str

# or define your own class with __repr__ method

class User:

    def __init__(self, name, surname):
        self.name, self.surname = name, surname

    def __repr__(self):
        return f"{self.name} {self.surname}"

# Will detect changes of a structure
@cache(ttl="1d", prefix="v2")
async def get_user(user_id):
    return User("Dima", "Krykov")
```

### Detect the source of a result

Decorators give us a very simple API but also make it difficult to understand where
the result is coming from - the cache or a direct call.

To solve this problem, `cashews` has the `detect` context manager:

```python
from cashews import cache

with cache.detect as detector:
    response = await something_that_use_cache()
    calls = detector.calls

print(calls)
# >>> {"my:key": [{"ttl": 10, "name": "simple", "backend": "redis"}], "fail:key": [{"ttl": 10, "exc": RateLimit, "name": "fail", "backend": "mem"}]}
```

E.g. a simple middleware to use it in a web app:

```python
@app.middleware("http")
async def add_from_cache_headers(request: Request, call_next):
    with cache.detect as detector:
        response = await call_next(request)
        if detector.calls:
            key = list(detector.calls.keys())[0]
            response.headers["X-From-Cache"] = key
            expire = await cache.get_expire(key)
            response.headers["X-From-Cache-Expire-In-Seconds"] = str(expire)
    return response
```

### Middleware

Cashews provides an interface for the "middleware" pattern:

```python
import logging

from cashews import cache, Command
from cashews.backends.interface import Backend

logger = logging.getLogger(__name__)


async def logging_middleware(call, cmd: Command, backend: Backend, *args, **kwargs):
    key = args[0] if args else kwargs.get("key", kwargs.get("pattern", ""))
    logger.info("=> Cache request: %s ", cmd.value, extra={"args": args, "cache_key": key})
    return await call(*args, **kwargs)


cache.setup("mem://", middlewares=(logging_middleware, ))
```

#### Callbacks

One of the middlewares preinstalled in a cache instance is `CallbackMiddleware`.
It adds a new interface to the cache that allows you to register a function to be called before a given command is triggered:

```python
from cashews import cache, Command


def callback(key, result):
  print(f"GET key={key}")

with cache.callback(callback, cmd=Command.GET):
    await cache.get("test")  # also will print "GET key=test"

```

### Transactional

Applications are often based on a database with transactional (OLTP) usage, and caches usually support transactions poorly.
Here is a simple example of how we can make our cache inconsistent:

```python
async def my_handler():
    async with db.transaction():
        await db.insert(user)
        await cache.set(f"key:{user.id}", user)
        await api.service.register(user)
```

Here the API call may fail and the database transaction will roll back, but the cache will not.
Of course, in this code we could solve it by moving the cache call outside the transaction, but in real code it may not be so easy.
Another case: we want to perform bulk operations on a group of keys and keep them consistent:

```python
async def login(user, token, session):
    ...
    old_session = await cache.get(f"current_session:{user.id}")
    await cache.incr(f"sessions_count:{user.id}")
    await cache.set(f"current_session:{user.id}", session)
    await cache.set(f"token:{token.id}", user)
    return old_session
```

Here we want some way to protect our code from race conditions and apply the cache operations as a single unit.

Cashews supports transactional operations:

> :warning: **Warning**: the operations available inside a transaction are `set`, `set_many`, `delete`, `delete_many`, `delete_match` and `incr`

```python
from cashews import cache
...

@cache.transaction()
async def my_handler():
    async with db.transaction():
        await db.insert(user)
        await cache.set(f"key:{user.id}", user)
        await api.service.register(user)

# or
async def login(user, token, session):
    async with cache.transaction() as tx:
        old_session = await cache.get(f"current_session:{user.id}")
        await cache.incr(f"sessions_count:{user.id}")
        await cache.set(f"current_session:{user.id}", session)
        await cache.set(f"token:{token.id}", user)
        if ...:
            tx.rollback()
    return old_session

```

Transactions in cashews support different modes of "isolation":

- fast (0-7% overhead): memory based; cannot protect against race conditions, but may be used for atomicity
- locked (the default; 4-9% overhead): uses a kind of shared lock per cache key (with the redis or disk backends); protects against race conditions
- serializable (7-50% overhead): uses a global shared lock - one transaction at a time (almost useless)

```python
from cashews import cache, TransactionMode
...

@cache.transaction(TransactionMode.SERIALIZABLE, timeout=1)
async def my_handler():
   ...
```

### Contrib

This library is framework agnostic, but includes several "batteries" for the most popular tools.

#### Fastapi

You may find a few middlewares useful for controlling the cache in your fastapi-based web application:

1. `CacheEtagMiddleware` - adds an `ETag` and checks the `If-None-Match` header based on the ETag
2. `CacheRequestControlMiddleware` - checks and adds the `Cache-Control` header
3. `CacheDeleteMiddleware` - clears the cache for an endpoint based on the `Clear-Site-Data` header

> :warning: **Warning**: `CacheEtagMiddleware` requires setting up the default cache or a cache with the prefix "fastapi:"
> ```python
> from cashews import cache
> cache.setup(...)
> # or
> cache.setup(..., prefix="fastapi:")
> ```

Example:

```python
import os

from fastapi import FastAPI, Header, Query
from fastapi.responses import StreamingResponse

from cashews import cache
from cashews.contrib.fastapi import (
    CacheDeleteMiddleware,
    CacheEtagMiddleware,
    CacheRequestControlMiddleware,
    cache_control_ttl,
)

app = FastAPI()
app.add_middleware(CacheDeleteMiddleware)
app.add_middleware(CacheEtagMiddleware)
app.add_middleware(CacheRequestControlMiddleware)
cache.setup(os.environ.get("CACHE_URI", "redis://"))



@app.get("/")
@cache.failover(ttl="1h")
@cache(ttl=cache_control_ttl(default="4m"), key="simple:{user_agent:hash}", time_condition="1s")
async def simple(user_agent: str = Header("No")):
    ...


@app.get("/stream")
@cache(ttl="1m", key="stream:{file_path}")
async def stream(file_path: str = Query(__file__)):
    return StreamingResponse(_read_file(file_path=file_path))


async def _read_file(file_path):
    ...

```

Cashews can also cache streaming responses, as in the `stream` endpoint above.

#### Prometheus

You can easily provide metrics using the Prometheus middleware.

```python
from cashews import cache
from cashews.contrib.prometheus import create_metrics_middleware

metrics_middleware = create_metrics_middleware(with_tag=False)
cache.setup("redis://", middlewares=(metrics_middleware,))

```

## Development

### Setup

- Clone the project.
- After creating a virtual environment, install [pre-commit](https://pre-commit.com/):
  ```shell
  pip install pre-commit && pre-commit install --install-hooks
  ```

### Tests

To run tests you can use `tox`:

```shell
pip install tox
tox -e py  # tests for the in-memory backend
tox -e py-diskcache  # tests for the diskcache backend
tox -e py-redis  # tests for the redis backend - you need a running redis
tox -e py-integration  # tests for integrations with aiohttp and fastapi

tox  # run all tests for every python version installed on your machine
```

Or use `pytest` (note: 2 tests always fail; this is expected):

```shell
pip install .[tests,redis,diskcache,speedup] fastapi aiohttp requests httpx SQLAlchemy prometheus-client

pytest  # run all tests with all backends
pytest -m "not redis"  # all tests except the redis backend tests
```

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/Krukov/cashews/",
    "name": "cashews",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": null,
    "keywords": "cache aio async multicache aiocache",
    "author": "Dmitry Kryukov",
    "author_email": "glebov.ru@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/e3/c4/53602c566d0c341b4b80f5a4257e303e4e78ffb95452bded2071e88c758a/cashews-7.4.0.tar.gz",
    "platform": null,
    "description": "<h1 align=\"center\">\ud83e\udd54 CASHEWS \ud83e\udd54</h1>\n\n<p align=\"center\">\n    <em>Async cache framework with simple API to build fast and reliable applications</em>\n</p>\n\n```bash\npip install cashews\npip install cashews[redis]\npip install cashews[diskcache]\npip install cashews[dill] # can cache in redis more types of objects\npip install cashews[speedup] # for bloom filters\n```\n\n---\n\n## Why\n\nCache plays a significant role in modern applications and everybody wants to use all the power of async programming and cache.\nThere are a few advanced techniques with cache and async programming that can help you build simple, fast,\nscalable and reliable applications. This library intends to make it easy to implement such techniques.\n\n## Features\n\n- Easy to configure and use\n- Decorator-based API, decorate and play\n- Different cache strategies out-of-the-box\n- Support for multiple storage backends ([In-memory](#in-memory), [Redis](#redis), [DiskCache](diskcache))\n- Set TTL as a string (\"2h5m\"), as `timedelta` or use a function in case TTL depends on key parameters\n- Transactionality\n- Middlewares\n- Client-side cache (10x faster than simple cache with redis)\n- Bloom filters\n- Different cache invalidation techniques (time-based or tags)\n- Cache any objects securely with pickle (use [secret](#redis))\n- 2x faster than `aiocache` (with client side caching)\n\n## Usage Example\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")  # configure as in-memory cache, but redis/diskcache is also supported\n\n# use a decorator-based API\n@cache(ttl=\"3h\", key=\"user:{request.user.uid}\")\nasync def long_running_function(request):\n    ...\n\n# or for fine-grained control, use it directly in a function\nasync def cache_using_function(request):\n    await cache.set(key=request.user.uid, value=request.user, expire=\"20h\")\n    ...\n```\n\nMore examples [here](https://github.com/Krukov/cashews/tree/master/examples)\n\n## Table of Contents\n\n- [Configuration](#configuration)\n- [Available Backends](#available-backends)\n- [Basic API](#basic-api)\n- [Disable Cache](#disable-cache)\n- [Strategies](#strategies)\n  - [Cache condition](#cache-condition)\n  - [Keys templating](#template-keys)\n  - [TTL](#ttl)\n  - [What can be cached](#what-can-be-cached)\n- [Cache Invalidation](#cache-invalidation)\n  - [Cache invalidation on code change](#cache-invalidation-on-code-change)\n- [Detect the source of a result](#detect-the-source-of-a-result)\n- [Middleware](#middleware)\n- [Callbacks](#callbacks)\n- [Transactional mode](#transactional)\n- [Contrib](#contrib)\n  - [Fastapi](#fastapi)\n  - [Prometheus](#prometheus)\n\n### Configuration\n\n`cashews` provides a default cache, that you can setup in two different ways:\n\n```python\nfrom cashews import cache\n\n# via url\ncache.setup(\"redis://0.0.0.0/?db=1&socket_connect_timeout=0.5&suppress=0&secret=my_secret&enable=1\")\n# or via kwargs\ncache.setup(\"redis://0.0.0.0/\", db=1, wait_for_connection_timeout=0.5, suppress=False, secret=b\"my_key\", enable=True)\n```\n\nAlternatively, you can create a cache instance yourself:\n\n```python\nfrom cashews import Cache\n\ncache = Cache()\ncache.setup(...)\n```\n\nOptionally, you can disable cache with `disable`/`enable` parameter (see [Disable Cache](#disable-cache)):\n\n```python\ncache.setup(\"redis://redis/0?enable=1\")\ncache.setup(\"mem://?size=500\", disable=True)\ncache.setup(\"mem://?size=500\", enable=False)\n```\n\nYou can setup different Backends 
based on a prefix:\n\n```python\ncache.setup(\"redis://redis/0\")\ncache.setup(\"mem://?size=500\", prefix=\"user\")\n\nawait cache.get(\"accounts\")  # will use the redis backend\nawait cache.get(\"user:1\")  # will use the memory backend\n```\n\n### Available Backends\n\n#### In-memory\n\nThe in-memory cache uses fixed-sized LRU dict to store values. It checks expiration on `get`\nand periodically purge expired keys.\n\n```python\ncache.setup(\"mem://\")\ncache.setup(\"mem://?check_interval=10&size=10000\")\n```\n\n#### Redis\n\n_Requires [redis](https://github.com/redis/redis-py) package._\\\n\nThis will use Redis as a storage.\n\nThis backend uses [pickle](https://docs.python.org/3/library/pickle.html) module to serialize\nvalues, but the cashes can store values with sha1-keyed hash.\n\nUse `secret` and `digestmod` parameters to protect your application from security vulnerabilities.\n\nThe `digestmod` is a hashing algorithm that can be used: `sum`, `md5` (default), `sha1` and `sha256`\n\nThe `secret` is a salt for a hash.\n\nPickle can't serialize any type of object. In case you need to store more complex types\n\nyou can use [dill](https://github.com/uqfoundation/dill) - set `pickle_type=\"dill\"`.\nDill is great, but less performance.\nIf you need complex serializer for [sqlalchemy](https://docs.sqlalchemy.org/en/14/core/serializer.html) objects you can set `pickle_type=\"sqlalchemy\"`\nUse `json` also an option to serialize/deserialize an object, but it very limited (`pickle_type=\"json\"`)\n\nAny connection errors are suppressed, to disable it use `suppress=False` - a `CacheBackendInteractionError` will be raised\n\nIf you would like to use [client-side cache](https://redis.io/topics/client-side-caching) set `client_side=True`\n\nClient side cache will add `cashews:` prefix for each key, to customize it use `client_side_prefix` option.\n\n```python\ncache.setup(\"redis://0.0.0.0/?db=1&minsize=10&suppress=false&secret=my_secret\", prefix=\"func\")\ncache.setup(\"redis://0.0.0.0/2\", password=\"my_pass\", socket_connect_timeout=0.1, retry_on_timeout=True, secret=\"my_secret\")\ncache.setup(\"redis://0.0.0.0\", client_side=True, client_side_prefix=\"my_prefix:\", pickle_type=\"dill\")\n```\n\nFor using secure connections to redis (over ssl) uri should have `rediss` as schema\n\n```python\ncache.setup(\"rediss://0.0.0.0/\", ssl_ca_certs=\"path/to/ca.crt\", ssl_keyfile=\"path/to/client.key\",ssl_certfile=\"path/to/client.crt\",)\n```\n\n#### DiskCache\n\n_Requires [diskcache](https://github.com/grantjenks/python-diskcache) package._\n\nThis will use local sqlite databases (with shards) as storage.\n\nIt is a good choice if you don't want to use redis, but you need a shared storage, or your cache takes a lot of local memory.\nAlso, it is a good choice for client side local storage.\n\nYou can setup disk cache with [FanoutCache parameters](http://www.grantjenks.com/docs/diskcache/api.html#fanoutcache)\n\n** Warning ** `cache.scan` and `cache.get_match` does not work with this storage (works only if shards are disabled)\n\n```python\ncache.setup(\"disk://\")\ncache.setup(\"disk://?directory=/tmp/cache&timeout=1&shards=0\")  # disable shards\nGb = 1073741824\ncache.setup(\"disk://\", size_limit=3 * Gb, shards=12)\n```\n\n### Basic API\n\nThere are a few basic methods to work with cache:\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")  # configure as in-memory cache\n\nawait cache.set(key=\"key\", value=90, expire=\"2h\", exist=None)  # -> bool\nawait 
cache.set_raw(key=\"key\", value=\"str\")  # -> bool\nawait cache.set_many({\"key1\": value, \"key2\": value})  # -> None\n\nawait cache.get(\"key\", default=None)  # -> Any\nawait cache.get_or_set(\"key\", default=awaitable_or_callable, expire=\"1h\")  # -> Any\nawait cache.get_raw(\"key\") # -> Any\nawait cache.get_many(\"key1\", \"key2\", default=None)  # -> tuple[Any]\nasync for key, value in cache.get_match(\"pattern:*\", batch_size=100):\n    ...\n\nawait cache.incr(\"key\") # -> int\nawait cache.exists(\"key\") # -> bool\n\nawait cache.delete(\"key\")\nawait cache.delete_many(\"key1\", \"key2\")\nawait cache.delete_match(\"pattern:*\")\n\nasync for key in cache.scan(\"pattern:*\"):\n    ...\n\nawait cache.expire(\"key\", timeout=10)\nawait cache.get_expire(\"key\")  # -> int seconds to expire\n\nawait cache.ping(message=None)  # -> bytes\nawait cache.clear()\n\nawait cache.is_locked(\"key\", wait=60)  # -> bool\nasync with cache.lock(\"key\", expire=10):\n    ...\nawait cache.set_lock(\"key\", value=\"value\", expire=60)  # -> bool\nawait cache.unlock(\"key\", \"value\")  # -> bool\n\nawait cache.get_keys_count()  # -> int - total number of keys in cache\nawait cache.close()\n```\n\n### Disable Cache\n\nCache can be disabled not only at setup, but also in runtime. Cashews allow you to disable/enable any call of cache or specific commands:\n\n```python\nfrom cashews import cache, Command\n\ncache.setup(\"mem://\")  # configure as in-memory cache\n\ncache.disable(Command.DELETE)\ncache.disable()\ncache.enable(Command.GET, Command.SET)\ncache.enable()\n\nwith cache.disabling():\n  ...\n```\n\n### Strategies\n\n- [Simple cache](#simple-cache)\n- [Fail cache (Failover cache)](#fail-cache-failover-cache)\n- [Hit cache](#hit-cache)\n- [Early](#early)\n- [Soft](#soft)\n- [Async Iterators](#iterators)\n- [Locked](#locked)\n- [Rate limit](#rate-limit)\n- [Circuit breaker](#circuit-breaker)\n\n#### Simple cache\n\nThis is a typical cache strategy: execute, store and return from cache until it expires.\n\n```python\nfrom datetime import timedelta\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n@cache(ttl=timedelta(hours=3), key=\"user:{request.user.uid}\")\nasync def long_running_function(request):\n    ...\n```\n\n#### Fail cache (Failover cache)\n\nReturn cache result, if one of the given exceptions is raised (at least one function\ncall should succeed prior to that).\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n# note: the key will be \"__module__.get_status:name:{name}\"\n@cache.failover(ttl=\"2h\", exceptions=(ValueError, MyException))\nasync def get_status(name):\n    value = await api_call()\n    return {\"status\": value}\n```\n\nIf exceptions didn't get will catch all exceptions or use default if it is set by:\n\n```python\ncache.set_default_fail_exceptions(ValueError, MyException)\n```\n\n#### Hit cache\n\nExpire cache after given numbers of call `cache_hits`.\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n@cache.hit(ttl=\"2h\", cache_hits=100, update_after=2)\nasync def get(name):\n    value = await api_call()\n    return {\"status\": value}\n```\n\n#### Early\n\nCache strategy that tries to solve [Cache stampede problem](https://en.wikipedia.org/wiki/Cache_stampede)\nwith a hot cache recalculating result in a background.\n\n```python\nfrom cashews import cache  # or: from cashews import early\n\n# if you call this function after 7 min, cache will be updated in a background\n@cache.early(ttl=\"10m\", early_ttl=\"7m\")\nasync 
def get(name):\n    value = await api_call()\n    return {\"status\": value}\n```\n\n#### Soft\n\nLike a simple cache, but with a fail protection base on soft ttl.\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n# if you call this function after 7 min, cache will be updated and return a new result.\n# If it fail on recalculation will return current cached value (if it is not more than 10 min old)\n@cache.soft(ttl=\"10m\", soft_ttl=\"7m\")\nasync def get(name):\n    value = await api_call()\n    return {\"status\": value}\n```\n\n#### Iterators\n\nAll upper decorators can be used only with coroutines. Cashing async iterators works differently.\nTo cache async iterators use `iterator` decorator\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n\n@cache.iterator(ttl=\"10m\", key=\"get:{name}\")\nasync def get(name):\n    async for item in get_pages(name):\n        yield ...\n\n```\n\n#### Locked\n\nDecorator that can help you to solve [Cache stampede problem](https://en.wikipedia.org/wiki/Cache_stampede).\nLock the following function calls until the first one is finished.\nThis guarantees exactly one function call for given ttl.\n\n> :warning: \\*\\*Warning: this decorator will not cache the result\n> To do it you can combine this decorator with any cache decorator or use parameter `lock=True` with `@cache()`\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n@cache.locked(ttl=\"10s\")\nasync def get(name):\n    value = await api_call()\n    return {\"status\": value}\n```\n\n#### Rate limit\n\nRate limit for a function call: if rate limit is reached raise an `RateLimitError` exception.\n\n> :warning: \\*\\*Warning: this decorator will not cache the result\n> To do it you can combine this decorator with any cache failover decorator`\n\n```python\nfrom cashews import cache, RateLimitError\n\ncache.setup(\"mem://\")\n\n# no more than 10 calls per minute or ban for 10 minutes - raise RateLimitError\n@cache.rate_limit(limit=10, period=\"1m\", ttl=\"10m\")\nasync def get(name):\n    value = await api_call()\n    return {\"status\": value}\n\n\n\n# no more than 100 calls in 10 minute window. if rate limit will rich -> return from cache\n@cache.failover(ttl=\"10m\", exceptions=(RateLimitError, ))\n@cache.slice_rate_limit(limit=100, period=\"10m\")\nasync def get_next(name):\n    value = await api_call()\n    return {\"status\": value}\n\n```\n\n#### Circuit breaker\n\nCircuit breaker pattern. 
Count the number of failed calls and if the error rate reaches the specified value, it will raise `CircuitBreakerOpen` exception\n\n> :warning: \\*\\*Warning: this decorator will not cache the result\n> To do it you can combine this decorator with any cache failover decorator`\n\n```python\nfrom cashews import cache, CircuitBreakerOpen\n\ncache.setup(\"mem://\")\n\n@cache.circuit_breaker(errors_rate=10, period=\"1m\", ttl=\"5m\")\nasync def get(name):\n    ...\n\n\n@cache.failover(ttl=\"10m\", exceptions=(CircuitBreakerOpen, ))\n@cache.circuit_breaker(errors_rate=10, period=\"10m\", ttl=\"5m\", half_open_ttl=\"1m\")\nasync def get_next(name):\n    ...\n\n```\n\n#### Bloom filter (experimental)\n\nSimple Bloom filter:\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n@cache.bloom(capacity=10_000, false_positives=1)\nasync def email_exists(email: str) -> bool:\n    ...\n\nfor email in all_users_emails:\n    await email_exists.set(email)\n\nawait email_exists(\"example@example.com\")\n```\n\n### Cache condition\n\nBy default, any successful result of the function call is stored, even if it is a `None`.\nCaching decorators have the parameter - `condition`, which can be:\n\n- a callable object that receives the result of a function call or an exception, args, kwargs and a cache key\n- a string: \"not_none\" or \"skip_none\" to do not cache `None` values in\n\n```python\nfrom cashews import cache, NOT_NONE\n\ncache.setup(\"mem://\")\n\n@cache(ttl=\"1h\", condition=NOT_NONE)\nasync def get():\n    ...\n\n\ndef skit_test_result(result, args, kwargs, key=None) -> bool:\n    return result and result != \"test\"\n\n@cache(ttl=\"1h\", condition=skit_test_result)\nasync def get():\n    ...\n\n```\n\nIt is also possible to cache an exception that the function can raise, to do so use special conditions (only for simple, hit and early)\n\n```python\nfrom cashews import cache, with_exceptions, only_exceptions\n\ncache.setup(\"mem://\")\n\n@cache(ttl=\"1h\", condition=with_exceptions(MyException, TimeoutError))\nasync def get():\n    ...\n\n\n@cache(ttl=\"1h\", condition=only_exceptions(MyException, TimeoutError))\nasync def get():\n    ...\n\n```\n\nAlso caching decorators have the parameter `time_condition` - min latency in seconds (can be set like `ttl`)\nof getting the result of a function call to be cached.\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n@cache(ttl=\"1h\", time_condition=\"3s\")  # to cache for 1 hour if execution takes more than 3 seconds\nasync def get():\n    ...\n```\n\n### Template Keys\n\nOften, to compose a cache key, you need all the parameters of the function call.\nBy default, Cashews will generate a key using the function name, module names and parameters\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n@cache(ttl=timedelta(hours=3))\nasync def get_name(user, *args, version=\"v1\", **kwargs):\n    ...\n\n# a key template will be \"__module__.get_name:user:{user}:{__args__}:version:{version}:{__kwargs__}\"\n\nawait get_name(\"me\", version=\"v2\")\n# a key will be \"__module__.get_name:user:me::version:v2\"\nawait get_name(\"me\", version=\"v1\", foo=\"bar\")\n# a key will be \"__module__.get_name:user:me::version:v1:foo:bar\"\nawait get_name(\"me\", \"opt\", \"attr\", opt=\"opt\", attr=\"attr\")\n# a key will be \"__module__.get_name:user:me:opt:attr:version:v1:attr:attr:opt:opt\"\n```\n\nFor more advanced usage it better to define a cache key manually:\n\n```python\nfrom cashews import 
cache\n\ncache.setup(\"mem://\")\n\n@cache(ttl=\"2h\", key=\"user_info:{user_id}\")\nasync def get_info(user_id: str):\n    ...\n\n```\n\nYou may use objects in a key and access to an attribute through a template:\n\n```python\n\n@cache(ttl=\"2h\", key=\"user_info:{user.uuid}\")\nasync def get_info(user: User):\n    ...\n\n```\n\nYou may use built-in functions to format template values (`lower`, `upper`, `len`, `jwt`, `hash`)\n\n```python\n\n@cache(ttl=\"2h\", key=\"user_info:{user.name:lower}:{password:hash(sha1)}\")\nasync def get_info(user: User, password: str):\n    ...\n\n\n@cache(ttl=\"2h\", key=\"user:{token:jwt(client_id)}\")\nasync def get_user_by_token(token: str) -> User:\n    ...\n\n```\n\nOr define your own transformation functions:\n\n```python\nfrom cashews import default_formatter, cache\n\ncache.setup(\"mem://\")\n\n@default_formatter.register(\"prefix\")\ndef _prefix(value, chars=3):\n    return value[:chars].upper()\n\n\n@cache(ttl=\"2h\", key=\"servers-user:{user.index:prefix(4)}\")  # a key will be \"servers-user:DWQS\"\nasync def get_user_servers(user):\n    ...\n\n```\n\nor register type formatters:\n\n```python\nfrom decimal import Decimal\nfrom cashews import default_formatter, cache\n\n@default_formatter.type_format(Decimal)\ndef _decimal(value: Decimal) -> str:\n    return str(value.quantize(Decimal(\"0.00\")))\n\n\n@cache(ttl=\"2h\", key=\"price-{item.price}:{item.currency:upper}\")  # a key will be \"price-10.00:USD\"\nasync def convert_price(item):\n    ...\n\n```\n\nNot only function arguments can participate in a key formation. Cashews have a `template_context', use `@:get` in a template to paste variable from a context:\n\n```python\nfrom cashews import cache, key_context\n\ncache.setup(\"mem://\")\n\n\n@cache(ttl=\"2h\", key=\"user:{@:get(client_id)}\")\nasync def get_current_user():\n  pass\n\n...\nwith key_context(client_id=135356):\n    await get_current_user()\n\n```\n\n#### Template for a class method\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\nclass MyClass:\n\n    @cache(ttl=\"2h\")\n    async def get_name(self, user, version=\"v1\"):\n         ...\n\n# a key template will be \"__module__:MyClass.get_name:self:{self}:user:{user}:version:{version}\n\nawait MyClass().get_name(\"me\", version=\"v2\")\n# a key will be \"__module__:MyClass.get_name:self:<__module__.MyClass object at 0x105edd6a0>:user:me:version:v1\"\n```\n\nAs you can see, there is an ugly reference to the instance in the key. That is not what we expect to see.\nThat cache will not work properly. There are 3 solutions to avoid it:\n\n1. define `__str__` magic method in our class\n\n```python\n\nclass MyClass:\n\n    @cache(ttl=\"2h\")\n    async def get_name(self, user, version=\"v1\"):\n         ...\n\n    def __str__(self) -> str:\n        return self._host\n\nawait MyClass(host=\"http://example.com\").get_name(\"me\", version=\"v2\")\n# a key will be \"__module__:MyClass.get_name:self:http://example.com:user:me:version:v1\"\n```\n\n2. Set a key template\n\n```python\nclass MyClass:\n\n    @cache(ttl=\"2h\", key=\"{self._host}:name:{user}:{version}\")\n    async def get_name(self, user, version=\"v1\"):\n         ...\n\nawait MyClass(host=\"http://example.com\").get_name(\"me\", version=\"v2\")\n# a key will be \"http://example.com:name:me:v1\"\n```\n\n3. 
Use `noself` or `noself_cache` if you want to exclude `self` from a key\n\n```python\nfrom cashews import cache, noself, noself_cache\n\ncache.setup(\"mem://\")\n\nclass MyClass:\n\n    @noself(cache)(ttl=\"2h\")\n    async def get_name(self, user, version=\"v1\"):\n         ...\n\n# a key template will be \"__module__:MyClass.get_name:user:{user}:version:{version}\n\nawait MyClass().get_name(\"me\", version=\"v2\")\n# a key will be \"__module__:MyClass.get_name:user:me:version:v1\"\n```\n\n### TTL\n\nCache time to live (`ttl`) is a required parameter for all cache decorators. TTL can be:\n\n- an integer as the number of seconds\n- a `timedelta`\n- a string like in golang e.g `1d2h3m50s`\n- a callable object like a function that receives `args` and `kwargs` of the decorated function and returns one of the previous format for TTL\n\nExamples:\n\n```python\nfrom cashews import cache\nfrom datetime import timedelta\n\ncache.setup(\"mem://\")\n\n@cache(ttl=60 * 10)\nasync def get(item_id: int) -> Item:\n    pass\n\n@cache(ttl=timedelta(minutes=10))\nasync def get(item_id: int) -> Item:\n    pass\n\n@cache(ttl=\"10m\")\nasync def get(item_id: int) -> Item:\n    pass\n\ndef _ttl(item_id: int) -> str:\n    return \"2h\" if item_id > 10 else \"1h\"\n\n@cache(ttl=_ttl)\nasync def get(item_id: int) -> Item:\n    pass\n```\n\n### What can be cached\n\nCashews mostly use built-in pickle to store data but also support other pickle-like serialization like dill.\nSome types of objects are not picklable, in this case, cashews has API to define custom encoding/decoding:\n\n```python\nfrom cashews.serialize import register_type\n\n\nasync def my_encoder(value: CustomType, *args, **kwargs) -> bytes:\n    ...\n\n\nasync def my_decoder(value: bytes, *args, **kwargs) -> CustomType:\n    ...\n\n\nregister_type(CustomType, my_encoder, my_decoder)\n```\n\n### Cache invalidation\n\nCache invalidation - one of the main Computer Science well-known problems.\n\nSometimes, you want to invalidate the cache after some action is triggered.\nConsider this example:\n\n```python\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n@cache(ttl=\"1h\", key=\"items:page:{page}\")\nasync def items(page=1):\n    ...\n\n@cache.invalidate(\"items:page:*\")\nasync def create_item(item):\n   ...\n```\n\nHere, the cache for `items` will be invalidated every time `create_item` is called\nThere are two problems:\n\n1. with redis backend you cashews will scan a full database to get a key that match a pattern (`items:page:*`) - not good for performance reasons\n2. what if we do not specify a key for cache:\n\n```python\n@cache(ttl=\"1h\")\nasync def items(page=1):\n    ...\n```\n\nCashews provide the tag system: you can tag cache keys, so they will be stored in a separate [SET](https://redis.io/docs/data-types/sets/)\nto avoid high load on redis storage. 
To use the tags in a more efficient way please use it with the client side feature.\n\n> :warning: \\*\\*Warning: Tags require setting up default cache or cache for tags prefix\n> ```python\n> from cashews import cache\n> cache.setup(...)\n> # or\n> cache.setup_tags_backend(...)\n> ```\n\n\n```python\nfrom cashews import cache\n\ncache.setup(\"redis://\", client_side=True)\n\n@cache(ttl=\"1h\", tags=[\"items\", \"page:{page}\"])\nasync def items(page=1):\n    ...\n\n\nawait cache.delete_tags(\"page:1\")\nawait cache.delete_tags(\"items\")\n\n# low level api\ncache.register_tag(\"my_tag\", key_template=\"key{i}\")\n\nawait cache.set(\"key1\", \"value\", expire=\"1d\", tags=[\"my_tag\"])\n```\n\nYou can invalidate future call of cache request by context manager:\n\n```python\nfrom cashews import cache, invalidate_further\n\n@cache(ttl=\"3h\")\nasync def items():\n    ...\n\nasync def add_item(item: Item) -> List[Item]:\n    ...\n    with invalidate_further():\n        await items\n```\n\n#### Cache invalidation on code change\n\nOften, you may face a problem with an invalid cache after the code is changed. For example:\n\n```python\n@cache(ttl=timedelta(days=1), key=\"user:{user_id}\")\nasync def get_user(user_id):\n    return {\"name\": \"Dmitry\", \"surname\": \"Krykov\"}\n```\n\nThen, the returned value was changed to:\n\n```bash\n-    return {\"name\": \"Dmitry\", \"surname\": \"Krykov\"}\n+    return {\"full_name\": \"Dmitry Krykov\"}\n```\n\nSince the function returns a dict, there is no simple way to automatically detect\nthat kind of cache invalidity\n\nOne way to solve the problem is to add a prefix for this cache:\n\n```python\n@cache(ttl=timedelta(days=1), prefix=\"v2\")\nasync def get_user(user_id):\n    return {\"full_name\": \"Dmitry Krykov\"}\n```\n\nbut it is so easy to forget to do it...\n\nThe best defense against this problem is to use your own datacontainers, like\n[dataclasses](https://docs.python.org/3/library/dataclasses.html),\nwith defined `__repr__` method.\nThis will add distinctness and `cashews` can detect changes in such structures automatically\nby checking [object representation](https://docs.python.org/3/reference/datamodel.html#object.__repr__).\n\n```python\nfrom dataclasses import dataclass\n\nfrom cashews import cache\n\ncache.setup(\"mem://\")\n\n@dataclass\nclass User:\n    name: str\n    surname: str\n\n# or define your own class with __repr__ method\n\nclass User:\n\n    def __init__(self, name, surname):\n        self.name, self.surname = name, surname\n\n    def __repr__(self):\n        return f\"{self.name} {self.surname}\"\n\n# Will detect changes of a structure\n@cache(ttl=\"1d\", prefix=\"v2\")\nasync def get_user(user_id):\n    return User(\"Dima\", \"Krykov\")\n```\n\n### Detect the source of a result\n\nDecorators give us a very simple API but also make it difficult to understand where\nthe result is coming from - cache or direct call.\n\nTo solve this problem `cashews` has `detect` context manager:\n\n```python\nfrom cashews import cache\n\nwith cache.detect as detector:\n    response = await something_that_use_cache()\n    calls = detector.calls\n\nprint(calls)\n# >>> {\"my:key\": [{\"ttl\": 10, \"name\": \"simple\", \"backend\": \"redis\"}, ], \"fail:key\": [{\"ttl\": 10, \"exc\": RateLimit}, \"name\": \"fail\", \"backend\": \"mem\"],}\n```\n\nE.g. 
You can also force invalidation of a cached call with the `invalidate_further` context manager:

```python
from cashews import cache, invalidate_further

@cache(ttl="3h")
async def items():
    ...

async def add_item(item: Item) -> List[Item]:
    ...
    with invalidate_further():
        await items()
```

#### Cache invalidation on code change

Often you may face the problem of a stale cache after the code has changed. For example:

```python
@cache(ttl=timedelta(days=1), key="user:{user_id}")
async def get_user(user_id):
    return {"name": "Dmitry", "surname": "Krykov"}
```

Then the returned value is changed to:

```diff
-    return {"name": "Dmitry", "surname": "Krykov"}
+    return {"full_name": "Dmitry Krykov"}
```

Since the function returns a dict, there is no simple way to automatically detect
that kind of cache invalidity.

One way to solve the problem is to add a prefix for this cache:

```python
@cache(ttl=timedelta(days=1), prefix="v2")
async def get_user(user_id):
    return {"full_name": "Dmitry Krykov"}
```

but it is easy to forget to do that...

The best defense against this problem is to use your own data containers, like
[dataclasses](https://docs.python.org/3/library/dataclasses.html),
with a defined `__repr__` method.
This adds distinctness, and `cashews` can detect changes in such structures automatically
by checking the [object representation](https://docs.python.org/3/reference/datamodel.html#object.__repr__).

```python
from dataclasses import dataclass

from cashews import cache

cache.setup("mem://")

@dataclass
class User:
    name: str
    surname: str

# or define your own class with a __repr__ method

class User:

    def __init__(self, name, surname):
        self.name, self.surname = name, surname

    def __repr__(self):
        return f"{self.name} {self.surname}"

# will detect changes in the structure
@cache(ttl="1d", prefix="v2")
async def get_user(user_id):
    return User("Dima", "Krykov")
```

### Detect the source of a result

Decorators give us a very simple API, but they also make it hard to tell where
a result came from - the cache or a direct call.

To solve this problem, `cashews` has the `detect` context manager:

```python
from cashews import cache

with cache.detect as detector:
    response = await something_that_use_cache()
    calls = detector.calls

print(calls)
# >>> {"my:key": [{"ttl": 10, "name": "simple", "backend": "redis"}], "fail:key": [{"ttl": 10, "exc": RateLimit, "name": "fail", "backend": "mem"}]}
```

For example, a simple middleware that uses it in a web app:

```python
from fastapi import FastAPI, Request

from cashews import cache

app = FastAPI()


@app.middleware("http")
async def add_from_cache_headers(request: Request, call_next):
    with cache.detect as detector:
        response = await call_next(request)
        if detector.calls:
            key = list(detector.calls.keys())[0]
            response.headers["X-From-Cache"] = key
            expire = await cache.get_expire(key)
            response.headers["X-From-Cache-Expire-In-Seconds"] = str(expire)
    return response
```

### Middleware

Cashews provides an interface for the "middleware" pattern:

```python
import logging

from cashews import cache, Command
from cashews.backends.interface import Backend

logger = logging.getLogger(__name__)


async def logging_middleware(call, cmd: Command, backend: Backend, *args, **kwargs):
    key = args[0] if args else kwargs.get("key", kwargs.get("pattern", ""))
    logger.info("=> Cache request: %s", cmd.value, extra={"args": args, "cache_key": key})
    return await call(*args, **kwargs)


cache.setup("mem://", middlewares=(logging_middleware,))
```
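
Because middlewares compose, the same hook works for lightweight instrumentation. Below is a minimal sketch (not part of cashews) that reuses the middleware signature from the example above to warn about slow cache operations; the 100 ms threshold is an arbitrary illustration:

```python
import logging
import time

from cashews import cache

logger = logging.getLogger(__name__)


async def timing_middleware(call, cmd, backend, *args, **kwargs):
    start = time.perf_counter()
    try:
        return await call(*args, **kwargs)
    finally:
        elapsed = time.perf_counter() - start
        if elapsed > 0.1:  # hypothetical threshold: flag calls slower than 100 ms
            logger.warning("Slow cache %s took %.3fs", cmd.value, elapsed)


cache.setup("mem://", middlewares=(timing_middleware,))
```
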
#### Callbacks

One of the middlewares preinstalled in a cache instance is `CallbackMiddleware`.
It adds a new interface to the cache that lets you register a function to be called before a given command is executed:

```python
from cashews import cache, Command


def callback(key, result):
    print(f"GET key={key}")

with cache.callback(callback, cmd=Command.GET):
    await cache.get("test")  # will also print "GET key=test"
```

### Transactional

Applications are often built on a database used transactionally (OLTP), while caches usually support transactions poorly.
Here is a simple example of how we can make our cache inconsistent:

```python
async def my_handler():
    async with db.transaction():
        await db.insert(user)
        await cache.set(f"key:{user.id}", user)
        await api.service.register(user)
```

Here the API call may fail and the database transaction will roll back, but the cache will not.
Of course, in this code we could solve it by moving the cache call outside the transaction, but in real code it may not be so easy.
Another case: we want to perform bulk operations on a group of keys and keep them consistent:

```python
async def login(user, token, session):
    ...
    old_session = await cache.get(f"current_session:{user.id}")
    await cache.incr(f"sessions_count:{user.id}")
    await cache.set(f"current_session:{user.id}", session)
    await cache.set(f"token:{token.id}", user)
    return old_session
```

Here we want some way to protect our code from race conditions while running several cache operations together.

Cashews supports transactional operations:

> :warning: **Warning:** the transactional operations are `set`, `set_many`, `delete`, `delete_many`, `delete_match` and `incr`

```python
from cashews import cache
...

@cache.transaction()
async def my_handler():
    async with db.transaction():
        await db.insert(user)
        await cache.set(f"key:{user.id}", user)
        await api.service.register(user)

# or
async def login(user, token, session):
    async with cache.transaction() as tx:
        old_session = await cache.get(f"current_session:{user.id}")
        await cache.incr(f"sessions_count:{user.id}")
        await cache.set(f"current_session:{user.id}", session)
        await cache.set(f"token:{token.id}", user)
        if ...:
            tx.rollback()
    return old_session
```

Transactions in cashews support different "isolation" modes:

- fast (0-7% overhead) - memory-based; cannot protect against race conditions, but usable for atomicity
- locked (default, 4-9% overhead) - uses a kind of shared lock per cache key (with the redis or disk backend); protects against race conditions
- serializable (7-50% overhead) - uses a global shared lock, so only one transaction runs at a time (almost useless)

```python
from cashews import cache, TransactionMode
...

@cache.transaction(TransactionMode.SERIALIZABLE, timeout=1)
async def my_handler():
    ...
```
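
The default mode can also be requested explicitly. This is a sketch that assumes `TransactionMode` exposes `LOCKED` and `FAST` members matching the list above (only `SERIALIZABLE` appears in the example), and that `timeout` bounds how long the transaction waits for the per-key lock:

```python
from cashews import cache, TransactionMode


# hypothetical handler: per-key locking (the default mode), made explicit
@cache.transaction(TransactionMode.LOCKED, timeout=0.5)
async def update_profile(user):
    await cache.set(f"profile:{user.id}", user)
    await cache.incr(f"profile_updates:{user.id}")
```
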
### Contrib

This library is framework-agnostic, but it includes several "batteries" for the most popular tools.

#### Fastapi

A few middlewares can help you control the cache in a fastapi-based web application:

1. `CacheEtagMiddleware` - adds an `Etag` header and checks `If-None-Match` against the Etag
2. `CacheRequestControlMiddleware` - checks and adds the `Cache-Control` header
3. `CacheDeleteMiddleware` - clears the cache for an endpoint based on the `Clear-Site-Data` header

> :warning: **Warning:** `CacheEtagMiddleware` requires setting up the default cache or a cache with the prefix "fastapi:"
> ```python
> from cashews import cache
> cache.setup(...)
> # or
> cache.setup(..., prefix="fastapi:")
> ```

Example:

```python
import os

from fastapi import FastAPI, Header, Query
from fastapi.responses import StreamingResponse

from cashews import cache
from cashews.contrib.fastapi import (
    CacheDeleteMiddleware,
    CacheEtagMiddleware,
    CacheRequestControlMiddleware,
    cache_control_ttl,
)

app = FastAPI()
app.add_middleware(CacheDeleteMiddleware)
app.add_middleware(CacheEtagMiddleware)
app.add_middleware(CacheRequestControlMiddleware)
cache.setup(os.environ.get("CACHE_URI", "redis://"))


@app.get("/")
@cache.failover(ttl="1h")
@cache(ttl=cache_control_ttl(default="4m"), key="simple:{user_agent:hash}", time_condition="1s")
async def simple(user_agent: str = Header("No")):
    ...


@app.get("/stream")
@cache(ttl="1m", key="stream:{file_path}")
async def stream(file_path: str = Query(__file__)):
    return StreamingResponse(_read_file(file_path=file_path))


async def _read_file(file_path):
    ...
```

Cashews can also cache streaming responses.

#### Prometheus

You can easily expose cache metrics with the Prometheus middleware:

```python
from cashews import cache
from cashews.contrib.prometheus import create_metrics_middleware

metrics_middleware = create_metrics_middleware(with_tag=False)
cache.setup("redis://", middlewares=(metrics_middleware,))
```

## Development

### Setup

- Clone the project.
- After creating a virtual environment, install [pre-commit](https://pre-commit.com/):
  ```shell
  pip install pre-commit && pre-commit install --install-hooks
  ```

### Tests

To run the tests you can use `tox`:

```shell
pip install tox
tox -e py  # tests for the in-memory backend
tox -e py-diskcache  # tests for the diskcache backend
tox -e py-redis  # tests for the redis backend - requires a running redis
tox -e py-integration  # tests for integrations with aiohttp and fastapi

tox  # run all tests for every python installed on your machine
```

Or use `pytest` directly (note: 2 tests always fail; this is expected):

```shell
pip install .[tests,redis,diskcache,speedup] fastapi aiohttp requests httpx SQLAlchemy prometheus-client

pytest  # run all tests with all backends
pytest -m "not redis"  # all tests except those for the redis backend
```
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "cache tools with async power",
    "version": "7.4.0",
    "project_urls": {
        "Homepage": "https://github.com/Krukov/cashews/"
    },
    "split_keywords": [
        "cache",
        "aio",
        "async",
        "multicache",
        "aiocache"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "440c076f12a0b3005498a6afead1aa40894dfb1ccc14ad7858c0a821e89b57bc",
                "md5": "554836a84cf912eba4931709705013c7",
                "sha256": "e881cc9b4be05ac9ce2c448784bca2864776b1c13ee262658d7c0ebf0d3d257a"
            },
            "downloads": -1,
            "filename": "cashews-7.4.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "554836a84cf912eba4931709705013c7",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 78908,
            "upload_time": "2024-11-16T22:13:44",
            "upload_time_iso_8601": "2024-11-16T22:13:44.656322Z",
            "url": "https://files.pythonhosted.org/packages/44/0c/076f12a0b3005498a6afead1aa40894dfb1ccc14ad7858c0a821e89b57bc/cashews-7.4.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "e3c453602c566d0c341b4b80f5a4257e303e4e78ffb95452bded2071e88c758a",
                "md5": "a26af4bd321471a2bca55e1559649330",
                "sha256": "c9d22b9b9da567788f232374a5de3b30ceed1e5c24085c96d304b696df0dcbd8"
            },
            "downloads": -1,
            "filename": "cashews-7.4.0.tar.gz",
            "has_sig": false,
            "md5_digest": "a26af4bd321471a2bca55e1559649330",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 91299,
            "upload_time": "2024-11-16T22:13:46",
            "upload_time_iso_8601": "2024-11-16T22:13:46.874392Z",
            "url": "https://files.pythonhosted.org/packages/e3/c4/53602c566d0c341b4b80f5a4257e303e4e78ffb95452bded2071e88c758a/cashews-7.4.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-11-16 22:13:46",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "Krukov",
    "github_project": "cashews",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": true,
    "tox": true,
    "lcname": "cashews"
}
        
Elapsed time: 0.38954s