Add 'datasync' daemon.
First attempt, but already seems to work as good or better than 'dbsync'.
This commit is contained in:
parent
e12aba7174
commit
e62a570951
|
@ -318,6 +318,47 @@ class AddUser(Subcommand):
|
||||||
self.stdout.write("Created user: {0}\n".format(args.username))
|
self.stdout.write("Created user: {0}\n".format(args.username))
|
||||||
|
|
||||||
|
|
||||||
|
class DataSync(Subcommand):
|
||||||
|
"""
|
||||||
|
Interacts with the datasync daemon. This command expects a subcommand; one
|
||||||
|
of the following:
|
||||||
|
|
||||||
|
* ``rattail datasync start``
|
||||||
|
* ``rattail datasync stop``
|
||||||
|
"""
|
||||||
|
name = 'datasync'
|
||||||
|
description = "Manage the DataSync daemon"
|
||||||
|
|
||||||
|
def add_parser_args(self, parser):
|
||||||
|
subparsers = parser.add_subparsers(title='subcommands')
|
||||||
|
|
||||||
|
start = subparsers.add_parser('start', help="Start service")
|
||||||
|
start.set_defaults(subcommand='start')
|
||||||
|
stop = subparsers.add_parser('stop', help="Stop service")
|
||||||
|
stop.set_defaults(subcommand='stop')
|
||||||
|
|
||||||
|
parser.add_argument('-p', '--pidfile', metavar='PATH', default='/var/run/rattail/datasync.pid',
|
||||||
|
help="Path to PID file.")
|
||||||
|
parser.add_argument('-D', '--do-not-daemonize',
|
||||||
|
action='store_false', dest='daemonize', default=True,
|
||||||
|
help="Do not daemonize when starting.")
|
||||||
|
|
||||||
|
def run(self, args):
|
||||||
|
from rattail.datasync.daemon import DataSyncDaemon
|
||||||
|
|
||||||
|
daemon = DataSyncDaemon(args.pidfile, config=self.config)
|
||||||
|
if args.subcommand == 'stop':
|
||||||
|
daemon.stop()
|
||||||
|
else: # start
|
||||||
|
try:
|
||||||
|
daemon.start(daemonize=args.daemonize)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
if not args.daemonize:
|
||||||
|
self.stderr.write("Interrupted.\n")
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
class EmailBouncer(Subcommand):
|
class EmailBouncer(Subcommand):
|
||||||
"""
|
"""
|
||||||
Interacts with the email bouncer daemon. This command expects a
|
Interacts with the email bouncer daemon. This command expects a
|
||||||
|
|
|
@ -467,3 +467,44 @@ def get_user_file(filename, createdir=False):
|
||||||
``create`` arg, and may be used to ensure the user-level folder exists.
|
``create`` arg, and may be used to ensure the user-level folder exists.
|
||||||
"""
|
"""
|
||||||
return os.path.join(get_user_dir(create=createdir), filename)
|
return os.path.join(get_user_dir(create=createdir), filename)
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigProfile(object):
|
||||||
|
"""
|
||||||
|
Generic class to represent a config "profile", as used by the filemon and
|
||||||
|
datasync daemons, etc.
|
||||||
|
|
||||||
|
.. todo::
|
||||||
|
This clearly needs more documentation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def section(self):
|
||||||
|
"""
|
||||||
|
Each subclass of ``ConfigProfile`` must define this.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def _config_string(self, option, **kwargs):
|
||||||
|
return self.config.get(self.section, '{0}.{1}'.format(self.key, option), **kwargs)
|
||||||
|
|
||||||
|
def _config_boolean(self, option, default=None):
|
||||||
|
return self.config.getbool(self.section, '{0}.{1}'.format(self.key, option),
|
||||||
|
default=default)
|
||||||
|
|
||||||
|
def _config_int(self, option, minimum=1, default=None):
|
||||||
|
option = '{0}.{1}'.format(self.key, option)
|
||||||
|
if self.config.has_option(self.section, option):
|
||||||
|
value = self.config.getint(self.section, option)
|
||||||
|
if value < minimum:
|
||||||
|
log.warning("config value {0} is too small; falling back to minimum "
|
||||||
|
"of {1} for option: {2}".format(value, minimum, option))
|
||||||
|
value = minimum
|
||||||
|
elif default is not None and default >= minimum:
|
||||||
|
value = default
|
||||||
|
else:
|
||||||
|
value = minimum
|
||||||
|
return value
|
||||||
|
|
||||||
|
def _config_list(self, option):
|
||||||
|
return parse_list(self._config_string(option))
|
||||||
|
|
30
rattail/datasync/__init__.py
Normal file
30
rattail/datasync/__init__.py
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# Rattail -- Retail Software Framework
|
||||||
|
# Copyright © 2010-2015 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Rattail.
|
||||||
|
#
|
||||||
|
# Rattail is free software: you can redistribute it and/or modify it under the
|
||||||
|
# terms of the GNU Affero General Public License as published by the Free
|
||||||
|
# Software Foundation, either version 3 of the License, or (at your option)
|
||||||
|
# any later version.
|
||||||
|
#
|
||||||
|
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
|
||||||
|
# more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with Rattail. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
DataSync Daemon
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
from .watchers import DataSyncWatcher
|
||||||
|
from .consumers import DataSyncConsumer
|
110
rattail/datasync/config.py
Normal file
110
rattail/datasync/config.py
Normal file
|
@ -0,0 +1,110 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# Rattail -- Retail Software Framework
|
||||||
|
# Copyright © 2010-2015 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Rattail.
|
||||||
|
#
|
||||||
|
# Rattail is free software: you can redistribute it and/or modify it under the
|
||||||
|
# terms of the GNU Affero General Public License as published by the Free
|
||||||
|
# Software Foundation, either version 3 of the License, or (at your option)
|
||||||
|
# any later version.
|
||||||
|
#
|
||||||
|
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
|
||||||
|
# more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with Rattail. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
DataSync Configuration
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
from rattail.config import ConfigProfile, parse_list
|
||||||
|
from rattail.util import load_object
|
||||||
|
from rattail.exceptions import ConfigurationError
|
||||||
|
|
||||||
|
|
||||||
|
class DataSyncProfile(ConfigProfile):
|
||||||
|
"""
|
||||||
|
Simple class to hold configuration for a DataSync "profile". Each profile
|
||||||
|
determines which database(s) will be watched for new changes, and which
|
||||||
|
consumer(s) will then be instructed to process the changes.
|
||||||
|
|
||||||
|
.. todo::
|
||||||
|
This clearly needs more documentation.
|
||||||
|
"""
|
||||||
|
section = 'rattail.datasync'
|
||||||
|
|
||||||
|
def __init__(self, config, key):
|
||||||
|
self.config = config
|
||||||
|
self.key = key
|
||||||
|
|
||||||
|
self.watcher_spec = self._config_string('watcher')
|
||||||
|
self.watcher = load_object(self.watcher_spec)(config, key)
|
||||||
|
self.watcher.delay = self._config_int('watcher.delay', default=self.watcher.delay)
|
||||||
|
|
||||||
|
self.consumers = self.normalize_consumers(self._config_list('consumers'))
|
||||||
|
self.consumer_delay = self._config_int('consumer_delay', default=1)
|
||||||
|
self.watcher.consumer_stub_keys = [c.key for c in self.isolated_consumers]
|
||||||
|
if self.common_consumers:
|
||||||
|
self.watcher.consumer_stub_keys.append(None)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def isolated_consumers(self):
|
||||||
|
return [c for c in self.consumers if c.isolated]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def common_consumers(self):
|
||||||
|
return [c for c in self.consumers if not c.isolated]
|
||||||
|
|
||||||
|
def normalize_consumers(self, raw_consumers):
|
||||||
|
# TODO: This will do for now, but isn't as awesome as it probably could be.
|
||||||
|
default_isolated = self._config_boolean('consumers.isolated', default=True)
|
||||||
|
default_ignored = self._config_string('consumers.ignore')
|
||||||
|
consumers = []
|
||||||
|
for key in raw_consumers:
|
||||||
|
consumer_spec = self._config_string('consumer.{0}'.format(key))
|
||||||
|
dbkey = self._config_string('consumer.{0}.db'.format(key), default=key)
|
||||||
|
ignored = self._config_string('consumer.{0}.ignore'.format(key), default=default_ignored)
|
||||||
|
consumer = load_object(consumer_spec)(self.config, key, dbkey=dbkey, ignored=ignored)
|
||||||
|
consumer.spec = consumer_spec
|
||||||
|
consumer.isolated = self._config_boolean('consumer.{0}.isolated'.format(key),
|
||||||
|
default=default_isolated)
|
||||||
|
consumers.append(consumer)
|
||||||
|
return consumers
|
||||||
|
|
||||||
|
|
||||||
|
def get_profile_keys(config):
|
||||||
|
"""
|
||||||
|
Returns a list of profile keys used in the DataSync configuration.
|
||||||
|
"""
|
||||||
|
keys = config.get('rattail.datasync', 'watch')
|
||||||
|
if keys:
|
||||||
|
return parse_list(keys)
|
||||||
|
|
||||||
|
|
||||||
|
def load_profiles(config):
|
||||||
|
"""
|
||||||
|
Load all active DataSync profiles defined within configuration.
|
||||||
|
"""
|
||||||
|
# Make sure we have a top-level directive.
|
||||||
|
keys = get_profile_keys(config)
|
||||||
|
if not keys:
|
||||||
|
raise ConfigurationError(
|
||||||
|
"The DataSync configuration does not specify any profiles "
|
||||||
|
"to be watched. Please defined the 'watch' option within "
|
||||||
|
"the [rattail.datasync] section of your config file.")
|
||||||
|
|
||||||
|
watched = {}
|
||||||
|
for key in keys:
|
||||||
|
profile = DataSyncProfile(config, key)
|
||||||
|
watched[key] = profile
|
||||||
|
return watched
|
44
rattail/datasync/consumers.py
Normal file
44
rattail/datasync/consumers.py
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# Rattail -- Retail Software Framework
|
||||||
|
# Copyright © 2010-2015 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Rattail.
|
||||||
|
#
|
||||||
|
# Rattail is free software: you can redistribute it and/or modify it under the
|
||||||
|
# terms of the GNU Affero General Public License as published by the Free
|
||||||
|
# Software Foundation, either version 3 of the License, or (at your option)
|
||||||
|
# any later version.
|
||||||
|
#
|
||||||
|
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
|
||||||
|
# more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with Rattail. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
DataSync Consumers
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
from rattail.config import parse_list
|
||||||
|
|
||||||
|
|
||||||
|
class DataSyncConsumer(object):
|
||||||
|
|
||||||
|
def __init__(self, config, key, dbkey=None, ignored=None):
|
||||||
|
self.config = config
|
||||||
|
self.key = key
|
||||||
|
self.dbkey = dbkey
|
||||||
|
self.ignored = parse_list(ignored or '')
|
||||||
|
|
||||||
|
def process_changes(self, session, changes):
|
||||||
|
"""
|
||||||
|
Process (consume) a set of changes.
|
||||||
|
"""
|
169
rattail/datasync/daemon.py
Normal file
169
rattail/datasync/daemon.py
Normal file
|
@ -0,0 +1,169 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# Rattail -- Retail Software Framework
|
||||||
|
# Copyright © 2010-2015 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Rattail.
|
||||||
|
#
|
||||||
|
# Rattail is free software: you can redistribute it and/or modify it under the
|
||||||
|
# terms of the GNU Affero General Public License as published by the Free
|
||||||
|
# Software Foundation, either version 3 of the License, or (at your option)
|
||||||
|
# any later version.
|
||||||
|
#
|
||||||
|
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
|
||||||
|
# more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with Rattail. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
DataSync for Linux
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import time
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from rattail.daemon import Daemon
|
||||||
|
from rattail.threads import Thread
|
||||||
|
from rattail.datasync.config import load_profiles
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DataSyncDaemon(Daemon):
|
||||||
|
"""
|
||||||
|
Linux daemon implementation of DataSync.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""
|
||||||
|
Starts watcher and consumer threads according to configuration.
|
||||||
|
"""
|
||||||
|
for key, profile in load_profiles(self.config).iteritems():
|
||||||
|
|
||||||
|
# Create watcher thread for the profile.
|
||||||
|
name = '{0}-watcher'.format(key)
|
||||||
|
log.debug("starting thread '{0}' with watcher: {1}".format(name, profile.watcher_spec))
|
||||||
|
thread = Thread(target=watch_for_changes, name=name, args=(profile.watcher,))
|
||||||
|
thread.daemon = True
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
# Create a thread for each "isolated" consumer.
|
||||||
|
for consumer in profile.isolated_consumers:
|
||||||
|
name = '{0}-consumer-{1}'.format(key, consumer.key)
|
||||||
|
log.debug("starting thread '{0}' with isolated consumer: {1}".format(name, consumer.spec))
|
||||||
|
thread = Thread(target=consume_changes, name=name, args=(profile, [consumer]))
|
||||||
|
thread.daemon = True
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
# Maybe create a common (shared transaction) thread for the rest of
|
||||||
|
# the consumers.
|
||||||
|
if profile.common_consumers:
|
||||||
|
name = '{0}-consumer-shared'.format(key)
|
||||||
|
log.debug("starting thread '{0}' with consumer(s): {1}".format(
|
||||||
|
name, ','.join(["{0} ({1})".format(c.key, c.spec) for c in profile.common_consumers])))
|
||||||
|
thread = Thread(target=consume_changes, name=name, args=(profile, profile.common_consumers,))
|
||||||
|
thread.daemon = True
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
# Loop indefinitely. Since this is the main thread, the app will
|
||||||
|
# terminate when this method ends; all other threads are "subservient"
|
||||||
|
# to this one.
|
||||||
|
while True:
|
||||||
|
time.sleep(.01)
|
||||||
|
|
||||||
|
|
||||||
|
def watch_for_changes(watcher):
|
||||||
|
"""
|
||||||
|
Target for DataSync watcher threads.
|
||||||
|
"""
|
||||||
|
from rattail.db import Session, model
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
changes = watcher.get_changes()
|
||||||
|
if changes:
|
||||||
|
log.debug("got {0} changes from '{1}' watcher".format(len(changes), watcher.key))
|
||||||
|
|
||||||
|
# Give all change stubs the same timestamp, to help identify them
|
||||||
|
# as a "batch" of sorts, so consumers can process them as such.
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
|
||||||
|
# Save change stub records to rattail database, for consumer thread
|
||||||
|
# to find and process.
|
||||||
|
saved = 0
|
||||||
|
session = Session()
|
||||||
|
for key, change in changes:
|
||||||
|
for consumer in watcher.consumer_stub_keys:
|
||||||
|
session.add(model.DataSyncChange(
|
||||||
|
source=watcher.key,
|
||||||
|
payload_type=change.payload_type,
|
||||||
|
payload_key=change.payload_key,
|
||||||
|
deletion=change.deletion,
|
||||||
|
obtained=now,
|
||||||
|
consumer=consumer))
|
||||||
|
saved += 1
|
||||||
|
session.flush()
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
log.debug("saved {0} '{1}' change stubs to rattail database".format(saved, watcher.key))
|
||||||
|
|
||||||
|
# Tell watcher to prune the original change records from its source
|
||||||
|
# database, if relevant. Note that we only give it the keys for this.
|
||||||
|
pruned = watcher.prune_changes([c[0] for c in changes])
|
||||||
|
if pruned is not None:
|
||||||
|
log.debug("pruned {0} changes from '{1}' database".format(pruned, watcher.key))
|
||||||
|
|
||||||
|
time.sleep(watcher.delay)
|
||||||
|
|
||||||
|
|
||||||
|
def consume_changes(profile, consumers):
|
||||||
|
"""
|
||||||
|
Target for DataSync consumer thread.
|
||||||
|
"""
|
||||||
|
from rattail.db import Session, model
|
||||||
|
|
||||||
|
def process(session, consumer, changes):
|
||||||
|
consumer.process_changes(session, changes)
|
||||||
|
for change in changes:
|
||||||
|
session.delete(change)
|
||||||
|
session.flush()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
session = Session()
|
||||||
|
for consumer in consumers:
|
||||||
|
|
||||||
|
changes = session.query(model.DataSyncChange).filter(
|
||||||
|
model.DataSyncChange.source == profile.key,
|
||||||
|
model.DataSyncChange.consumer == consumer.key)\
|
||||||
|
.order_by(model.DataSyncChange.obtained)\
|
||||||
|
.all()
|
||||||
|
|
||||||
|
if changes:
|
||||||
|
log.debug("found {0} changes to consume".format(len(changes)))
|
||||||
|
|
||||||
|
# Process (consume) changes in batches, according to timestamp.
|
||||||
|
batch = []
|
||||||
|
timestamp = None
|
||||||
|
for change in changes:
|
||||||
|
if timestamp and change.obtained != timestamp:
|
||||||
|
process(session, consumer, batch)
|
||||||
|
batch = []
|
||||||
|
batch.append(change)
|
||||||
|
timestamp = change.obtained
|
||||||
|
process(session, consumer, batch)
|
||||||
|
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
time.sleep(profile.consumer_delay)
|
287
rattail/datasync/rattail.py
Normal file
287
rattail/datasync/rattail.py
Normal file
|
@ -0,0 +1,287 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# Rattail -- Retail Software Framework
|
||||||
|
# Copyright © 2010-2015 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Rattail.
|
||||||
|
#
|
||||||
|
# Rattail is free software: you can redistribute it and/or modify it under the
|
||||||
|
# terms of the GNU Affero General Public License as published by the Free
|
||||||
|
# Software Foundation, either version 3 of the License, or (at your option)
|
||||||
|
# any later version.
|
||||||
|
#
|
||||||
|
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
|
||||||
|
# more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with Rattail. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
DataSync for Rattail
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from sqlalchemy import orm
|
||||||
|
|
||||||
|
from rattail.db import Session, model
|
||||||
|
from rattail.datasync import DataSyncWatcher, DataSyncConsumer
|
||||||
|
from rattail.config import parse_list
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RattailWatcher(DataSyncWatcher):
|
||||||
|
"""
|
||||||
|
DataSync watcher for Rattail databases.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def get_changes(self):
|
||||||
|
"""
|
||||||
|
Checks the :class:`rattail.db.model:Change` table in a Rattail
|
||||||
|
database, to see if there are any pending changes for the datasync
|
||||||
|
daemon.
|
||||||
|
"""
|
||||||
|
session = Session()
|
||||||
|
changes = session.query(model.Change).all()
|
||||||
|
session.expunge_all()
|
||||||
|
session.close()
|
||||||
|
if changes:
|
||||||
|
return [
|
||||||
|
((c.class_name, c.uuid),
|
||||||
|
model.DataSyncChange(
|
||||||
|
payload_type=c.class_name,
|
||||||
|
payload_key=c.uuid,
|
||||||
|
deletion=c.deleted))
|
||||||
|
for c in changes]
|
||||||
|
|
||||||
|
def prune_changes(self, keys):
|
||||||
|
deleted = 0
|
||||||
|
session = Session()
|
||||||
|
for key in keys:
|
||||||
|
change = session.query(model.Change).get(key)
|
||||||
|
if change:
|
||||||
|
session.delete(change)
|
||||||
|
session.flush()
|
||||||
|
deleted += 1
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
return deleted
|
||||||
|
|
||||||
|
|
||||||
|
class RattailConsumer(DataSyncConsumer):
|
||||||
|
"""
|
||||||
|
DataSync consumer for Rattail databases.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(RattailConsumer, self).__init__(*args, **kwargs)
|
||||||
|
self.engine = self.config.rattail_engines[self.dbkey]
|
||||||
|
self.model = self.get_data_model()
|
||||||
|
|
||||||
|
def get_data_model(self):
|
||||||
|
"""
|
||||||
|
Subclasses may override this if they have extended the schema.
|
||||||
|
Defaults to ``rattail.db.model``.
|
||||||
|
"""
|
||||||
|
return model
|
||||||
|
|
||||||
|
def process_changes(self, host_session, changes):
|
||||||
|
"""
|
||||||
|
Process changes for a Rattail database.
|
||||||
|
"""
|
||||||
|
# First we must determine which models are represented in the change
|
||||||
|
# set. Some models may be ignored, per config.
|
||||||
|
class_names = set([c.payload_type for c in changes])
|
||||||
|
ignored = sorted(set(self.ignored) & class_names)
|
||||||
|
if ignored:
|
||||||
|
log.debug("ignoring changes of type(s): {0}".format(', '.join(ignored)))
|
||||||
|
class_names = [c for c in class_names if c not in self.ignored]
|
||||||
|
|
||||||
|
# The order will matter because of table foreign key dependencies.
|
||||||
|
# However the dependency_sort() call doesn't *quite* take care of
|
||||||
|
# everything - notably the Product/ProductPrice situation. Since those
|
||||||
|
# classes are mutually dependent, we start with a hackish lexical sort
|
||||||
|
# and hope for the best...
|
||||||
|
class_names = sorted(class_names)
|
||||||
|
class_names.sort(cmp=self.dependency_sort)
|
||||||
|
|
||||||
|
session = Session(bind=self.engine)
|
||||||
|
|
||||||
|
for class_name in class_names:
|
||||||
|
cls = getattr(self.model, class_name)
|
||||||
|
|
||||||
|
for change in [c for c in changes if c.payload_type == class_name]:
|
||||||
|
log.debug("processing {0} for {1} {2}".format(
|
||||||
|
"deletion" if change.deletion else "change",
|
||||||
|
change.payload_type, change.payload_key))
|
||||||
|
|
||||||
|
if change.deletion:
|
||||||
|
instance = session.query(cls).get(change.payload_key)
|
||||||
|
if instance:
|
||||||
|
self.delete_instance(session, instance)
|
||||||
|
session.flush()
|
||||||
|
else:
|
||||||
|
log.warning("could not find instance to delete")
|
||||||
|
|
||||||
|
else: # add/update
|
||||||
|
host_instance = host_session.query(cls).get(change.payload_key)
|
||||||
|
if host_instance:
|
||||||
|
self.merge_instance(session, host_instance)
|
||||||
|
session.flush()
|
||||||
|
else:
|
||||||
|
log.warning("could not find host instance to merge")
|
||||||
|
|
||||||
|
session.commit()
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
def dependency_sort(self, x, y):
|
||||||
|
map_x = orm.class_mapper(getattr(self.model, x))
|
||||||
|
map_y = orm.class_mapper(getattr(self.model, y))
|
||||||
|
|
||||||
|
dep_x = []
|
||||||
|
table_y = map_y.tables[0].name
|
||||||
|
for column in map_x.columns:
|
||||||
|
for key in column.foreign_keys:
|
||||||
|
if key.column.table.name == table_y:
|
||||||
|
return 1
|
||||||
|
dep_x.append(key)
|
||||||
|
|
||||||
|
dep_y = []
|
||||||
|
table_x = map_x.tables[0].name
|
||||||
|
for column in map_y.columns:
|
||||||
|
for key in column.foreign_keys:
|
||||||
|
if key.column.table.name == table_x:
|
||||||
|
return -1
|
||||||
|
dep_y.append(key)
|
||||||
|
|
||||||
|
if dep_x and not dep_y:
|
||||||
|
return 1
|
||||||
|
if dep_y and not dep_x:
|
||||||
|
return -1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def merge_instance(self, session, instance):
|
||||||
|
"""
|
||||||
|
Merge the given model instance into the given database session.
|
||||||
|
|
||||||
|
Subclasses may define model-specific methods instead of or in addition
|
||||||
|
to overriding this generic one.
|
||||||
|
"""
|
||||||
|
class_name = instance.__class__.__name__.lower()
|
||||||
|
merger = getattr(self, 'merge_{0}'.format(class_name), None)
|
||||||
|
if merger:
|
||||||
|
return merger(session, instance)
|
||||||
|
|
||||||
|
# Nothing special defined, so just do the normal thing.
|
||||||
|
return session.merge(instance)
|
||||||
|
|
||||||
|
def delete_instance(self, session, instance):
|
||||||
|
"""
|
||||||
|
Delete the given model instance from the given database session.
|
||||||
|
|
||||||
|
Subclasses may define model-specific methods instead of or in addition
|
||||||
|
to overriding this generic one.
|
||||||
|
"""
|
||||||
|
class_name = instance.__class__.__name__.lower()
|
||||||
|
|
||||||
|
deleter = getattr(self, 'delete_{0}'.format(class_name), None)
|
||||||
|
if deleter:
|
||||||
|
return deleter(session, instance)
|
||||||
|
|
||||||
|
predeleter = getattr(self, 'predelete_{0}'.format(class_name), None)
|
||||||
|
if predeleter:
|
||||||
|
predeleter(session, instance)
|
||||||
|
|
||||||
|
session.delete(instance)
|
||||||
|
|
||||||
|
def merge_product(self, session, source_product):
|
||||||
|
"""
|
||||||
|
This method is somewhat of a hack, in order to properly handle
|
||||||
|
:class:`rattail.db.model.Product` instances and the interdependent
|
||||||
|
nature of the related :class:`rattail.db.model.ProductPrice` instances.
|
||||||
|
"""
|
||||||
|
|
||||||
|
target_product = session.merge(source_product)
|
||||||
|
|
||||||
|
# I'm not 100% sure I understand this correctly, but here's my
|
||||||
|
# thinking: First we clear the price relationships in case they've
|
||||||
|
# actually gone away; then we re-establish any which are currently
|
||||||
|
# valid.
|
||||||
|
|
||||||
|
# Setting the price relationship attributes to ``None`` isn't enough to
|
||||||
|
# force the ORM to notice a change, since the UUID field is ultimately
|
||||||
|
# what it's watching. So we must explicitly use that field here.
|
||||||
|
target_product.regular_price_uuid = None
|
||||||
|
target_product.current_price_uuid = None
|
||||||
|
|
||||||
|
# If the source instance has currently valid price relationships, then
|
||||||
|
# we re-establish them. We must merge the source price instance in
|
||||||
|
# order to be certain it will exist in the target session, and avoid
|
||||||
|
# foreign key errors. However we *still* must also set the UUID fields
|
||||||
|
# because again, the ORM is watching those... This was noticed to be
|
||||||
|
# the source of some bugs where successive database syncs were
|
||||||
|
# effectively "toggling" the price relationship. Setting the UUID
|
||||||
|
# field explicitly seems to solve it.
|
||||||
|
if source_product.regular_price_uuid:
|
||||||
|
target_product.regular_price = session.merge(source_product.regular_price)
|
||||||
|
target_product.regular_price_uuid = target_product.regular_price.uuid
|
||||||
|
if source_product.current_price_uuid:
|
||||||
|
target_product.current_price = session.merge(source_product.current_price)
|
||||||
|
target_product.current_price_uuid = target_product.current_price.uuid
|
||||||
|
|
||||||
|
return target_product
|
||||||
|
|
||||||
|
def predelete_department(self, session, department):
|
||||||
|
|
||||||
|
# Disconnect from all subdepartments.
|
||||||
|
q = session.query(model.Subdepartment).filter(
|
||||||
|
model.Subdepartment.department == department)
|
||||||
|
for subdept in q:
|
||||||
|
subdept.department = None
|
||||||
|
|
||||||
|
# Disconnect from all products.
|
||||||
|
q = session.query(model.Product).filter(
|
||||||
|
model.Product.department == department)
|
||||||
|
for product in q:
|
||||||
|
product.department = None
|
||||||
|
|
||||||
|
def predelete_subdepartment(self, session, subdepartment):
|
||||||
|
|
||||||
|
# Disconnect from all products.
|
||||||
|
q = session.query(model.Product).filter(
|
||||||
|
model.Product.subdepartment == subdepartment)
|
||||||
|
for product in q:
|
||||||
|
product.subdepartment = None
|
||||||
|
|
||||||
|
def predelete_family(self, session, family):
|
||||||
|
|
||||||
|
# Disconnect from all products.
|
||||||
|
q = session.query(model.Product).filter(
|
||||||
|
model.Product.family == family)
|
||||||
|
for product in q:
|
||||||
|
product.family = None
|
||||||
|
|
||||||
|
def predelete_vendor(self, session, vendor):
|
||||||
|
|
||||||
|
# Remove all product costs.
|
||||||
|
q = session.query(model.ProductCost).filter(
|
||||||
|
model.ProductCost.vendor == vendor)
|
||||||
|
for cost in q:
|
||||||
|
session.delete(cost)
|
||||||
|
|
||||||
|
def predelete_customergroup(self, session, group):
|
||||||
|
|
||||||
|
# Disconnect from all customers.
|
||||||
|
q = session.query(model.CustomerGroupAssignment).filter(
|
||||||
|
model.CustomerGroupAssignment.group == group)
|
||||||
|
for assignment in q:
|
||||||
|
session.delete(assignment)
|
52
rattail/datasync/watchers.py
Normal file
52
rattail/datasync/watchers.py
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# Rattail -- Retail Software Framework
|
||||||
|
# Copyright © 2010-2015 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Rattail.
|
||||||
|
#
|
||||||
|
# Rattail is free software: you can redistribute it and/or modify it under the
|
||||||
|
# terms of the GNU Affero General Public License as published by the Free
|
||||||
|
# Software Foundation, either version 3 of the License, or (at your option)
|
||||||
|
# any later version.
|
||||||
|
#
|
||||||
|
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
|
||||||
|
# more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with Rattail. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
DataSync Watchers
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
|
||||||
|
class DataSyncWatcher(object):
|
||||||
|
"""
|
||||||
|
Base class for all DataSync watchers.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, config, key):
|
||||||
|
self.config = config
|
||||||
|
self.key = key
|
||||||
|
self.delay = 1 # seconds
|
||||||
|
|
||||||
|
def get_changes(self):
|
||||||
|
"""
|
||||||
|
This must be implemented by the subclass. It should check the source
|
||||||
|
database for pending changes, and return a list of
|
||||||
|
:class:`rattail.db.model.DataSyncChange` instances representing the
|
||||||
|
source changes.
|
||||||
|
"""
|
||||||
|
return []
|
||||||
|
|
||||||
|
def prune_changes(self, keys):
|
||||||
|
"""
|
||||||
|
Prune change records from the source database, if relevant.
|
||||||
|
"""
|
|
@ -0,0 +1,38 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""add datasync change
|
||||||
|
|
||||||
|
Revision ID: 188810a5f9db
|
||||||
|
Revises: 3a7029aed67
|
||||||
|
Create Date: 2015-10-17 01:07:39.015318
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '188810a5f9db'
|
||||||
|
down_revision = u'3a7029aed67'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
import rattail.db.types
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
op.create_table('datasync_change',
|
||||||
|
sa.Column('uuid', sa.String(length=32), nullable=False),
|
||||||
|
sa.Column('source', sa.String(length=20), nullable=False),
|
||||||
|
sa.Column('payload_type', sa.String(length=20), nullable=False),
|
||||||
|
sa.Column('payload_key', sa.String(length=50), nullable=False),
|
||||||
|
sa.Column('deletion', sa.Boolean(), nullable=False),
|
||||||
|
sa.Column('obtained', sa.DateTime(), nullable=False),
|
||||||
|
sa.Column('consumer', sa.String(length=20), nullable=True),
|
||||||
|
sa.PrimaryKeyConstraint('uuid')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
op.drop_table('datasync_change')
|
|
@ -153,7 +153,7 @@ class ChangeRecorder(object):
|
||||||
from rattail.db import model
|
from rattail.db import model
|
||||||
|
|
||||||
# No need to record changes for changes.
|
# No need to record changes for changes.
|
||||||
if isinstance(instance, model.Change):
|
if isinstance(instance, (model.Change, model.DataSyncChange)):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# No need to record changes for batch data.
|
# No need to record changes for batch data.
|
||||||
|
|
|
@ -38,6 +38,7 @@ from .vendors import Vendor, VendorPhoneNumber, VendorEmailAddress, VendorContac
|
||||||
from .org import Department, Subdepartment, Category, Family, ReportCode, DepositLink
|
from .org import Department, Subdepartment, Category, Family, ReportCode, DepositLink
|
||||||
from .products import Brand, Tax, Product, ProductCode, ProductCost, ProductPrice
|
from .products import Brand, Tax, Product, ProductCode, ProductCost, ProductPrice
|
||||||
|
|
||||||
|
from .datasync import DataSyncChange
|
||||||
from .batches import Batch, BatchColumn, BatchRow
|
from .batches import Batch, BatchColumn, BatchRow
|
||||||
from .labels import LabelProfile
|
from .labels import LabelProfile
|
||||||
from .bouncer import EmailBounce
|
from .bouncer import EmailBounce
|
||||||
|
|
78
rattail/db/model/datasync.py
Normal file
78
rattail/db/model/datasync.py
Normal file
|
@ -0,0 +1,78 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# Rattail -- Retail Software Framework
|
||||||
|
# Copyright © 2010-2015 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Rattail.
|
||||||
|
#
|
||||||
|
# Rattail is free software: you can redistribute it and/or modify it under the
|
||||||
|
# terms of the GNU Affero General Public License as published by the Free
|
||||||
|
# Software Foundation, either version 3 of the License, or (at your option)
|
||||||
|
# any later version.
|
||||||
|
#
|
||||||
|
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
|
||||||
|
# more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with Rattail. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
Data Models for DataSync Daemon
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
from rattail.db.model import Base, uuid_column
|
||||||
|
|
||||||
|
|
||||||
|
class DataSyncChange(Base):
|
||||||
|
"""
|
||||||
|
Represents a change obtained from a DataSync watcher thread, and destined
|
||||||
|
for one or more DataSync consumers.
|
||||||
|
"""
|
||||||
|
__tablename__ = 'datasync_change'
|
||||||
|
|
||||||
|
uuid = uuid_column()
|
||||||
|
|
||||||
|
source = sa.Column(sa.String(length=20), nullable=False, doc="""
|
||||||
|
Key of the watcher from which this change was obtained.
|
||||||
|
""")
|
||||||
|
|
||||||
|
payload_type = sa.Column(sa.String(length=20), nullable=False, doc="""
|
||||||
|
The "type" of payload represented by the change, e.g. 'Person'. The
|
||||||
|
:attr:`payload_key` should be unique for a given payload type.
|
||||||
|
""")
|
||||||
|
|
||||||
|
payload_key = sa.Column(sa.String(length=50), nullable=False, doc="""
|
||||||
|
Key for the payload (presumably unique when combined with
|
||||||
|
:attr:`payload_type`) represented by the change, within the watched
|
||||||
|
database.
|
||||||
|
""")
|
||||||
|
|
||||||
|
deletion = sa.Column(sa.Boolean(), nullable=False, default=False, doc="""
|
||||||
|
Whether the change represents a deletion; defaults to ``False``.
|
||||||
|
""")
|
||||||
|
|
||||||
|
obtained = sa.Column(sa.DateTime(), nullable=False, default=datetime.datetime.utcnow, doc="""
|
||||||
|
Date and time when the change was obtained from the watcher thread.
|
||||||
|
""")
|
||||||
|
|
||||||
|
consumer = sa.Column(sa.String(length=20), nullable=True, doc="""
|
||||||
|
Configured key of the DataSync consumer for which this change is destined.
|
||||||
|
This may be NULL, in which case the change will go to all consumers
|
||||||
|
configured as not "isolated".
|
||||||
|
""")
|
||||||
|
|
||||||
|
def __unicode__(self):
|
||||||
|
return "{0}: {1}{2}".format(
|
||||||
|
self.payload_type,
|
||||||
|
self.payload_key,
|
||||||
|
" (deletion)" if self.deletion else "")
|
1
setup.py
1
setup.py
|
@ -207,6 +207,7 @@ rattailw = rattail.commands:main
|
||||||
[rattail.commands]
|
[rattail.commands]
|
||||||
adduser = rattail.commands:AddUser
|
adduser = rattail.commands:AddUser
|
||||||
bouncer = rattail.commands:EmailBouncer
|
bouncer = rattail.commands:EmailBouncer
|
||||||
|
datasync = rattail.commands:DataSync
|
||||||
date-organize = rattail.commands:DateOrganize
|
date-organize = rattail.commands:DateOrganize
|
||||||
dbsync = rattail.commands:DatabaseSyncCommand
|
dbsync = rattail.commands:DatabaseSyncCommand
|
||||||
dump = rattail.commands:Dump
|
dump = rattail.commands:Dump
|
||||||
|
|
Loading…
Reference in a new issue