eventsourcing-sqlalchemy


Nameeventsourcing-sqlalchemy JSON
Version 0.10 PyPI version JSON
download
home_pagehttps://eventsourcing.readthedocs.io/
SummaryPython package for eventsourcing with SQLAlchemy.
upload_time2024-11-08 15:34:39
maintainerNone
docs_urlNone
authorJohn Bywater
requires_python<4.0,>=3.8
licenseBSD 3-Clause
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Event Sourcing in Python with SQLAlchemy

This package supports using the Python
[eventsourcing](https://github.com/pyeventsourcing/eventsourcing) library
with [SQLAlchemy](https://www.sqlalchemy.org/).

## Table of contents

<!-- TOC -->
* [Table of contents](#table-of-contents)
* [Quick start](#quick-start)
* [Installation](#installation)
* [Getting started](#getting-started)
* [Managing transactions outside the application](#managing-transactions-outside-the-application)
* [Using SQLAlchemy scoped sessions](#using-sqlalchemy-scoped-sessions)
* [Managing sessions with Flask-SQLAlchemy](#managing-sessions-with-flask-sqlalchemy)
* [Managing sessions with FastAPI-SQLAlchemy](#managing-sessions-with-fastapi-sqlalchemy)
* [Google Cloud SQL Python Connector](#google-cloud-sql-python-connector)
* [More information](#more-information)
<!-- TOC -->

## Quick start

To use SQLAlchemy with your Python eventsourcing applications:
* install the Python package `eventsourcing_sqlalchemy`
* set the environment variable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`
* set the environment variable `SQLALCHEMY_URL` to an SQLAlchemy database URL

See below for more information.

## Installation

Use pip to install the [stable distribution](https://pypi.org/project/eventsourcing_sqlalchemy/)
from the Python Package Index. Please note, it is recommended to
install Python packages into a Python virtual environment.

    $ pip install eventsourcing_sqlalchemy

## Getting started

Define aggregates and applications in the usual way.

```python
from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event
from uuid import uuid5, NAMESPACE_URL


class TrainingSchool(Application):
    def register(self, name):
        dog = Dog(name)
        self.save(dog)

    def add_trick(self, name, trick):
        dog = self.repository.get(Dog.create_id(name))
        dog.add_trick(trick)
        self.save(dog)

    def get_tricks(self, name):
        dog = self.repository.get(Dog.create_id(name))
        return dog.tricks


class Dog(Aggregate):
    @event('Registered')
    def __init__(self, name):
        self.name = name
        self.tricks = []

    @staticmethod
    def create_id(name):
        return uuid5(NAMESPACE_URL, f'/dogs/{name}')

    @event('TrickAdded')
    def add_trick(self, trick):
        self.tricks.append(trick)
```

To use this module as the persistence module for your application, set the environment
variable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`.

When using this module, you need to set the environment variable `SQLALCHEMY_URL` to an
SQLAlchemy database URL for your database.
Please refer to the [SQLAlchemy documentation](https://docs.sqlalchemy.org/en/14/core/engines.html)
for more information about SQLAlchemy Database URLs.

```python
import os

os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'sqlite:///:memory:'
```

Construct and use the application in the usual way.

```python
school = TrainingSchool()
school.register('Fido')
school.add_trick('Fido', 'roll over')
school.add_trick('Fido', 'play dead')
tricks = school.get_tricks('Fido')
assert tricks == ['roll over', 'play dead']
```

## Managing transactions outside the application

Sometimes you might need to update an SQLAlchemy ORM model atomically with updates to
your event-sourced application. You can manage transactions outside the application.
Just call the application recorder's `transaction()` method and use the returned
`Transaction` object as a context manager to obtain an SQLAlchemy `Session`
object. You can `add()` your ORM objects to the session. Everything will commit
atomically when the `Transaction` context manager exits. This effectively implements
thread-scoped transactions.

```python
with school.recorder.transaction() as session:
    # Update CRUD model.
    ...  # session.add(my_orm_object)
    # Update event-sourced application.
    school.register('Buster')
    school.add_trick('Buster', 'fetch ball')

    tricks = school.get_tricks('Buster')
    assert tricks == ['fetch ball']
```

Please note, the SQLAlchemy "autoflush" ORM feature is enabled by default.

```python
app = Application()

with app.recorder.transaction() as session:
    assert session.autoflush is True
```

If you need "autoflush" to be disabled, you can set the environment variable `SQLALCHEMY_NO_AUTOFLUSH`.

```python
app = Application(env={'SQLALCHEMY_AUTOFLUSH': 'False'})

with app.recorder.transaction() as session:
    assert session.autoflush is False
```

Alternatively, you can set the autoflush option directly on the SQLAlchemy session maker.

```python
app = Application()
app.recorder.datastore.session_maker.kw["autoflush"] = False

with app.recorder.transaction() as session:
    assert session.autoflush is False
```

Alternatively, you can use the session's ``no_autoflush`` context manager.

```python
app = Application()

with app.recorder.transaction() as session:
    with session.no_autoflush:
        assert session.autoflush is False
```

Alternatively, you can set the ``autoflush`` attribute of the session object.

```python
app = Application()

with app.recorder.transaction() as session:
    session.autoflush = False
    # Add CRUD objects to the session.
    ...
```

## Using SQLAlchemy scoped sessions

It's possible to configure the application to use an SQLAlchemy `scoped_session`
object which will scope the session to standard threads, or other things such
as Web requests in a Web application framework.

Define an adapter for a `scoped_session` object and configure the event-sourced
application using the environment variable `SQLALCHEMY_SCOPED_SESSION_TOPIC`.

```python
from eventsourcing.application import AggregateNotFound
from eventsourcing.utils import get_topic
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

# Create engine.
engine = create_engine('sqlite:///:memory:')

# Create a scoped_session object.
session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine)
)

# Define an adapter for the scoped session.
class MyScopedSessionAdapter:
    def __getattribute__(self, item: str) -> None:
        return getattr(session, item)

# Produce the topic of the scoped session adapter class.
scoped_session_topic = get_topic(MyScopedSessionAdapter)

# Construct an event-sourced application.
app = Application(
    env={'SQLALCHEMY_SCOPED_SESSION_TOPIC': scoped_session_topic}
)

# During request.
aggregate = Aggregate()
app.save(aggregate)
app.repository.get(aggregate.id)
session.commit()

# After request.
session.remove()

# During request.
app.repository.get(aggregate.id)

# After request.
session.remove()

# During request.
aggregate = Aggregate()
app.save(aggregate)
# forget to commit

# After request.
session.remove()

# During request.
try:
    # forgot to commit
    app.repository.get(aggregate.id)
except AggregateNotFound:
    pass
else:
    raise Exception("Expected aggregate not found")

# After request.
session.remove()
```

As you can see, you need to call `commit()` during a request, and call `remove()`
after the request completes. Packages that integrate SQLAlchemy with Web application
frameworks tend to automate this call to `remove()`. Some of them also call `commit()`
automatically if an exception is not raised during the handling of a request.

## Managing sessions with Flask-SQLAlchemy

The package [Flask-SQLAlchemy](https://github.com/pallets-eco/flask-sqlalchemy)
([full docs](https://flask-sqlalchemy.palletsprojects.com/)) provides a class
called `SQLAlchemy` which has a `session` attribute which is an SQLAlchemy
`scoped_session`. This can be adapted in a similar way.

```python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
try:
    from sqlalchemy.orm import declarative_base  # type: ignore
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base

# Define a Flask app.
flask_app = Flask(__name__)
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'

# Integration between Flask and SQLAlchemy.
Base = declarative_base()
db = SQLAlchemy(flask_app, model_class=Base)


# Define an adapter for the scoped session.
class FlaskScopedSession:
    def __getattribute__(self, item: str) -> None:
        return getattr(db.session, item)


# Run the Flask application in a Web application server.
with flask_app.app_context():

    # Produce the topic of the scoped session adapter class.
    scoped_session_adapter_topic = get_topic(FlaskScopedSession)
    # Construct event-sourced application to use scoped sessions.
    es_app = Application(
        env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": scoped_session_adapter_topic}
    )

    # During request.
    aggregate = Aggregate()
    es_app.save(aggregate)
    db.session.commit()

    # After request (this is done automatically).
    db.session.remove()

    # During request.
    es_app.repository.get(aggregate.id)

    # After request (this is done automatically).
    db.session.remove()
```

## Managing sessions with FastAPI-SQLAlchemy

The package [FastAPI-SQLAlchemy](https://github.com/mfreeborn/fastapi-sqlalchemy)
doesn't actually use an SQLAlchemy `scoped_session`, but instead has a global `db`
variable that has a `session` attribute which returns request-scoped sessions when
accessed. This can be adapted in a similar way. Sessions are committed automatically
after the request has been handled successfully, and not committed if an exception
is raised.

```python
from fastapi import FastAPI
from fastapi_sqlalchemy import db, DBSessionMiddleware

# Define a FastAPI application.
fastapi_app = FastAPI()

# Add SQLAlchemy integration middleware to the FastAPI application.
fastapi_app.add_middleware(
    DBSessionMiddleware, db_url='sqlite:///:memory:'
)

# Build the middleware stack (happens automatically when the FastAPI app runs in a Web app server).
fastapi_app.build_middleware_stack()

# Define an adapter for the scoped session.
class FastapiScopedSession:
    def __getattribute__(self, item: str) -> None:
        return getattr(db.session, item)

# Construct an event-sourced application within a scoped session.
with db(commit_on_exit=True):
    # Produce the topic of the scoped session adapter class.
    scoped_session_adapter_topic = get_topic(FlaskScopedSession)
    # Construct event-sourced application to use scoped sessions.
    es_app = Application(
        env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": get_topic(FastapiScopedSession)}
    )

# Create a new event-sourced aggregate.
with db(commit_on_exit=True):  # This happens automatically before handling a route.
    # Handle request.
    aggregate = Aggregate()
    es_app.save(aggregate)
    es_app.repository.get(aggregate.id)

# The aggregate has been committed.
with db(commit_on_exit=True):  # This happens automatically before handling a route.
    # Handle request.
    es_app.repository.get(aggregate.id)

# Raise exception after creating aggregate.
try:
    with db(commit_on_exit=True):
        # Handle request.
        aggregate = Aggregate()
        es_app.save(aggregate)
        es_app.repository.get(aggregate.id)
        raise TypeError("An error occurred!!!")
except TypeError:
    # Web framework returns an error.
    ...
else:
    raise Exception("Expected type error")

# The aggregate hasn't been committed.
with db(commit_on_exit=True):
    try:
        es_app.repository.get(aggregate.id)
    except AggregateNotFound:
        pass
    else:
        raise Exception("Expected aggregate not found")
```



## Google Cloud SQL Python Connector

You can set the environment variable `SQLALCHEMY_CONNECTION_CREATOR_TOPIC` to a topic
that will resolve to a callable that will be used to create database connections.

For example, you can use the [Cloud SQL Python Connector](https://pypi.org/project/cloud-sql-python-connector/)
in the following way.

First install the Cloud SQL Python Connector package from PyPI.

    $ pip install 'cloud-sql-python-connector[pg8000]'

Then define a `getconn()` function, following the advice in the Cloud SQL
Python Connector README page.

```python
from google.cloud.sql.connector import Connector

# initialize Connector object
connector = Connector()

# function to return the database connection
def get_google_cloud_sql_conn():
    return connector.connect(
        "project:region:instance",
        "pg8000",
        user="postgres-iam-user@gmail.com",
        db="my-db-name",
        enable_iam_auth=True,
   )
```

Set the environment variable `'SQLALCHEMY_CONNECTION_CREATOR_TOPIC'`, along with
`'PERSISTENCE_MODULE'` and `'SQLALCHEMY_URL'`.

```python
from eventsourcing.utils import get_topic

os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'postgresql+pg8000://'
os.environ['SQLALCHEMY_CONNECTION_CREATOR_TOPIC'] = get_topic(get_google_cloud_sql_conn)
```

## More information

See the library's [documentation](https://eventsourcing.readthedocs.io/)
and the [SQLAlchemy](https://www.sqlalchemy.org/) project for more information.

            

Raw data

            {
    "_id": null,
    "home_page": "https://eventsourcing.readthedocs.io/",
    "name": "eventsourcing-sqlalchemy",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.8",
    "maintainer_email": null,
    "keywords": null,
    "author": "John Bywater",
    "author_email": "john.bywater@appropriatesoftware.net",
    "download_url": "https://files.pythonhosted.org/packages/47/81/36c49d2db0d94d79a31eb9aea48a94647385868bfb00dad5216a1c8cc116/eventsourcing_sqlalchemy-0.10.tar.gz",
    "platform": null,
    "description": "# Event Sourcing in Python with SQLAlchemy\n\nThis package supports using the Python\n[eventsourcing](https://github.com/pyeventsourcing/eventsourcing) library\nwith [SQLAlchemy](https://www.sqlalchemy.org/).\n\n## Table of contents\n\n<!-- TOC -->\n* [Table of contents](#table-of-contents)\n* [Quick start](#quick-start)\n* [Installation](#installation)\n* [Getting started](#getting-started)\n* [Managing transactions outside the application](#managing-transactions-outside-the-application)\n* [Using SQLAlchemy scoped sessions](#using-sqlalchemy-scoped-sessions)\n* [Managing sessions with Flask-SQLAlchemy](#managing-sessions-with-flask-sqlalchemy)\n* [Managing sessions with FastAPI-SQLAlchemy](#managing-sessions-with-fastapi-sqlalchemy)\n* [Google Cloud SQL Python Connector](#google-cloud-sql-python-connector)\n* [More information](#more-information)\n<!-- TOC -->\n\n## Quick start\n\nTo use SQLAlchemy with your Python eventsourcing applications:\n* install the Python package `eventsourcing_sqlalchemy`\n* set the environment variable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`\n* set the environment variable `SQLALCHEMY_URL` to an SQLAlchemy database URL\n\nSee below for more information.\n\n## Installation\n\nUse pip to install the [stable distribution](https://pypi.org/project/eventsourcing_sqlalchemy/)\nfrom the Python Package Index. Please note, it is recommended to\ninstall Python packages into a Python virtual environment.\n\n    $ pip install eventsourcing_sqlalchemy\n\n## Getting started\n\nDefine aggregates and applications in the usual way.\n\n```python\nfrom eventsourcing.application import Application\nfrom eventsourcing.domain import Aggregate, event\nfrom uuid import uuid5, NAMESPACE_URL\n\n\nclass TrainingSchool(Application):\n    def register(self, name):\n        dog = Dog(name)\n        self.save(dog)\n\n    def add_trick(self, name, trick):\n        dog = self.repository.get(Dog.create_id(name))\n        dog.add_trick(trick)\n        self.save(dog)\n\n    def get_tricks(self, name):\n        dog = self.repository.get(Dog.create_id(name))\n        return dog.tricks\n\n\nclass Dog(Aggregate):\n    @event('Registered')\n    def __init__(self, name):\n        self.name = name\n        self.tricks = []\n\n    @staticmethod\n    def create_id(name):\n        return uuid5(NAMESPACE_URL, f'/dogs/{name}')\n\n    @event('TrickAdded')\n    def add_trick(self, trick):\n        self.tricks.append(trick)\n```\n\nTo use this module as the persistence module for your application, set the environment\nvariable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`.\n\nWhen using this module, you need to set the environment variable `SQLALCHEMY_URL` to an\nSQLAlchemy database URL for your database.\nPlease refer to the [SQLAlchemy documentation](https://docs.sqlalchemy.org/en/14/core/engines.html)\nfor more information about SQLAlchemy Database URLs.\n\n```python\nimport os\n\nos.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'\nos.environ['SQLALCHEMY_URL'] = 'sqlite:///:memory:'\n```\n\nConstruct and use the application in the usual way.\n\n```python\nschool = TrainingSchool()\nschool.register('Fido')\nschool.add_trick('Fido', 'roll over')\nschool.add_trick('Fido', 'play dead')\ntricks = school.get_tricks('Fido')\nassert tricks == ['roll over', 'play dead']\n```\n\n## Managing transactions outside the application\n\nSometimes you might need to update an SQLAlchemy ORM model atomically with updates to\nyour event-sourced application. You can manage transactions outside the application.\nJust call the application recorder's `transaction()` method and use the returned\n`Transaction` object as a context manager to obtain an SQLAlchemy `Session`\nobject. You can `add()` your ORM objects to the session. Everything will commit\natomically when the `Transaction` context manager exits. This effectively implements\nthread-scoped transactions.\n\n```python\nwith school.recorder.transaction() as session:\n    # Update CRUD model.\n    ...  # session.add(my_orm_object)\n    # Update event-sourced application.\n    school.register('Buster')\n    school.add_trick('Buster', 'fetch ball')\n\n    tricks = school.get_tricks('Buster')\n    assert tricks == ['fetch ball']\n```\n\nPlease note, the SQLAlchemy \"autoflush\" ORM feature is enabled by default.\n\n```python\napp = Application()\n\nwith app.recorder.transaction() as session:\n    assert session.autoflush is True\n```\n\nIf you need \"autoflush\" to be disabled, you can set the environment variable `SQLALCHEMY_NO_AUTOFLUSH`.\n\n```python\napp = Application(env={'SQLALCHEMY_AUTOFLUSH': 'False'})\n\nwith app.recorder.transaction() as session:\n    assert session.autoflush is False\n```\n\nAlternatively, you can set the autoflush option directly on the SQLAlchemy session maker.\n\n```python\napp = Application()\napp.recorder.datastore.session_maker.kw[\"autoflush\"] = False\n\nwith app.recorder.transaction() as session:\n    assert session.autoflush is False\n```\n\nAlternatively, you can use the session's ``no_autoflush`` context manager.\n\n```python\napp = Application()\n\nwith app.recorder.transaction() as session:\n    with session.no_autoflush:\n        assert session.autoflush is False\n```\n\nAlternatively, you can set the ``autoflush`` attribute of the session object.\n\n```python\napp = Application()\n\nwith app.recorder.transaction() as session:\n    session.autoflush = False\n    # Add CRUD objects to the session.\n    ...\n```\n\n## Using SQLAlchemy scoped sessions\n\nIt's possible to configure the application to use an SQLAlchemy `scoped_session`\nobject which will scope the session to standard threads, or other things such\nas Web requests in a Web application framework.\n\nDefine an adapter for a `scoped_session` object and configure the event-sourced\napplication using the environment variable `SQLALCHEMY_SCOPED_SESSION_TOPIC`.\n\n```python\nfrom eventsourcing.application import AggregateNotFound\nfrom eventsourcing.utils import get_topic\nfrom sqlalchemy import create_engine\nfrom sqlalchemy.orm import sessionmaker, scoped_session\n\n# Create engine.\nengine = create_engine('sqlite:///:memory:')\n\n# Create a scoped_session object.\nsession = scoped_session(\n    sessionmaker(autocommit=False, autoflush=False, bind=engine)\n)\n\n# Define an adapter for the scoped session.\nclass MyScopedSessionAdapter:\n    def __getattribute__(self, item: str) -> None:\n        return getattr(session, item)\n\n# Produce the topic of the scoped session adapter class.\nscoped_session_topic = get_topic(MyScopedSessionAdapter)\n\n# Construct an event-sourced application.\napp = Application(\n    env={'SQLALCHEMY_SCOPED_SESSION_TOPIC': scoped_session_topic}\n)\n\n# During request.\naggregate = Aggregate()\napp.save(aggregate)\napp.repository.get(aggregate.id)\nsession.commit()\n\n# After request.\nsession.remove()\n\n# During request.\napp.repository.get(aggregate.id)\n\n# After request.\nsession.remove()\n\n# During request.\naggregate = Aggregate()\napp.save(aggregate)\n# forget to commit\n\n# After request.\nsession.remove()\n\n# During request.\ntry:\n    # forgot to commit\n    app.repository.get(aggregate.id)\nexcept AggregateNotFound:\n    pass\nelse:\n    raise Exception(\"Expected aggregate not found\")\n\n# After request.\nsession.remove()\n```\n\nAs you can see, you need to call `commit()` during a request, and call `remove()`\nafter the request completes. Packages that integrate SQLAlchemy with Web application\nframeworks tend to automate this call to `remove()`. Some of them also call `commit()`\nautomatically if an exception is not raised during the handling of a request.\n\n## Managing sessions with Flask-SQLAlchemy\n\nThe package [Flask-SQLAlchemy](https://github.com/pallets-eco/flask-sqlalchemy)\n([full docs](https://flask-sqlalchemy.palletsprojects.com/)) provides a class\ncalled `SQLAlchemy` which has a `session` attribute which is an SQLAlchemy\n`scoped_session`. This can be adapted in a similar way.\n\n```python\nfrom flask import Flask\nfrom flask_sqlalchemy import SQLAlchemy\ntry:\n    from sqlalchemy.orm import declarative_base  # type: ignore\nexcept ImportError:\n    from sqlalchemy.ext.declarative import declarative_base\n\n# Define a Flask app.\nflask_app = Flask(__name__)\nflask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'\n\n# Integration between Flask and SQLAlchemy.\nBase = declarative_base()\ndb = SQLAlchemy(flask_app, model_class=Base)\n\n\n# Define an adapter for the scoped session.\nclass FlaskScopedSession:\n    def __getattribute__(self, item: str) -> None:\n        return getattr(db.session, item)\n\n\n# Run the Flask application in a Web application server.\nwith flask_app.app_context():\n\n    # Produce the topic of the scoped session adapter class.\n    scoped_session_adapter_topic = get_topic(FlaskScopedSession)\n    # Construct event-sourced application to use scoped sessions.\n    es_app = Application(\n        env={\"SQLALCHEMY_SCOPED_SESSION_TOPIC\": scoped_session_adapter_topic}\n    )\n\n    # During request.\n    aggregate = Aggregate()\n    es_app.save(aggregate)\n    db.session.commit()\n\n    # After request (this is done automatically).\n    db.session.remove()\n\n    # During request.\n    es_app.repository.get(aggregate.id)\n\n    # After request (this is done automatically).\n    db.session.remove()\n```\n\n## Managing sessions with FastAPI-SQLAlchemy\n\nThe package [FastAPI-SQLAlchemy](https://github.com/mfreeborn/fastapi-sqlalchemy)\ndoesn't actually use an SQLAlchemy `scoped_session`, but instead has a global `db`\nvariable that has a `session` attribute which returns request-scoped sessions when\naccessed. This can be adapted in a similar way. Sessions are committed automatically\nafter the request has been handled successfully, and not committed if an exception\nis raised.\n\n```python\nfrom fastapi import FastAPI\nfrom fastapi_sqlalchemy import db, DBSessionMiddleware\n\n# Define a FastAPI application.\nfastapi_app = FastAPI()\n\n# Add SQLAlchemy integration middleware to the FastAPI application.\nfastapi_app.add_middleware(\n    DBSessionMiddleware, db_url='sqlite:///:memory:'\n)\n\n# Build the middleware stack (happens automatically when the FastAPI app runs in a Web app server).\nfastapi_app.build_middleware_stack()\n\n# Define an adapter for the scoped session.\nclass FastapiScopedSession:\n    def __getattribute__(self, item: str) -> None:\n        return getattr(db.session, item)\n\n# Construct an event-sourced application within a scoped session.\nwith db(commit_on_exit=True):\n    # Produce the topic of the scoped session adapter class.\n    scoped_session_adapter_topic = get_topic(FlaskScopedSession)\n    # Construct event-sourced application to use scoped sessions.\n    es_app = Application(\n        env={\"SQLALCHEMY_SCOPED_SESSION_TOPIC\": get_topic(FastapiScopedSession)}\n    )\n\n# Create a new event-sourced aggregate.\nwith db(commit_on_exit=True):  # This happens automatically before handling a route.\n    # Handle request.\n    aggregate = Aggregate()\n    es_app.save(aggregate)\n    es_app.repository.get(aggregate.id)\n\n# The aggregate has been committed.\nwith db(commit_on_exit=True):  # This happens automatically before handling a route.\n    # Handle request.\n    es_app.repository.get(aggregate.id)\n\n# Raise exception after creating aggregate.\ntry:\n    with db(commit_on_exit=True):\n        # Handle request.\n        aggregate = Aggregate()\n        es_app.save(aggregate)\n        es_app.repository.get(aggregate.id)\n        raise TypeError(\"An error occurred!!!\")\nexcept TypeError:\n    # Web framework returns an error.\n    ...\nelse:\n    raise Exception(\"Expected type error\")\n\n# The aggregate hasn't been committed.\nwith db(commit_on_exit=True):\n    try:\n        es_app.repository.get(aggregate.id)\n    except AggregateNotFound:\n        pass\n    else:\n        raise Exception(\"Expected aggregate not found\")\n```\n\n\n\n## Google Cloud SQL Python Connector\n\nYou can set the environment variable `SQLALCHEMY_CONNECTION_CREATOR_TOPIC` to a topic\nthat will resolve to a callable that will be used to create database connections.\n\nFor example, you can use the [Cloud SQL Python Connector](https://pypi.org/project/cloud-sql-python-connector/)\nin the following way.\n\nFirst install the Cloud SQL Python Connector package from PyPI.\n\n    $ pip install 'cloud-sql-python-connector[pg8000]'\n\nThen define a `getconn()` function, following the advice in the Cloud SQL\nPython Connector README page.\n\n```python\nfrom google.cloud.sql.connector import Connector\n\n# initialize Connector object\nconnector = Connector()\n\n# function to return the database connection\ndef get_google_cloud_sql_conn():\n    return connector.connect(\n        \"project:region:instance\",\n        \"pg8000\",\n        user=\"postgres-iam-user@gmail.com\",\n        db=\"my-db-name\",\n        enable_iam_auth=True,\n   )\n```\n\nSet the environment variable `'SQLALCHEMY_CONNECTION_CREATOR_TOPIC'`, along with\n`'PERSISTENCE_MODULE'` and `'SQLALCHEMY_URL'`.\n\n```python\nfrom eventsourcing.utils import get_topic\n\nos.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'\nos.environ['SQLALCHEMY_URL'] = 'postgresql+pg8000://'\nos.environ['SQLALCHEMY_CONNECTION_CREATOR_TOPIC'] = get_topic(get_google_cloud_sql_conn)\n```\n\n## More information\n\nSee the library's [documentation](https://eventsourcing.readthedocs.io/)\nand the [SQLAlchemy](https://www.sqlalchemy.org/) project for more information.\n",
    "bugtrack_url": null,
    "license": "BSD 3-Clause",
    "summary": "Python package for eventsourcing with SQLAlchemy.",
    "version": "0.10",
    "project_urls": {
        "Homepage": "https://eventsourcing.readthedocs.io/",
        "Repository": "https://github.com/pyeventsourcing/eventsourcing-sqlalchemy"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "db143ce5ffed52f8836f781d8a10cd20c06f849903e79716e2a475521884de2c",
                "md5": "65901cbf2d9e8336538db152503fbc55",
                "sha256": "460121469a74f8825de4d49c893b57e56a7ac5c189568d3e1f2fe90b84078f3c"
            },
            "downloads": -1,
            "filename": "eventsourcing_sqlalchemy-0.10-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "65901cbf2d9e8336538db152503fbc55",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.8",
            "size": 13108,
            "upload_time": "2024-11-08T15:34:37",
            "upload_time_iso_8601": "2024-11-08T15:34:37.205288Z",
            "url": "https://files.pythonhosted.org/packages/db/14/3ce5ffed52f8836f781d8a10cd20c06f849903e79716e2a475521884de2c/eventsourcing_sqlalchemy-0.10-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "478136c49d2db0d94d79a31eb9aea48a94647385868bfb00dad5216a1c8cc116",
                "md5": "b1cce7be5f57cfea3bb41d4def710bf5",
                "sha256": "63ad18d8184f3732010bef8ee556e6bfa6b18ee5758687b86f32cf35807c2ea5"
            },
            "downloads": -1,
            "filename": "eventsourcing_sqlalchemy-0.10.tar.gz",
            "has_sig": false,
            "md5_digest": "b1cce7be5f57cfea3bb41d4def710bf5",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.8",
            "size": 15145,
            "upload_time": "2024-11-08T15:34:39",
            "upload_time_iso_8601": "2024-11-08T15:34:39.474612Z",
            "url": "https://files.pythonhosted.org/packages/47/81/36c49d2db0d94d79a31eb9aea48a94647385868bfb00dad5216a1c8cc116/eventsourcing_sqlalchemy-0.10.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-11-08 15:34:39",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "pyeventsourcing",
    "github_project": "eventsourcing-sqlalchemy",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "eventsourcing-sqlalchemy"
}
        
Elapsed time: 0.33461s