# Event Sourcing in Python with SQLAlchemy
This package supports using the Python
[eventsourcing](https://github.com/pyeventsourcing/eventsourcing) library
with [SQLAlchemy](https://www.sqlalchemy.org/).
## Table of contents
<!-- TOC -->
* [Table of contents](#table-of-contents)
* [Quick start](#quick-start)
* [Installation](#installation)
* [Getting started](#getting-started)
* [Managing transactions outside the application](#managing-transactions-outside-the-application)
* [Using SQLAlchemy scoped sessions](#using-sqlalchemy-scoped-sessions)
* [Managing sessions with Flask-SQLAlchemy](#managing-sessions-with-flask-sqlalchemy)
* [Managing sessions with FastAPI-SQLAlchemy](#managing-sessions-with-fastapi-sqlalchemy)
* [Google Cloud SQL Python Connector](#google-cloud-sql-python-connector)
* [More information](#more-information)
<!-- TOC -->
## Quick start
To use SQLAlchemy with your Python eventsourcing applications:
* install the Python package `eventsourcing_sqlalchemy`
* set the environment variable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`
* set the environment variable `SQLALCHEMY_URL` to an SQLAlchemy database URL
See below for more information.
## Installation
Use pip to install the [stable distribution](https://pypi.org/project/eventsourcing_sqlalchemy/)
from the Python Package Index. Please note, it is recommended to
install Python packages into a Python virtual environment.
    $ pip install eventsourcing_sqlalchemy
## Getting started
Define aggregates and applications in the usual way.
```python
from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event
from uuid import uuid5, NAMESPACE_URL
class TrainingSchool(Application):
    def register(self, name):
        dog = Dog(name)
        self.save(dog)
    def add_trick(self, name, trick):
        dog = self.repository.get(Dog.create_id(name))
        dog.add_trick(trick)
        self.save(dog)
    def get_tricks(self, name):
        dog = self.repository.get(Dog.create_id(name))
        return dog.tricks
class Dog(Aggregate):
    @event('Registered')
    def __init__(self, name):
        self.name = name
        self.tricks = []
    @staticmethod
    def create_id(name):
        return uuid5(NAMESPACE_URL, f'/dogs/{name}')
    @event('TrickAdded')
    def add_trick(self, trick):
        self.tricks.append(trick)
```
To use this module as the persistence module for your application, set the environment
variable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`.
When using this module, you need to set the environment variable `SQLALCHEMY_URL` to an
SQLAlchemy database URL for your database.
Please refer to the [SQLAlchemy documentation](https://docs.sqlalchemy.org/en/14/core/engines.html)
for more information about SQLAlchemy Database URLs.
```python
import os
os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'sqlite:///:memory:'
```
Construct and use the application in the usual way.
```python
school = TrainingSchool()
school.register('Fido')
school.add_trick('Fido', 'roll over')
school.add_trick('Fido', 'play dead')
tricks = school.get_tricks('Fido')
assert tricks == ['roll over', 'play dead']
```
## Managing transactions outside the application
Sometimes you might need to update an SQLAlchemy ORM model atomically with updates to
your event-sourced application. You can manage transactions outside the application.
Just call the application recorder's `transaction()` method and use the returned
`Transaction` object as a context manager to obtain an SQLAlchemy `Session`
object. You can `add()` your ORM objects to the session. Everything will commit
atomically when the `Transaction` context manager exits. This effectively implements
thread-scoped transactions.
```python
with school.recorder.transaction() as session:
    # Update CRUD model.
    ...  # session.add(my_orm_object)
    # Update event-sourced application.
    school.register('Buster')
    school.add_trick('Buster', 'fetch ball')
    tricks = school.get_tricks('Buster')
    assert tricks == ['fetch ball']
```
Please note, the SQLAlchemy "autoflush" ORM feature is enabled by default.
```python
app = Application()
with app.recorder.transaction() as session:
    assert session.autoflush is True
```
If you need "autoflush" to be disabled, you can set the environment variable `SQLALCHEMY_NO_AUTOFLUSH`.
```python
app = Application(env={'SQLALCHEMY_AUTOFLUSH': 'False'})
with app.recorder.transaction() as session:
    assert session.autoflush is False
```
Alternatively, you can set the autoflush option directly on the SQLAlchemy session maker.
```python
app = Application()
app.recorder.datastore.session_maker.kw["autoflush"] = False
with app.recorder.transaction() as session:
    assert session.autoflush is False
```
Alternatively, you can use the session's ``no_autoflush`` context manager.
```python
app = Application()
with app.recorder.transaction() as session:
    with session.no_autoflush:
        assert session.autoflush is False
```
Alternatively, you can set the ``autoflush`` attribute of the session object.
```python
app = Application()
with app.recorder.transaction() as session:
    session.autoflush = False
    # Add CRUD objects to the session.
    ...
```
## Using SQLAlchemy scoped sessions
It's possible to configure the application to use an SQLAlchemy `scoped_session`
object which will scope the session to standard threads, or other things such
as Web requests in a Web application framework.
Define an adapter for a `scoped_session` object and configure the event-sourced
application using the environment variable `SQLALCHEMY_SCOPED_SESSION_TOPIC`.
```python
from eventsourcing.application import AggregateNotFound
from eventsourcing.utils import get_topic
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# Create engine.
engine = create_engine('sqlite:///:memory:')
# Create a scoped_session object.
session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine)
)
# Define an adapter for the scoped session.
class MyScopedSessionAdapter:
    def __getattribute__(self, item: str) -> None:
        return getattr(session, item)
# Produce the topic of the scoped session adapter class.
scoped_session_topic = get_topic(MyScopedSessionAdapter)
# Construct an event-sourced application.
app = Application(
    env={'SQLALCHEMY_SCOPED_SESSION_TOPIC': scoped_session_topic}
)
# During request.
aggregate = Aggregate()
app.save(aggregate)
app.repository.get(aggregate.id)
session.commit()
# After request.
session.remove()
# During request.
app.repository.get(aggregate.id)
# After request.
session.remove()
# During request.
aggregate = Aggregate()
app.save(aggregate)
# forget to commit
# After request.
session.remove()
# During request.
try:
    # forgot to commit
    app.repository.get(aggregate.id)
except AggregateNotFound:
    pass
else:
    raise Exception("Expected aggregate not found")
# After request.
session.remove()
```
As you can see, you need to call `commit()` during a request, and call `remove()`
after the request completes. Packages that integrate SQLAlchemy with Web application
frameworks tend to automate this call to `remove()`. Some of them also call `commit()`
automatically if an exception is not raised during the handling of a request.
## Managing sessions with Flask-SQLAlchemy
The package [Flask-SQLAlchemy](https://github.com/pallets-eco/flask-sqlalchemy)
([full docs](https://flask-sqlalchemy.palletsprojects.com/)) provides a class
called `SQLAlchemy` which has a `session` attribute which is an SQLAlchemy
`scoped_session`. This can be adapted in a similar way.
```python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
try:
    from sqlalchemy.orm import declarative_base  # type: ignore
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base
# Define a Flask app.
flask_app = Flask(__name__)
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
# Integration between Flask and SQLAlchemy.
Base = declarative_base()
db = SQLAlchemy(flask_app, model_class=Base)
# Define an adapter for the scoped session.
class FlaskScopedSession:
    def __getattribute__(self, item: str) -> None:
        return getattr(db.session, item)
# Run the Flask application in a Web application server.
with flask_app.app_context():
    # Produce the topic of the scoped session adapter class.
    scoped_session_adapter_topic = get_topic(FlaskScopedSession)
    # Construct event-sourced application to use scoped sessions.
    es_app = Application(
        env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": scoped_session_adapter_topic}
    )
    # During request.
    aggregate = Aggregate()
    es_app.save(aggregate)
    db.session.commit()
    # After request (this is done automatically).
    db.session.remove()
    # During request.
    es_app.repository.get(aggregate.id)
    # After request (this is done automatically).
    db.session.remove()
```
## Managing sessions with FastAPI-SQLAlchemy
The package [FastAPI-SQLAlchemy](https://github.com/mfreeborn/fastapi-sqlalchemy)
doesn't actually use an SQLAlchemy `scoped_session`, but instead has a global `db`
variable that has a `session` attribute which returns request-scoped sessions when
accessed. This can be adapted in a similar way. Sessions are committed automatically
after the request has been handled successfully, and not committed if an exception
is raised.
```python
from fastapi import FastAPI
from fastapi_sqlalchemy import db, DBSessionMiddleware
# Define a FastAPI application.
fastapi_app = FastAPI()
# Add SQLAlchemy integration middleware to the FastAPI application.
fastapi_app.add_middleware(
    DBSessionMiddleware, db_url='sqlite:///:memory:'
)
# Build the middleware stack (happens automatically when the FastAPI app runs in a Web app server).
fastapi_app.build_middleware_stack()
# Define an adapter for the scoped session.
class FastapiScopedSession:
    def __getattribute__(self, item: str) -> None:
        return getattr(db.session, item)
# Construct an event-sourced application within a scoped session.
with db(commit_on_exit=True):
    # Produce the topic of the scoped session adapter class.
    scoped_session_adapter_topic = get_topic(FlaskScopedSession)
    # Construct event-sourced application to use scoped sessions.
    es_app = Application(
        env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": get_topic(FastapiScopedSession)}
    )
# Create a new event-sourced aggregate.
with db(commit_on_exit=True):  # This happens automatically before handling a route.
    # Handle request.
    aggregate = Aggregate()
    es_app.save(aggregate)
    es_app.repository.get(aggregate.id)
# The aggregate has been committed.
with db(commit_on_exit=True):  # This happens automatically before handling a route.
    # Handle request.
    es_app.repository.get(aggregate.id)
# Raise exception after creating aggregate.
try:
    with db(commit_on_exit=True):
        # Handle request.
        aggregate = Aggregate()
        es_app.save(aggregate)
        es_app.repository.get(aggregate.id)
        raise TypeError("An error occurred!!!")
except TypeError:
    # Web framework returns an error.
    ...
else:
    raise Exception("Expected type error")
# The aggregate hasn't been committed.
with db(commit_on_exit=True):
    try:
        es_app.repository.get(aggregate.id)
    except AggregateNotFound:
        pass
    else:
        raise Exception("Expected aggregate not found")
```
## Google Cloud SQL Python Connector
You can set the environment variable `SQLALCHEMY_CONNECTION_CREATOR_TOPIC` to a topic
that will resolve to a callable that will be used to create database connections.
For example, you can use the [Cloud SQL Python Connector](https://pypi.org/project/cloud-sql-python-connector/)
in the following way.
First install the Cloud SQL Python Connector package from PyPI.
    $ pip install 'cloud-sql-python-connector[pg8000]'
Then define a `getconn()` function, following the advice in the Cloud SQL
Python Connector README page.
```python
from google.cloud.sql.connector import Connector
# initialize Connector object
connector = Connector()
# function to return the database connection
def get_google_cloud_sql_conn():
    return connector.connect(
        "project:region:instance",
        "pg8000",
        user="postgres-iam-user@gmail.com",
        db="my-db-name",
        enable_iam_auth=True,
   )
```
Set the environment variable `'SQLALCHEMY_CONNECTION_CREATOR_TOPIC'`, along with
`'PERSISTENCE_MODULE'` and `'SQLALCHEMY_URL'`.
```python
from eventsourcing.utils import get_topic
os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'postgresql+pg8000://'
os.environ['SQLALCHEMY_CONNECTION_CREATOR_TOPIC'] = get_topic(get_google_cloud_sql_conn)
```
## More information
See the library's [documentation](https://eventsourcing.readthedocs.io/)
and the [SQLAlchemy](https://www.sqlalchemy.org/) project for more information.
            
         
        Raw data
        
            {
    "_id": null,
    "home_page": "https://eventsourcing.readthedocs.io/",
    "name": "eventsourcing-sqlalchemy",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.8",
    "maintainer_email": null,
    "keywords": null,
    "author": "John Bywater",
    "author_email": "john.bywater@appropriatesoftware.net",
    "download_url": "https://files.pythonhosted.org/packages/47/81/36c49d2db0d94d79a31eb9aea48a94647385868bfb00dad5216a1c8cc116/eventsourcing_sqlalchemy-0.10.tar.gz",
    "platform": null,
    "description": "# Event Sourcing in Python with SQLAlchemy\n\nThis package supports using the Python\n[eventsourcing](https://github.com/pyeventsourcing/eventsourcing) library\nwith [SQLAlchemy](https://www.sqlalchemy.org/).\n\n## Table of contents\n\n<!-- TOC -->\n* [Table of contents](#table-of-contents)\n* [Quick start](#quick-start)\n* [Installation](#installation)\n* [Getting started](#getting-started)\n* [Managing transactions outside the application](#managing-transactions-outside-the-application)\n* [Using SQLAlchemy scoped sessions](#using-sqlalchemy-scoped-sessions)\n* [Managing sessions with Flask-SQLAlchemy](#managing-sessions-with-flask-sqlalchemy)\n* [Managing sessions with FastAPI-SQLAlchemy](#managing-sessions-with-fastapi-sqlalchemy)\n* [Google Cloud SQL Python Connector](#google-cloud-sql-python-connector)\n* [More information](#more-information)\n<!-- TOC -->\n\n## Quick start\n\nTo use SQLAlchemy with your Python eventsourcing applications:\n* install the Python package `eventsourcing_sqlalchemy`\n* set the environment variable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`\n* set the environment variable `SQLALCHEMY_URL` to an SQLAlchemy database URL\n\nSee below for more information.\n\n## Installation\n\nUse pip to install the [stable distribution](https://pypi.org/project/eventsourcing_sqlalchemy/)\nfrom the Python Package Index. Please note, it is recommended to\ninstall Python packages into a Python virtual environment.\n\n    $ pip install eventsourcing_sqlalchemy\n\n## Getting started\n\nDefine aggregates and applications in the usual way.\n\n```python\nfrom eventsourcing.application import Application\nfrom eventsourcing.domain import Aggregate, event\nfrom uuid import uuid5, NAMESPACE_URL\n\n\nclass TrainingSchool(Application):\n    def register(self, name):\n        dog = Dog(name)\n        self.save(dog)\n\n    def add_trick(self, name, trick):\n        dog = self.repository.get(Dog.create_id(name))\n        dog.add_trick(trick)\n        self.save(dog)\n\n    def get_tricks(self, name):\n        dog = self.repository.get(Dog.create_id(name))\n        return dog.tricks\n\n\nclass Dog(Aggregate):\n    @event('Registered')\n    def __init__(self, name):\n        self.name = name\n        self.tricks = []\n\n    @staticmethod\n    def create_id(name):\n        return uuid5(NAMESPACE_URL, f'/dogs/{name}')\n\n    @event('TrickAdded')\n    def add_trick(self, trick):\n        self.tricks.append(trick)\n```\n\nTo use this module as the persistence module for your application, set the environment\nvariable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`.\n\nWhen using this module, you need to set the environment variable `SQLALCHEMY_URL` to an\nSQLAlchemy database URL for your database.\nPlease refer to the [SQLAlchemy documentation](https://docs.sqlalchemy.org/en/14/core/engines.html)\nfor more information about SQLAlchemy Database URLs.\n\n```python\nimport os\n\nos.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'\nos.environ['SQLALCHEMY_URL'] = 'sqlite:///:memory:'\n```\n\nConstruct and use the application in the usual way.\n\n```python\nschool = TrainingSchool()\nschool.register('Fido')\nschool.add_trick('Fido', 'roll over')\nschool.add_trick('Fido', 'play dead')\ntricks = school.get_tricks('Fido')\nassert tricks == ['roll over', 'play dead']\n```\n\n## Managing transactions outside the application\n\nSometimes you might need to update an SQLAlchemy ORM model atomically with updates to\nyour event-sourced application. You can manage transactions outside the application.\nJust call the application recorder's `transaction()` method and use the returned\n`Transaction` object as a context manager to obtain an SQLAlchemy `Session`\nobject. You can `add()` your ORM objects to the session. Everything will commit\natomically when the `Transaction` context manager exits. This effectively implements\nthread-scoped transactions.\n\n```python\nwith school.recorder.transaction() as session:\n    # Update CRUD model.\n    ...  # session.add(my_orm_object)\n    # Update event-sourced application.\n    school.register('Buster')\n    school.add_trick('Buster', 'fetch ball')\n\n    tricks = school.get_tricks('Buster')\n    assert tricks == ['fetch ball']\n```\n\nPlease note, the SQLAlchemy \"autoflush\" ORM feature is enabled by default.\n\n```python\napp = Application()\n\nwith app.recorder.transaction() as session:\n    assert session.autoflush is True\n```\n\nIf you need \"autoflush\" to be disabled, you can set the environment variable `SQLALCHEMY_NO_AUTOFLUSH`.\n\n```python\napp = Application(env={'SQLALCHEMY_AUTOFLUSH': 'False'})\n\nwith app.recorder.transaction() as session:\n    assert session.autoflush is False\n```\n\nAlternatively, you can set the autoflush option directly on the SQLAlchemy session maker.\n\n```python\napp = Application()\napp.recorder.datastore.session_maker.kw[\"autoflush\"] = False\n\nwith app.recorder.transaction() as session:\n    assert session.autoflush is False\n```\n\nAlternatively, you can use the session's ``no_autoflush`` context manager.\n\n```python\napp = Application()\n\nwith app.recorder.transaction() as session:\n    with session.no_autoflush:\n        assert session.autoflush is False\n```\n\nAlternatively, you can set the ``autoflush`` attribute of the session object.\n\n```python\napp = Application()\n\nwith app.recorder.transaction() as session:\n    session.autoflush = False\n    # Add CRUD objects to the session.\n    ...\n```\n\n## Using SQLAlchemy scoped sessions\n\nIt's possible to configure the application to use an SQLAlchemy `scoped_session`\nobject which will scope the session to standard threads, or other things such\nas Web requests in a Web application framework.\n\nDefine an adapter for a `scoped_session` object and configure the event-sourced\napplication using the environment variable `SQLALCHEMY_SCOPED_SESSION_TOPIC`.\n\n```python\nfrom eventsourcing.application import AggregateNotFound\nfrom eventsourcing.utils import get_topic\nfrom sqlalchemy import create_engine\nfrom sqlalchemy.orm import sessionmaker, scoped_session\n\n# Create engine.\nengine = create_engine('sqlite:///:memory:')\n\n# Create a scoped_session object.\nsession = scoped_session(\n    sessionmaker(autocommit=False, autoflush=False, bind=engine)\n)\n\n# Define an adapter for the scoped session.\nclass MyScopedSessionAdapter:\n    def __getattribute__(self, item: str) -> None:\n        return getattr(session, item)\n\n# Produce the topic of the scoped session adapter class.\nscoped_session_topic = get_topic(MyScopedSessionAdapter)\n\n# Construct an event-sourced application.\napp = Application(\n    env={'SQLALCHEMY_SCOPED_SESSION_TOPIC': scoped_session_topic}\n)\n\n# During request.\naggregate = Aggregate()\napp.save(aggregate)\napp.repository.get(aggregate.id)\nsession.commit()\n\n# After request.\nsession.remove()\n\n# During request.\napp.repository.get(aggregate.id)\n\n# After request.\nsession.remove()\n\n# During request.\naggregate = Aggregate()\napp.save(aggregate)\n# forget to commit\n\n# After request.\nsession.remove()\n\n# During request.\ntry:\n    # forgot to commit\n    app.repository.get(aggregate.id)\nexcept AggregateNotFound:\n    pass\nelse:\n    raise Exception(\"Expected aggregate not found\")\n\n# After request.\nsession.remove()\n```\n\nAs you can see, you need to call `commit()` during a request, and call `remove()`\nafter the request completes. Packages that integrate SQLAlchemy with Web application\nframeworks tend to automate this call to `remove()`. Some of them also call `commit()`\nautomatically if an exception is not raised during the handling of a request.\n\n## Managing sessions with Flask-SQLAlchemy\n\nThe package [Flask-SQLAlchemy](https://github.com/pallets-eco/flask-sqlalchemy)\n([full docs](https://flask-sqlalchemy.palletsprojects.com/)) provides a class\ncalled `SQLAlchemy` which has a `session` attribute which is an SQLAlchemy\n`scoped_session`. This can be adapted in a similar way.\n\n```python\nfrom flask import Flask\nfrom flask_sqlalchemy import SQLAlchemy\ntry:\n    from sqlalchemy.orm import declarative_base  # type: ignore\nexcept ImportError:\n    from sqlalchemy.ext.declarative import declarative_base\n\n# Define a Flask app.\nflask_app = Flask(__name__)\nflask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'\n\n# Integration between Flask and SQLAlchemy.\nBase = declarative_base()\ndb = SQLAlchemy(flask_app, model_class=Base)\n\n\n# Define an adapter for the scoped session.\nclass FlaskScopedSession:\n    def __getattribute__(self, item: str) -> None:\n        return getattr(db.session, item)\n\n\n# Run the Flask application in a Web application server.\nwith flask_app.app_context():\n\n    # Produce the topic of the scoped session adapter class.\n    scoped_session_adapter_topic = get_topic(FlaskScopedSession)\n    # Construct event-sourced application to use scoped sessions.\n    es_app = Application(\n        env={\"SQLALCHEMY_SCOPED_SESSION_TOPIC\": scoped_session_adapter_topic}\n    )\n\n    # During request.\n    aggregate = Aggregate()\n    es_app.save(aggregate)\n    db.session.commit()\n\n    # After request (this is done automatically).\n    db.session.remove()\n\n    # During request.\n    es_app.repository.get(aggregate.id)\n\n    # After request (this is done automatically).\n    db.session.remove()\n```\n\n## Managing sessions with FastAPI-SQLAlchemy\n\nThe package [FastAPI-SQLAlchemy](https://github.com/mfreeborn/fastapi-sqlalchemy)\ndoesn't actually use an SQLAlchemy `scoped_session`, but instead has a global `db`\nvariable that has a `session` attribute which returns request-scoped sessions when\naccessed. This can be adapted in a similar way. Sessions are committed automatically\nafter the request has been handled successfully, and not committed if an exception\nis raised.\n\n```python\nfrom fastapi import FastAPI\nfrom fastapi_sqlalchemy import db, DBSessionMiddleware\n\n# Define a FastAPI application.\nfastapi_app = FastAPI()\n\n# Add SQLAlchemy integration middleware to the FastAPI application.\nfastapi_app.add_middleware(\n    DBSessionMiddleware, db_url='sqlite:///:memory:'\n)\n\n# Build the middleware stack (happens automatically when the FastAPI app runs in a Web app server).\nfastapi_app.build_middleware_stack()\n\n# Define an adapter for the scoped session.\nclass FastapiScopedSession:\n    def __getattribute__(self, item: str) -> None:\n        return getattr(db.session, item)\n\n# Construct an event-sourced application within a scoped session.\nwith db(commit_on_exit=True):\n    # Produce the topic of the scoped session adapter class.\n    scoped_session_adapter_topic = get_topic(FlaskScopedSession)\n    # Construct event-sourced application to use scoped sessions.\n    es_app = Application(\n        env={\"SQLALCHEMY_SCOPED_SESSION_TOPIC\": get_topic(FastapiScopedSession)}\n    )\n\n# Create a new event-sourced aggregate.\nwith db(commit_on_exit=True):  # This happens automatically before handling a route.\n    # Handle request.\n    aggregate = Aggregate()\n    es_app.save(aggregate)\n    es_app.repository.get(aggregate.id)\n\n# The aggregate has been committed.\nwith db(commit_on_exit=True):  # This happens automatically before handling a route.\n    # Handle request.\n    es_app.repository.get(aggregate.id)\n\n# Raise exception after creating aggregate.\ntry:\n    with db(commit_on_exit=True):\n        # Handle request.\n        aggregate = Aggregate()\n        es_app.save(aggregate)\n        es_app.repository.get(aggregate.id)\n        raise TypeError(\"An error occurred!!!\")\nexcept TypeError:\n    # Web framework returns an error.\n    ...\nelse:\n    raise Exception(\"Expected type error\")\n\n# The aggregate hasn't been committed.\nwith db(commit_on_exit=True):\n    try:\n        es_app.repository.get(aggregate.id)\n    except AggregateNotFound:\n        pass\n    else:\n        raise Exception(\"Expected aggregate not found\")\n```\n\n\n\n## Google Cloud SQL Python Connector\n\nYou can set the environment variable `SQLALCHEMY_CONNECTION_CREATOR_TOPIC` to a topic\nthat will resolve to a callable that will be used to create database connections.\n\nFor example, you can use the [Cloud SQL Python Connector](https://pypi.org/project/cloud-sql-python-connector/)\nin the following way.\n\nFirst install the Cloud SQL Python Connector package from PyPI.\n\n    $ pip install 'cloud-sql-python-connector[pg8000]'\n\nThen define a `getconn()` function, following the advice in the Cloud SQL\nPython Connector README page.\n\n```python\nfrom google.cloud.sql.connector import Connector\n\n# initialize Connector object\nconnector = Connector()\n\n# function to return the database connection\ndef get_google_cloud_sql_conn():\n    return connector.connect(\n        \"project:region:instance\",\n        \"pg8000\",\n        user=\"postgres-iam-user@gmail.com\",\n        db=\"my-db-name\",\n        enable_iam_auth=True,\n   )\n```\n\nSet the environment variable `'SQLALCHEMY_CONNECTION_CREATOR_TOPIC'`, along with\n`'PERSISTENCE_MODULE'` and `'SQLALCHEMY_URL'`.\n\n```python\nfrom eventsourcing.utils import get_topic\n\nos.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'\nos.environ['SQLALCHEMY_URL'] = 'postgresql+pg8000://'\nos.environ['SQLALCHEMY_CONNECTION_CREATOR_TOPIC'] = get_topic(get_google_cloud_sql_conn)\n```\n\n## More information\n\nSee the library's [documentation](https://eventsourcing.readthedocs.io/)\nand the [SQLAlchemy](https://www.sqlalchemy.org/) project for more information.\n",
    "bugtrack_url": null,
    "license": "BSD 3-Clause",
    "summary": "Python package for eventsourcing with SQLAlchemy.",
    "version": "0.10",
    "project_urls": {
        "Homepage": "https://eventsourcing.readthedocs.io/",
        "Repository": "https://github.com/pyeventsourcing/eventsourcing-sqlalchemy"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "db143ce5ffed52f8836f781d8a10cd20c06f849903e79716e2a475521884de2c",
                "md5": "65901cbf2d9e8336538db152503fbc55",
                "sha256": "460121469a74f8825de4d49c893b57e56a7ac5c189568d3e1f2fe90b84078f3c"
            },
            "downloads": -1,
            "filename": "eventsourcing_sqlalchemy-0.10-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "65901cbf2d9e8336538db152503fbc55",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.8",
            "size": 13108,
            "upload_time": "2024-11-08T15:34:37",
            "upload_time_iso_8601": "2024-11-08T15:34:37.205288Z",
            "url": "https://files.pythonhosted.org/packages/db/14/3ce5ffed52f8836f781d8a10cd20c06f849903e79716e2a475521884de2c/eventsourcing_sqlalchemy-0.10-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "478136c49d2db0d94d79a31eb9aea48a94647385868bfb00dad5216a1c8cc116",
                "md5": "b1cce7be5f57cfea3bb41d4def710bf5",
                "sha256": "63ad18d8184f3732010bef8ee556e6bfa6b18ee5758687b86f32cf35807c2ea5"
            },
            "downloads": -1,
            "filename": "eventsourcing_sqlalchemy-0.10.tar.gz",
            "has_sig": false,
            "md5_digest": "b1cce7be5f57cfea3bb41d4def710bf5",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.8",
            "size": 15145,
            "upload_time": "2024-11-08T15:34:39",
            "upload_time_iso_8601": "2024-11-08T15:34:39.474612Z",
            "url": "https://files.pythonhosted.org/packages/47/81/36c49d2db0d94d79a31eb9aea48a94647385868bfb00dad5216a1c8cc116/eventsourcing_sqlalchemy-0.10.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-11-08 15:34:39",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "pyeventsourcing",
    "github_project": "eventsourcing-sqlalchemy",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "eventsourcing-sqlalchemy"
}