# Event Sourcing in Python with SQLAlchemy
This package supports using the Python
[eventsourcing](https://github.com/pyeventsourcing/eventsourcing) library
with [SQLAlchemy](https://www.sqlalchemy.org/).
## Table of contents
<!-- TOC -->
* [Table of contents](#table-of-contents)
* [Quick start](#quick-start)
* [Installation](#installation)
* [Getting started](#getting-started)
* [Managing transactions outside the application](#managing-transactions-outside-the-application)
* [Using SQLAlchemy scoped sessions](#using-sqlalchemy-scoped-sessions)
* [Managing sessions with Flask-SQLAlchemy](#managing-sessions-with-flask-sqlalchemy)
* [Managing sessions with FastAPI-SQLAlchemy](#managing-sessions-with-fastapi-sqlalchemy)
* [Google Cloud SQL Python Connector](#google-cloud-sql-python-connector)
* [More information](#more-information)
<!-- TOC -->
## Quick start
To use SQLAlchemy with your Python eventsourcing applications:
* install the Python package `eventsourcing_sqlalchemy`
* set the environment variable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`
* set the environment variable `SQLALCHEMY_URL` to an SQLAlchemy database URL
See below for more information.
## Installation
Use pip to install the [stable distribution](https://pypi.org/project/eventsourcing_sqlalchemy/)
from the Python Package Index. Installing Python packages into a Python
virtual environment is recommended.

    $ pip install eventsourcing_sqlalchemy

## Getting started
Define aggregates and applications in the usual way.
```python
from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event
from uuid import uuid5, NAMESPACE_URL


class TrainingSchool(Application):
    def register(self, name):
        dog = Dog(name)
        self.save(dog)

    def add_trick(self, name, trick):
        dog = self.repository.get(Dog.create_id(name))
        dog.add_trick(trick)
        self.save(dog)

    def get_tricks(self, name):
        dog = self.repository.get(Dog.create_id(name))
        return dog.tricks


class Dog(Aggregate):
    @event('Registered')
    def __init__(self, name):
        self.name = name
        self.tricks = []

    @staticmethod
    def create_id(name):
        return uuid5(NAMESPACE_URL, f'/dogs/{name}')

    @event('TrickAdded')
    def add_trick(self, trick):
        self.tricks.append(trick)
```
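The application methods above look up a dog by name because `Dog.create_id()` derives the aggregate ID deterministically from the name. The following standalone sketch shows that property of `uuid5()` using only the standard library; the helper name `create_dog_id()` is illustrative and simply mirrors the static method above.

```python
from uuid import NAMESPACE_URL, UUID, uuid5


def create_dog_id(name: str) -> UUID:
    # Same namespace and same name always produce the same version 5 UUID.
    return uuid5(NAMESPACE_URL, f"/dogs/{name}")


fido_id = create_dog_id("Fido")
fido_id_again = create_dog_id("Fido")
buster_id = create_dog_id("Buster")

# The ID can be recomputed from the name, so no lookup table is needed.
assert fido_id == fido_id_again
assert fido_id != buster_id
```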
To use this module as the persistence module for your application, set the environment
variable `PERSISTENCE_MODULE` to `'eventsourcing_sqlalchemy'`.
When using this module, you need to set the environment variable `SQLALCHEMY_URL` to an
SQLAlchemy database URL for your database.
Please refer to the [SQLAlchemy documentation](https://docs.sqlalchemy.org/en/14/core/engines.html)
for more information about SQLAlchemy Database URLs.
```python
import os
os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'sqlite:///:memory:'
```
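If you are unsure how a database URL will be interpreted, SQLAlchemy's `make_url()` function can parse it without connecting to anything. This is a small sketch, not something this package requires; the PostgreSQL URL, user name, and password below are made-up example values.

```python
from sqlalchemy.engine import make_url

# The in-memory SQLite URL used in this README.
sqlite_url = make_url("sqlite:///:memory:")

# A hypothetical PostgreSQL URL using the pg8000 driver (example values only).
postgres_url = make_url("postgresql+pg8000://scott:tiger@localhost:5432/mydb")

# Inspect the parsed parts: dialect+driver, database, host and port.
print(sqlite_url.drivername, sqlite_url.database)
print(postgres_url.drivername, postgres_url.host, postgres_url.port)
```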
Construct and use the application in the usual way.
```python
school = TrainingSchool()
school.register('Fido')
school.add_trick('Fido', 'roll over')
school.add_trick('Fido', 'play dead')
tricks = school.get_tricks('Fido')
assert tricks == ['roll over', 'play dead']
```
## Managing transactions outside the application
Sometimes you might need to update an SQLAlchemy ORM model atomically with updates to
your event-sourced application. You can manage transactions outside the application.
Just call the application recorder's `transaction()` method and use the returned
`Transaction` object as a context manager to obtain an SQLAlchemy `Session`
object. You can `add()` your ORM objects to the session. Everything will commit
atomically when the `Transaction` context manager exits. This effectively implements
thread-scoped transactions.
```python
with school.recorder.transaction() as session:
    # Update CRUD model.
    ...  # session.add(my_orm_object)
    # Update event-sourced application.
    school.register('Buster')
    school.add_trick('Buster', 'fetch ball')

    tricks = school.get_tricks('Buster')
    assert tricks == ['fetch ball']
```
Please note, the SQLAlchemy "autoflush" ORM feature is enabled by default.
```python
app = Application()

with app.recorder.transaction() as session:
    assert session.autoflush is True
```
If you need "autoflush" to be disabled, you can set the environment variable `SQLALCHEMY_AUTOFLUSH` to `'False'`.
```python
app = Application(env={'SQLALCHEMY_AUTOFLUSH': 'False'})

with app.recorder.transaction() as session:
    assert session.autoflush is False
```
Alternatively, you can set the autoflush option directly on the SQLAlchemy session maker.
```python
app = Application()
app.recorder.datastore.session_maker.kw["autoflush"] = False

with app.recorder.transaction() as session:
    assert session.autoflush is False
```
Alternatively, you can use the session's ``no_autoflush`` context manager.
```python
app = Application()

with app.recorder.transaction() as session:
    with session.no_autoflush:
        assert session.autoflush is False
```
Alternatively, you can set the ``autoflush`` attribute of the session object.
```python
app = Application()

with app.recorder.transaction() as session:
    session.autoflush = False
    # Add CRUD objects to the session.
    ...
```
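For readers unfamiliar with the feature, autoflush decides whether pending ORM objects are written to the database before a query runs. The sketch below uses plain SQLAlchemy, independently of this package, to show the difference; the `Note` model, `demo_engine`, and `count_after_add()` are hypothetical names introduced only for this illustration.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

DemoBase = declarative_base()


class Note(DemoBase):
    # Hypothetical CRUD model used only for this demonstration.
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)
    text = Column(String)


demo_engine = create_engine("sqlite:///:memory:")
DemoBase.metadata.create_all(demo_engine)


def count_after_add(autoflush: bool) -> int:
    # Add an object, then query before committing anything.
    with Session(demo_engine, autoflush=autoflush) as session:
        session.add(Note(text="pending"))
        count = session.query(Note).count()
        session.rollback()  # Discard the pending object either way.
        return count


with_autoflush = count_after_add(autoflush=True)
without_autoflush = count_after_add(autoflush=False)

# With autoflush the pending row is flushed before the query and is counted.
assert with_autoflush == 1
# Without autoflush the query runs first, so the pending row is not visible.
assert without_autoflush == 0
```

Disabling autoflush can be useful when you want to control exactly when your CRUD objects are written relative to the stored events.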
## Using SQLAlchemy scoped sessions
You can configure the application to use an SQLAlchemy `scoped_session`
object, which scopes the session to the current thread or to another context,
such as a Web request in a Web application framework.
Define an adapter for a `scoped_session` object and configure the event-sourced
application using the environment variable `SQLALCHEMY_SCOPED_SESSION_TOPIC`.
```python
from typing import Any

from eventsourcing.application import AggregateNotFound
from eventsourcing.utils import get_topic
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

# Create engine.
engine = create_engine('sqlite:///:memory:')

# Create a scoped_session object.
session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine)
)


# Define an adapter for the scoped session.
class MyScopedSessionAdapter:
    def __getattribute__(self, item: str) -> Any:
        # Forward every attribute lookup to the scoped session.
        return getattr(session, item)


# Produce the topic of the scoped session adapter class.
scoped_session_topic = get_topic(MyScopedSessionAdapter)

# Construct an event-sourced application.
app = Application(
    env={'SQLALCHEMY_SCOPED_SESSION_TOPIC': scoped_session_topic}
)

# During request.
aggregate = Aggregate()
app.save(aggregate)
app.repository.get(aggregate.id)
session.commit()

# After request.
session.remove()

# During request.
app.repository.get(aggregate.id)

# After request.
session.remove()

# During request.
aggregate = Aggregate()
app.save(aggregate)
# Deliberately do not commit.

# After request.
session.remove()

# During request.
try:
    # The previous request did not commit, so the aggregate was not stored.
    app.repository.get(aggregate.id)
except AggregateNotFound:
    pass
else:
    raise Exception("Expected aggregate not found")

# After request.
session.remove()
```
As you can see, you need to call `commit()` during a request, and call `remove()`
after the request completes. Packages that integrate SQLAlchemy with Web application
frameworks tend to automate this call to `remove()`. Some of them also call `commit()`
automatically if an exception is not raised during the handling of a request.
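
To see what scoping a session to a thread means in practice, here is a minimal sketch using only SQLAlchemy. The names `demo_engine`, `demo_sessions`, and `worker()` are illustrative and are not part of this package.

```python
import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

demo_engine = create_engine("sqlite:///:memory:")
demo_sessions = scoped_session(sessionmaker(bind=demo_engine))

# Calling the registry twice in one thread returns the same Session object.
first = demo_sessions()
same_thread = demo_sessions()

# Another thread gets its own Session object from the same registry.
seen_in_other_thread = []


def worker() -> None:
    seen_in_other_thread.append(demo_sessions())
    demo_sessions.remove()


thread = threading.Thread(target=worker)
thread.start()
thread.join()

# After remove(), the next call in this thread creates a fresh Session.
demo_sessions.remove()
after_remove = demo_sessions()

assert first is same_thread
assert seen_in_other_thread[0] is not first
assert after_remove is not first

demo_sessions.remove()
```

This is why `remove()` belongs at the end of each request: it discards the session for the current scope so that the next request starts with a clean one.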
## Managing sessions with Flask-SQLAlchemy
The package [Flask-SQLAlchemy](https://github.com/pallets-eco/flask-sqlalchemy)
([full docs](https://flask-sqlalchemy.palletsprojects.com/)) provides a class
called `SQLAlchemy` whose `session` attribute is an SQLAlchemy
`scoped_session`. This can be adapted in a similar way.
```python
from typing import Any

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
try:
    from sqlalchemy.orm import declarative_base  # type: ignore
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base

# Define a Flask app.
flask_app = Flask(__name__)
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'

# Integration between Flask and SQLAlchemy.
Base = declarative_base()
db = SQLAlchemy(flask_app, model_class=Base)


# Define an adapter for the scoped session.
class FlaskScopedSession:
    def __getattribute__(self, item: str) -> Any:
        # Forward every attribute lookup to Flask-SQLAlchemy's scoped session.
        return getattr(db.session, item)


# Run the Flask application in a Web application server.
with flask_app.app_context():

    # Produce the topic of the scoped session adapter class.
    scoped_session_adapter_topic = get_topic(FlaskScopedSession)
    # Construct event-sourced application to use scoped sessions.
    es_app = Application(
        env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": scoped_session_adapter_topic}
    )

    # During request.
    aggregate = Aggregate()
    es_app.save(aggregate)
    db.session.commit()

    # After request (this is done automatically).
    db.session.remove()

    # During request.
    es_app.repository.get(aggregate.id)

    # After request (this is done automatically).
    db.session.remove()
```
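The adapter classes in these examples hold no state of their own: `__getattribute__` forwards every attribute lookup to whatever session is current at the time of the call. The standard-library sketch below illustrates that delegation with a stand-in object; `FakeSession`, `registry`, and `ForwardingAdapter` are hypothetical names, and nothing here reflects how the package itself uses the adapter internally.

```python
from typing import Any


class FakeSession:
    # Stand-in for a database session, used only for this illustration.
    def __init__(self, label: str) -> None:
        self.label = label

    def describe(self) -> str:
        return f"session {self.label}"


# Holds whichever session is "current", like a scoped session registry.
registry = {"current": FakeSession("A")}


class ForwardingAdapter:
    def __getattribute__(self, item: str) -> Any:
        # Look up the current session on every access, then delegate.
        return getattr(registry["current"], item)


adapter = ForwardingAdapter()
first_result = adapter.describe()

# Swap the current session; the same adapter instance now reaches the new one.
registry["current"] = FakeSession("B")
second_result = adapter.describe()

assert first_result == "session A"
assert second_result == "session B"
```

Because the lookup happens on each access, one adapter instance can be constructed once and still reach the right session for each request.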
## Managing sessions with FastAPI-SQLAlchemy
The package [FastAPI-SQLAlchemy](https://github.com/mfreeborn/fastapi-sqlalchemy)
doesn't actually use an SQLAlchemy `scoped_session`, but instead has a global `db`
variable that has a `session` attribute which returns request-scoped sessions when
accessed. This can be adapted in a similar way. Sessions are committed automatically
after the request has been handled successfully, and not committed if an exception
is raised.
```python
from typing import Any

from fastapi import FastAPI
from fastapi_sqlalchemy import db, DBSessionMiddleware

# Define a FastAPI application.
fastapi_app = FastAPI()

# Add SQLAlchemy integration middleware to the FastAPI application.
fastapi_app.add_middleware(
    DBSessionMiddleware, db_url='sqlite:///:memory:'
)

# Build the middleware stack (happens automatically when the FastAPI app
# runs in a Web application server).
fastapi_app.build_middleware_stack()


# Define an adapter for the request-scoped session.
class FastapiScopedSession:
    def __getattribute__(self, item: str) -> Any:
        # Forward every attribute lookup to the current request's session.
        return getattr(db.session, item)


# Construct an event-sourced application within a scoped session.
with db(commit_on_exit=True):
    # Produce the topic of the scoped session adapter class.
    scoped_session_adapter_topic = get_topic(FastapiScopedSession)
    # Construct event-sourced application to use scoped sessions.
    es_app = Application(
        env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": scoped_session_adapter_topic}
    )

# Create a new event-sourced aggregate.
with db(commit_on_exit=True):  # This happens automatically before handling a route.
    # Handle request.
    aggregate = Aggregate()
    es_app.save(aggregate)
    es_app.repository.get(aggregate.id)

# The aggregate has been committed.
with db(commit_on_exit=True):  # This happens automatically before handling a route.
    # Handle request.
    es_app.repository.get(aggregate.id)

# Raise exception after creating aggregate.
try:
    with db(commit_on_exit=True):
        # Handle request.
        aggregate = Aggregate()
        es_app.save(aggregate)
        es_app.repository.get(aggregate.id)
        raise TypeError("An error occurred!!!")
except TypeError:
    # Web framework returns an error.
    ...
else:
    raise Exception("Expected type error")

# The aggregate hasn't been committed.
with db(commit_on_exit=True):
    try:
        es_app.repository.get(aggregate.id)
    except AggregateNotFound:
        pass
    else:
        raise Exception("Expected aggregate not found")
```
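The commit-on-success behaviour described in this section can be pictured with a small context manager. The following sketch uses only the standard library's `sqlite3` module and is not FastAPI-SQLAlchemy's implementation; `request_scope()` and the `events` table are hypothetical names for illustration.

```python
import sqlite3
from contextlib import contextmanager

demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE events (name TEXT)")
demo_conn.commit()


@contextmanager
def request_scope(connection):
    # Commit if the block finishes normally, roll back if it raises.
    try:
        yield connection
    except Exception:
        connection.rollback()
        raise
    else:
        connection.commit()


# A request that succeeds: the insert is committed on exit.
with request_scope(demo_conn) as conn:
    conn.execute("INSERT INTO events VALUES ('committed')")

# A request that fails: the insert is rolled back on exit.
try:
    with request_scope(demo_conn) as conn:
        conn.execute("INSERT INTO events VALUES ('discarded')")
        raise TypeError("An error occurred")
except TypeError:
    pass

names = [row[0] for row in demo_conn.execute("SELECT name FROM events")]
assert names == ["committed"]
```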
## Google Cloud SQL Python Connector
You can set the environment variable `SQLALCHEMY_CONNECTION_CREATOR_TOPIC` to a topic
that will resolve to a callable that will be used to create database connections.
For example, you can use the [Cloud SQL Python Connector](https://pypi.org/project/cloud-sql-python-connector/)
in the following way.
First install the Cloud SQL Python Connector package from PyPI.

    $ pip install 'cloud-sql-python-connector[pg8000]'

Then define a function that returns a database connection (called
`get_google_cloud_sql_conn()` here), following the advice in the Cloud SQL
Python Connector README page.
```python
from google.cloud.sql.connector import Connector

# Initialize the Connector object.
connector = Connector()


# Function that returns the database connection.
def get_google_cloud_sql_conn():
    return connector.connect(
        "project:region:instance",
        "pg8000",
        user="postgres-iam-user@gmail.com",
        db="my-db-name",
        enable_iam_auth=True,
    )
```
Set the environment variable `'SQLALCHEMY_CONNECTION_CREATOR_TOPIC'`, along with
`'PERSISTENCE_MODULE'` and `'SQLALCHEMY_URL'`.
```python
import os

from eventsourcing.utils import get_topic

os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'postgresql+pg8000://'
os.environ['SQLALCHEMY_CONNECTION_CREATOR_TOPIC'] = get_topic(get_google_cloud_sql_conn)
```
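If you want to try the idea of a connection-creating callable without a Cloud SQL instance, SQLAlchemy's own `create_engine()` accepts such a callable through its `creator` argument. The sketch below uses SQLite and plain SQLAlchemy; whether this package hands your callable to SQLAlchemy in exactly this way is an assumption, and `create_sqlite_connection()` and `creator_demo_engine` are illustrative names.

```python
import sqlite3

from sqlalchemy import create_engine, text

created_connections = []


def create_sqlite_connection():
    # Return a new DB-API connection each time the pool needs one.
    connection = sqlite3.connect(":memory:")
    created_connections.append(connection)
    return connection


# The URL only selects the dialect; the callable supplies the connections.
creator_demo_engine = create_engine("sqlite://", creator=create_sqlite_connection)

with creator_demo_engine.connect() as connection:
    result = connection.execute(text("SELECT 1")).scalar()

assert result == 1
assert len(created_connections) >= 1
```

This mirrors the configuration above, where the URL `'postgresql+pg8000://'` names only the dialect and driver and the connection details come from the callable.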
## More information
See the library's [documentation](https://eventsourcing.readthedocs.io/)
and the [SQLAlchemy](https://www.sqlalchemy.org/) project for more information.