Add initial versioning support with SQLAlchemy-Continuum.

This commit is contained in:
Lance Edgar 2015-02-09 15:33:16 -06:00
parent 41dd2ef17b
commit def466935b
43 changed files with 717 additions and 26 deletions

View file

@ -26,17 +26,124 @@
Database Stuff
"""
import sqlalchemy as sa
from zope.sqlalchemy import datamanager
import sqlalchemy_continuum as continuum
from sqlalchemy.orm import sessionmaker, scoped_session
Session = scoped_session(sessionmaker())
from rattail.db import SessionBase
from rattail.db import model
try:
# Requires zope.sqlalchemy >= 0.7.4
from zope.sqlalchemy import register
except ImportError:
from zope.sqlalchemy import ZopeTransactionExtension
Session.configure(extension=ZopeTransactionExtension())
else:
Session = scoped_session(sessionmaker(class_=SessionBase))
class TailboneSessionDataManager(datamanager.SessionDataManager):
"""Integrate a top level sqlalchemy session transaction into a zope transaction
One phase variant.
.. note::
This class appears to be necessary in order for the Continuum
integration to work alongside the Zope transaction integration.
"""
def tpc_vote(self, trans):
# for a one phase data manager commit last in tpc_vote
if self.tx is not None: # there may have been no work to do
# Force creation of Continuum versions for current session.
mgr = continuum.get_versioning_manager(model.Product) # any ol' model will do
uow = mgr.unit_of_work(self.session)
uow.make_versions(self.session)
self.tx.commit()
self._finish('committed')
def join_transaction(session, initial_state=datamanager.STATUS_ACTIVE, transaction_manager=datamanager.zope_transaction.manager, keep_session=False):
"""Join a session to a transaction using the appropriate datamanager.
It is safe to call this multiple times, if the session is already joined
then it just returns.
`initial_state` is either STATUS_ACTIVE, STATUS_INVALIDATED or STATUS_READONLY
If using the default initial status of STATUS_ACTIVE, you must ensure that
mark_changed(session) is called when data is written to the database.
The ZopeTransactionExtesion SessionExtension can be used to ensure that this is
called automatically after session write operations.
.. note::
This function is copied from upstream, and tweaked so that our custom
:class:`TailboneSessionDataManager` will be used.
"""
if datamanager._SESSION_STATE.get(id(session), None) is None:
if session.twophase:
DataManager = datamanager.TwoPhaseSessionDataManager
else:
DataManager = TailboneSessionDataManager
DataManager(session, initial_state, transaction_manager, keep_session=keep_session)
class ZopeTransactionExtension(datamanager.ZopeTransactionExtension):
"""Record that a flush has occurred on a session's connection. This allows
the DataManager to rollback rather than commit on read only transactions.
.. note::
This class is copied from upstream, and tweaked so that our custom
:func:`join_transaction()` will be used.
"""
def after_begin(self, session, transaction, connection):
join_transaction(session, self.initial_state, self.transaction_manager, self.keep_session)
def after_attach(self, session, instance):
join_transaction(session, self.initial_state, self.transaction_manager, self.keep_session)
def register(session, initial_state=datamanager.STATUS_ACTIVE,
transaction_manager=datamanager.zope_transaction.manager, keep_session=False):
"""Register ZopeTransaction listener events on the
given Session or Session factory/class.
This function requires at least SQLAlchemy 0.7 and makes use
of the newer sqlalchemy.event package in order to register event listeners
on the given Session.
The session argument here may be a Session class or subclass, a
sessionmaker or scoped_session instance, or a specific Session instance.
Event listening will be specific to the scope of the type of argument
passed, including specificity to its subclass as well as its identity.
.. note::
This function is copied from upstream, and tweaked so that our custom
:class:`ZopeTransactionExtension` will be used.
"""
from sqlalchemy import __version__
assert tuple(int(x) for x in __version__.split(".")) >= (0, 7), \
"SQLAlchemy version 0.7 or greater required to use register()"
from sqlalchemy import event
ext = ZopeTransactionExtension(
initial_state=initial_state,
transaction_manager=transaction_manager,
keep_session=keep_session,
)
event.listen(session, "after_begin", ext.after_begin)
event.listen(session, "after_attach", ext.after_attach)
event.listen(session, "after_flush", ext.after_flush)
event.listen(session, "after_bulk_update", ext.after_bulk_update)
event.listen(session, "after_bulk_delete", ext.after_bulk_delete)
event.listen(session, "before_commit", ext.before_commit)
# TODO: We can probably assume a new SA version since we use Continuum now.
if tuple(int(x) for x in sa.__version__.split('.')) >= (0, 7):
register(Session)
else:
Session.configure(extension=ZopeTransactionExtension())