From a0054d223ac4993a6c38a9d5e87a1f773cb2a068 Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Sat, 27 Nov 2021 09:56:17 -0600 Subject: [PATCH] Move datasync command logic to its own module --- rattail/commands/core.py | 162 ---------------------------- rattail/commands/datasync.py | 200 +++++++++++++++++++++++++++++++++++ setup.py | 2 +- 3 files changed, 201 insertions(+), 163 deletions(-) create mode 100644 rattail/commands/datasync.py diff --git a/rattail/commands/core.py b/rattail/commands/core.py index b7ae39ee..19b48a6c 100644 --- a/rattail/commands/core.py +++ b/rattail/commands/core.py @@ -29,7 +29,6 @@ from __future__ import unicode_literals, absolute_import import os import sys import json -import time import platform import argparse import datetime @@ -525,167 +524,6 @@ class CloneDatabase(Subcommand): dst_session.close() -class DataSync(Subcommand): - """ - Manage the data sync daemon - """ - name = 'datasync' - description = __doc__.strip() - - def add_parser_args(self, parser): - subparsers = parser.add_subparsers(title='subcommands') - - start = subparsers.add_parser('start', help="Start service") - start.set_defaults(subcommand='start') - - stop = subparsers.add_parser('stop', help="Stop service") - stop.set_defaults(subcommand='stop') - - check = subparsers.add_parser('check', help="(DEPRECATED) Check for stale (lingering) changes in queue") - check.set_defaults(subcommand='check') - - check_queue = subparsers.add_parser('check-queue', help="Check for stale (lingering) changes in queue") - check_queue.set_defaults(subcommand='check-queue') - - check_watchers = subparsers.add_parser('check-watchers', help="Check for dead watcher threads") - check_watchers.set_defaults(subcommand='check-watchers') - - wait = subparsers.add_parser('wait', help="Wait for changes to be processed") - wait.set_defaults(subcommand='wait') - - parser.add_argument('-p', '--pidfile', metavar='PATH', default='/var/run/rattail/datasync.pid', - help="Path to PID file.") - parser.add_argument('--daemonize', action='store_true', default=True, # TODO: should default to False - help="Daemonize when starting.") - parser.add_argument('--no-daemonize', - '-D', '--do-not-daemonize', # TODO: (re)move these? - action='store_false', dest='daemonize', - help="Do not daemonize when starting.") - parser.add_argument('-T', '--timeout', metavar='MINUTES', type=int, default=0, - help="Optional timeout (in minutes) for use with the 'wait' or 'check' commands. " - "If specified for 'wait', the waiting still stop after the given number of minutes " - "and exit with a nonzero code to indicate failure. If specified for 'check', the " - "command will perform some health check based on the given timeout, and exit with " - "nonzero code if the check fails.") - - def run(self, args): - from rattail.datasync.daemon import DataSyncDaemon - - if args.subcommand == 'wait': - self.wait(args) - - elif args.subcommand == 'check': - self.check_queue(args) - - elif args.subcommand == 'check-queue': - self.check_queue(args) - - elif args.subcommand == 'check-watchers': - self.check_watchers(args) - - else: # manage the daemon - daemon = DataSyncDaemon(args.pidfile, config=self.config) - if args.subcommand == 'stop': - daemon.stop() - else: # start - try: - daemon.start(daemonize=args.daemonize) - except KeyboardInterrupt: - if not args.daemonize: - self.stderr.write("Interrupted.\n") - else: - raise - - def wait(self, args): - model = self.model - session = self.make_session() - started = make_utc() - log.debug("will wait for current change queue to clear") - last_logged = started - - changes = session.query(model.DataSyncChange) - count = changes.count() - log.debug("there are %d changes in the queue", count) - while count: - try: - now = make_utc() - - if args.timeout and (now - started).seconds >= (args.timeout * 60): - log.warning("datasync wait timed out after %d minutes, with %d changes in queue", - args.timeout, count) - sys.exit(1) - - if (now - last_logged).seconds >= 60: - log.debug("still waiting, %d changes in the datasync queue", count) - last_logged = now - - time.sleep(1) - count = changes.count() - - except KeyboardInterrupt: - self.stderr.write("Waiting cancelled by user\n") - session.close() - sys.exit(1) - - session.close() - log.debug("all datasync changes have been processed") - - def check_queue(self, args): - """ - Perform general queue / health check for datasync. - """ - model = self.model - session = self.make_session() - - # looking for changes which have been around for "timeout" minutes - timeout = args.timeout or 90 - cutoff = make_utc() - datetime.timedelta(seconds=60 * timeout) - changes = session.query(model.DataSyncChange)\ - .filter(model.DataSyncChange.obtained < cutoff)\ - .count() - session.close() - - # if we found stale changes, then "fail" - otherwise we'll "succeed" - if changes: - # TODO: should log set of unique payload types, for troubleshooting - log.debug("found %s changes, in queue for %s minutes", changes, timeout) - self.stderr.write("Found {} changes, in queue for {} minutes\n".format(changes, timeout)) - sys.exit(1) - - log.info("found no changes in queue for %s minutes", timeout) - - def check_watchers(self, args): - """ - Perform general health check for datasync watcher threads. - """ - from rattail.datasync.config import load_profiles - from rattail.datasync.util import get_lastrun - - profiles = load_profiles(self.config) - session = self.make_session() - - # cutoff is "timeout" minutes before "now" - timeout = args.timeout or 15 - cutoff = make_utc() - datetime.timedelta(seconds=60 * timeout) - - dead = [] - for key in profiles: - - # looking for watcher "last run" time older than "timeout" minutes - lastrun = get_lastrun(self.config, key, tzinfo=False, session=session) - if lastrun and lastrun < cutoff: - dead.append(key) - - session.close() - - # if we found dead watchers, then "fail" - otherwise we'll "succeed" - if dead: - self.stderr.write("Found {} watcher threads dead for {} minutes: {}\n".format(len(dead), timeout, ', '.join(dead))) - sys.exit(1) - - log.info("found no watcher threads dead for %s minutes", timeout) - - class EmailBouncer(Subcommand): """ Interacts with the email bouncer daemon. This command expects a diff --git a/rattail/commands/datasync.py b/rattail/commands/datasync.py new file mode 100644 index 00000000..4cb1ee15 --- /dev/null +++ b/rattail/commands/datasync.py @@ -0,0 +1,200 @@ +# -*- coding: utf-8; -*- +################################################################################ +# +# Rattail -- Retail Software Framework +# Copyright © 2010-2021 Lance Edgar +# +# This file is part of Rattail. +# +# Rattail is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# Rattail. If not, see . +# +################################################################################ +""" +Console Commands +""" + +from __future__ import unicode_literals, absolute_import + +import os +import sys +import time +import datetime +import logging + +from rattail.commands import Subcommand +from rattail.time import make_utc + + +log = logging.getLogger(__name__) + + +class DataSync(Subcommand): + """ + Manage the data sync daemon + """ + name = 'datasync' + description = __doc__.strip() + + def add_parser_args(self, parser): + subparsers = parser.add_subparsers(title='subcommands') + + start = subparsers.add_parser('start', help="Start service") + start.set_defaults(subcommand='start') + + stop = subparsers.add_parser('stop', help="Stop service") + stop.set_defaults(subcommand='stop') + + check = subparsers.add_parser('check', help="(DEPRECATED) Check for stale (lingering) changes in queue") + check.set_defaults(subcommand='check') + + check_queue = subparsers.add_parser('check-queue', help="Check for stale (lingering) changes in queue") + check_queue.set_defaults(subcommand='check-queue') + + check_watchers = subparsers.add_parser('check-watchers', help="Check for dead watcher threads") + check_watchers.set_defaults(subcommand='check-watchers') + + wait = subparsers.add_parser('wait', help="Wait for changes to be processed") + wait.set_defaults(subcommand='wait') + + parser.add_argument('-p', '--pidfile', metavar='PATH', default='/var/run/rattail/datasync.pid', + help="Path to PID file.") + parser.add_argument('--daemonize', action='store_true', default=True, # TODO: should default to False + help="Daemonize when starting.") + parser.add_argument('--no-daemonize', + '-D', '--do-not-daemonize', # TODO: (re)move these? + action='store_false', dest='daemonize', + help="Do not daemonize when starting.") + parser.add_argument('-T', '--timeout', metavar='MINUTES', type=int, default=0, + help="Optional timeout (in minutes) for use with the 'wait' or 'check' commands. " + "If specified for 'wait', the waiting still stop after the given number of minutes " + "and exit with a nonzero code to indicate failure. If specified for 'check', the " + "command will perform some health check based on the given timeout, and exit with " + "nonzero code if the check fails.") + + def run(self, args): + from rattail.datasync.daemon import DataSyncDaemon + + if args.subcommand == 'wait': + self.wait(args) + + elif args.subcommand == 'check': + self.check_queue(args) + + elif args.subcommand == 'check-queue': + self.check_queue(args) + + elif args.subcommand == 'check-watchers': + self.check_watchers(args) + + else: # manage the daemon + daemon = DataSyncDaemon(args.pidfile, config=self.config) + if args.subcommand == 'stop': + daemon.stop() + else: # start + try: + daemon.start(daemonize=args.daemonize) + except KeyboardInterrupt: + if not args.daemonize: + self.stderr.write("Interrupted.\n") + else: + raise + + def wait(self, args): + model = self.model + session = self.make_session() + started = make_utc() + log.debug("will wait for current change queue to clear") + last_logged = started + + changes = session.query(model.DataSyncChange) + count = changes.count() + log.debug("there are %d changes in the queue", count) + while count: + try: + now = make_utc() + + if args.timeout and (now - started).seconds >= (args.timeout * 60): + log.warning("datasync wait timed out after %d minutes, with %d changes in queue", + args.timeout, count) + sys.exit(1) + + if (now - last_logged).seconds >= 60: + log.debug("still waiting, %d changes in the datasync queue", count) + last_logged = now + + time.sleep(1) + count = changes.count() + + except KeyboardInterrupt: + self.stderr.write("Waiting cancelled by user\n") + session.close() + sys.exit(1) + + session.close() + log.debug("all datasync changes have been processed") + + def check_queue(self, args): + """ + Perform general queue / health check for datasync. + """ + model = self.model + session = self.make_session() + + # looking for changes which have been around for "timeout" minutes + timeout = args.timeout or 90 + cutoff = make_utc() - datetime.timedelta(seconds=60 * timeout) + changes = session.query(model.DataSyncChange)\ + .filter(model.DataSyncChange.obtained < cutoff)\ + .count() + session.close() + + # if we found stale changes, then "fail" - otherwise we'll "succeed" + if changes: + # TODO: should log set of unique payload types, for troubleshooting + log.debug("found %s changes, in queue for %s minutes", changes, timeout) + self.stderr.write("Found {} changes, in queue for {} minutes\n".format(changes, timeout)) + sys.exit(1) + + log.info("found no changes in queue for %s minutes", timeout) + + def check_watchers(self, args): + """ + Perform general health check for datasync watcher threads. + """ + from rattail.datasync.config import load_profiles + from rattail.datasync.util import get_lastrun + + profiles = load_profiles(self.config) + session = self.make_session() + + # cutoff is "timeout" minutes before "now" + timeout = args.timeout or 15 + cutoff = make_utc() - datetime.timedelta(seconds=60 * timeout) + + dead = [] + for key in profiles: + + # looking for watcher "last run" time older than "timeout" minutes + lastrun = get_lastrun(self.config, key, tzinfo=False, session=session) + if lastrun and lastrun < cutoff: + dead.append(key) + + session.close() + + # if we found dead watchers, then "fail" - otherwise we'll "succeed" + if dead: + self.stderr.write("Found {} watcher threads dead for {} minutes: {}\n".format(len(dead), timeout, ', '.join(dead))) + sys.exit(1) + + log.info("found no watcher threads dead for %s minutes", timeout) diff --git a/setup.py b/setup.py index 2d714ff5..b3be2bee 100644 --- a/setup.py +++ b/setup.py @@ -200,7 +200,7 @@ backup = rattail.commands.backup:Backup bouncer = rattail.commands.core:EmailBouncer checkdb = rattail.commands.core:CheckDatabase clonedb = rattail.commands.core:CloneDatabase -datasync = rattail.commands.core:DataSync +datasync = rattail.commands.datasync:DataSync date-organize = rattail.commands.core:DateOrganize execute-batch = rattail.commands.batch:ExecuteBatch export-csv = rattail.commands.importing:ExportCSV