Move datasync command logic to its own module
This commit is contained in:
parent
0e295b5a1c
commit
a0054d223a
|
@ -29,7 +29,6 @@ from __future__ import unicode_literals, absolute_import
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
import time
|
|
||||||
import platform
|
import platform
|
||||||
import argparse
|
import argparse
|
||||||
import datetime
|
import datetime
|
||||||
|
@ -525,167 +524,6 @@ class CloneDatabase(Subcommand):
|
||||||
dst_session.close()
|
dst_session.close()
|
||||||
|
|
||||||
|
|
||||||
class DataSync(Subcommand):
|
|
||||||
"""
|
|
||||||
Manage the data sync daemon
|
|
||||||
"""
|
|
||||||
name = 'datasync'
|
|
||||||
description = __doc__.strip()
|
|
||||||
|
|
||||||
def add_parser_args(self, parser):
|
|
||||||
subparsers = parser.add_subparsers(title='subcommands')
|
|
||||||
|
|
||||||
start = subparsers.add_parser('start', help="Start service")
|
|
||||||
start.set_defaults(subcommand='start')
|
|
||||||
|
|
||||||
stop = subparsers.add_parser('stop', help="Stop service")
|
|
||||||
stop.set_defaults(subcommand='stop')
|
|
||||||
|
|
||||||
check = subparsers.add_parser('check', help="(DEPRECATED) Check for stale (lingering) changes in queue")
|
|
||||||
check.set_defaults(subcommand='check')
|
|
||||||
|
|
||||||
check_queue = subparsers.add_parser('check-queue', help="Check for stale (lingering) changes in queue")
|
|
||||||
check_queue.set_defaults(subcommand='check-queue')
|
|
||||||
|
|
||||||
check_watchers = subparsers.add_parser('check-watchers', help="Check for dead watcher threads")
|
|
||||||
check_watchers.set_defaults(subcommand='check-watchers')
|
|
||||||
|
|
||||||
wait = subparsers.add_parser('wait', help="Wait for changes to be processed")
|
|
||||||
wait.set_defaults(subcommand='wait')
|
|
||||||
|
|
||||||
parser.add_argument('-p', '--pidfile', metavar='PATH', default='/var/run/rattail/datasync.pid',
|
|
||||||
help="Path to PID file.")
|
|
||||||
parser.add_argument('--daemonize', action='store_true', default=True, # TODO: should default to False
|
|
||||||
help="Daemonize when starting.")
|
|
||||||
parser.add_argument('--no-daemonize',
|
|
||||||
'-D', '--do-not-daemonize', # TODO: (re)move these?
|
|
||||||
action='store_false', dest='daemonize',
|
|
||||||
help="Do not daemonize when starting.")
|
|
||||||
parser.add_argument('-T', '--timeout', metavar='MINUTES', type=int, default=0,
|
|
||||||
help="Optional timeout (in minutes) for use with the 'wait' or 'check' commands. "
|
|
||||||
"If specified for 'wait', the waiting still stop after the given number of minutes "
|
|
||||||
"and exit with a nonzero code to indicate failure. If specified for 'check', the "
|
|
||||||
"command will perform some health check based on the given timeout, and exit with "
|
|
||||||
"nonzero code if the check fails.")
|
|
||||||
|
|
||||||
def run(self, args):
|
|
||||||
from rattail.datasync.daemon import DataSyncDaemon
|
|
||||||
|
|
||||||
if args.subcommand == 'wait':
|
|
||||||
self.wait(args)
|
|
||||||
|
|
||||||
elif args.subcommand == 'check':
|
|
||||||
self.check_queue(args)
|
|
||||||
|
|
||||||
elif args.subcommand == 'check-queue':
|
|
||||||
self.check_queue(args)
|
|
||||||
|
|
||||||
elif args.subcommand == 'check-watchers':
|
|
||||||
self.check_watchers(args)
|
|
||||||
|
|
||||||
else: # manage the daemon
|
|
||||||
daemon = DataSyncDaemon(args.pidfile, config=self.config)
|
|
||||||
if args.subcommand == 'stop':
|
|
||||||
daemon.stop()
|
|
||||||
else: # start
|
|
||||||
try:
|
|
||||||
daemon.start(daemonize=args.daemonize)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
if not args.daemonize:
|
|
||||||
self.stderr.write("Interrupted.\n")
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
def wait(self, args):
|
|
||||||
model = self.model
|
|
||||||
session = self.make_session()
|
|
||||||
started = make_utc()
|
|
||||||
log.debug("will wait for current change queue to clear")
|
|
||||||
last_logged = started
|
|
||||||
|
|
||||||
changes = session.query(model.DataSyncChange)
|
|
||||||
count = changes.count()
|
|
||||||
log.debug("there are %d changes in the queue", count)
|
|
||||||
while count:
|
|
||||||
try:
|
|
||||||
now = make_utc()
|
|
||||||
|
|
||||||
if args.timeout and (now - started).seconds >= (args.timeout * 60):
|
|
||||||
log.warning("datasync wait timed out after %d minutes, with %d changes in queue",
|
|
||||||
args.timeout, count)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if (now - last_logged).seconds >= 60:
|
|
||||||
log.debug("still waiting, %d changes in the datasync queue", count)
|
|
||||||
last_logged = now
|
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
count = changes.count()
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
self.stderr.write("Waiting cancelled by user\n")
|
|
||||||
session.close()
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
session.close()
|
|
||||||
log.debug("all datasync changes have been processed")
|
|
||||||
|
|
||||||
def check_queue(self, args):
|
|
||||||
"""
|
|
||||||
Perform general queue / health check for datasync.
|
|
||||||
"""
|
|
||||||
model = self.model
|
|
||||||
session = self.make_session()
|
|
||||||
|
|
||||||
# looking for changes which have been around for "timeout" minutes
|
|
||||||
timeout = args.timeout or 90
|
|
||||||
cutoff = make_utc() - datetime.timedelta(seconds=60 * timeout)
|
|
||||||
changes = session.query(model.DataSyncChange)\
|
|
||||||
.filter(model.DataSyncChange.obtained < cutoff)\
|
|
||||||
.count()
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
# if we found stale changes, then "fail" - otherwise we'll "succeed"
|
|
||||||
if changes:
|
|
||||||
# TODO: should log set of unique payload types, for troubleshooting
|
|
||||||
log.debug("found %s changes, in queue for %s minutes", changes, timeout)
|
|
||||||
self.stderr.write("Found {} changes, in queue for {} minutes\n".format(changes, timeout))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
log.info("found no changes in queue for %s minutes", timeout)
|
|
||||||
|
|
||||||
def check_watchers(self, args):
|
|
||||||
"""
|
|
||||||
Perform general health check for datasync watcher threads.
|
|
||||||
"""
|
|
||||||
from rattail.datasync.config import load_profiles
|
|
||||||
from rattail.datasync.util import get_lastrun
|
|
||||||
|
|
||||||
profiles = load_profiles(self.config)
|
|
||||||
session = self.make_session()
|
|
||||||
|
|
||||||
# cutoff is "timeout" minutes before "now"
|
|
||||||
timeout = args.timeout or 15
|
|
||||||
cutoff = make_utc() - datetime.timedelta(seconds=60 * timeout)
|
|
||||||
|
|
||||||
dead = []
|
|
||||||
for key in profiles:
|
|
||||||
|
|
||||||
# looking for watcher "last run" time older than "timeout" minutes
|
|
||||||
lastrun = get_lastrun(self.config, key, tzinfo=False, session=session)
|
|
||||||
if lastrun and lastrun < cutoff:
|
|
||||||
dead.append(key)
|
|
||||||
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
# if we found dead watchers, then "fail" - otherwise we'll "succeed"
|
|
||||||
if dead:
|
|
||||||
self.stderr.write("Found {} watcher threads dead for {} minutes: {}\n".format(len(dead), timeout, ', '.join(dead)))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
log.info("found no watcher threads dead for %s minutes", timeout)
|
|
||||||
|
|
||||||
|
|
||||||
class EmailBouncer(Subcommand):
|
class EmailBouncer(Subcommand):
|
||||||
"""
|
"""
|
||||||
Interacts with the email bouncer daemon. This command expects a
|
Interacts with the email bouncer daemon. This command expects a
|
||||||
|
|
200
rattail/commands/datasync.py
Normal file
200
rattail/commands/datasync.py
Normal file
|
@ -0,0 +1,200 @@
|
||||||
|
# -*- coding: utf-8; -*-
|
||||||
|
################################################################################
|
||||||
|
#
|
||||||
|
# Rattail -- Retail Software Framework
|
||||||
|
# Copyright © 2010-2021 Lance Edgar
|
||||||
|
#
|
||||||
|
# This file is part of Rattail.
|
||||||
|
#
|
||||||
|
# Rattail is free software: you can redistribute it and/or modify it under the
|
||||||
|
# terms of the GNU General Public License as published by the Free Software
|
||||||
|
# Foundation, either version 3 of the License, or (at your option) any later
|
||||||
|
# version.
|
||||||
|
#
|
||||||
|
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||||
|
# details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License along with
|
||||||
|
# Rattail. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
################################################################################
|
||||||
|
"""
|
||||||
|
Console Commands
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import unicode_literals, absolute_import
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from rattail.commands import Subcommand
|
||||||
|
from rattail.time import make_utc
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DataSync(Subcommand):
|
||||||
|
"""
|
||||||
|
Manage the data sync daemon
|
||||||
|
"""
|
||||||
|
name = 'datasync'
|
||||||
|
description = __doc__.strip()
|
||||||
|
|
||||||
|
def add_parser_args(self, parser):
|
||||||
|
subparsers = parser.add_subparsers(title='subcommands')
|
||||||
|
|
||||||
|
start = subparsers.add_parser('start', help="Start service")
|
||||||
|
start.set_defaults(subcommand='start')
|
||||||
|
|
||||||
|
stop = subparsers.add_parser('stop', help="Stop service")
|
||||||
|
stop.set_defaults(subcommand='stop')
|
||||||
|
|
||||||
|
check = subparsers.add_parser('check', help="(DEPRECATED) Check for stale (lingering) changes in queue")
|
||||||
|
check.set_defaults(subcommand='check')
|
||||||
|
|
||||||
|
check_queue = subparsers.add_parser('check-queue', help="Check for stale (lingering) changes in queue")
|
||||||
|
check_queue.set_defaults(subcommand='check-queue')
|
||||||
|
|
||||||
|
check_watchers = subparsers.add_parser('check-watchers', help="Check for dead watcher threads")
|
||||||
|
check_watchers.set_defaults(subcommand='check-watchers')
|
||||||
|
|
||||||
|
wait = subparsers.add_parser('wait', help="Wait for changes to be processed")
|
||||||
|
wait.set_defaults(subcommand='wait')
|
||||||
|
|
||||||
|
parser.add_argument('-p', '--pidfile', metavar='PATH', default='/var/run/rattail/datasync.pid',
|
||||||
|
help="Path to PID file.")
|
||||||
|
parser.add_argument('--daemonize', action='store_true', default=True, # TODO: should default to False
|
||||||
|
help="Daemonize when starting.")
|
||||||
|
parser.add_argument('--no-daemonize',
|
||||||
|
'-D', '--do-not-daemonize', # TODO: (re)move these?
|
||||||
|
action='store_false', dest='daemonize',
|
||||||
|
help="Do not daemonize when starting.")
|
||||||
|
parser.add_argument('-T', '--timeout', metavar='MINUTES', type=int, default=0,
|
||||||
|
help="Optional timeout (in minutes) for use with the 'wait' or 'check' commands. "
|
||||||
|
"If specified for 'wait', the waiting still stop after the given number of minutes "
|
||||||
|
"and exit with a nonzero code to indicate failure. If specified for 'check', the "
|
||||||
|
"command will perform some health check based on the given timeout, and exit with "
|
||||||
|
"nonzero code if the check fails.")
|
||||||
|
|
||||||
|
def run(self, args):
|
||||||
|
from rattail.datasync.daemon import DataSyncDaemon
|
||||||
|
|
||||||
|
if args.subcommand == 'wait':
|
||||||
|
self.wait(args)
|
||||||
|
|
||||||
|
elif args.subcommand == 'check':
|
||||||
|
self.check_queue(args)
|
||||||
|
|
||||||
|
elif args.subcommand == 'check-queue':
|
||||||
|
self.check_queue(args)
|
||||||
|
|
||||||
|
elif args.subcommand == 'check-watchers':
|
||||||
|
self.check_watchers(args)
|
||||||
|
|
||||||
|
else: # manage the daemon
|
||||||
|
daemon = DataSyncDaemon(args.pidfile, config=self.config)
|
||||||
|
if args.subcommand == 'stop':
|
||||||
|
daemon.stop()
|
||||||
|
else: # start
|
||||||
|
try:
|
||||||
|
daemon.start(daemonize=args.daemonize)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
if not args.daemonize:
|
||||||
|
self.stderr.write("Interrupted.\n")
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def wait(self, args):
|
||||||
|
model = self.model
|
||||||
|
session = self.make_session()
|
||||||
|
started = make_utc()
|
||||||
|
log.debug("will wait for current change queue to clear")
|
||||||
|
last_logged = started
|
||||||
|
|
||||||
|
changes = session.query(model.DataSyncChange)
|
||||||
|
count = changes.count()
|
||||||
|
log.debug("there are %d changes in the queue", count)
|
||||||
|
while count:
|
||||||
|
try:
|
||||||
|
now = make_utc()
|
||||||
|
|
||||||
|
if args.timeout and (now - started).seconds >= (args.timeout * 60):
|
||||||
|
log.warning("datasync wait timed out after %d minutes, with %d changes in queue",
|
||||||
|
args.timeout, count)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if (now - last_logged).seconds >= 60:
|
||||||
|
log.debug("still waiting, %d changes in the datasync queue", count)
|
||||||
|
last_logged = now
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
count = changes.count()
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
self.stderr.write("Waiting cancelled by user\n")
|
||||||
|
session.close()
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
session.close()
|
||||||
|
log.debug("all datasync changes have been processed")
|
||||||
|
|
||||||
|
def check_queue(self, args):
|
||||||
|
"""
|
||||||
|
Perform general queue / health check for datasync.
|
||||||
|
"""
|
||||||
|
model = self.model
|
||||||
|
session = self.make_session()
|
||||||
|
|
||||||
|
# looking for changes which have been around for "timeout" minutes
|
||||||
|
timeout = args.timeout or 90
|
||||||
|
cutoff = make_utc() - datetime.timedelta(seconds=60 * timeout)
|
||||||
|
changes = session.query(model.DataSyncChange)\
|
||||||
|
.filter(model.DataSyncChange.obtained < cutoff)\
|
||||||
|
.count()
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
# if we found stale changes, then "fail" - otherwise we'll "succeed"
|
||||||
|
if changes:
|
||||||
|
# TODO: should log set of unique payload types, for troubleshooting
|
||||||
|
log.debug("found %s changes, in queue for %s minutes", changes, timeout)
|
||||||
|
self.stderr.write("Found {} changes, in queue for {} minutes\n".format(changes, timeout))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
log.info("found no changes in queue for %s minutes", timeout)
|
||||||
|
|
||||||
|
def check_watchers(self, args):
|
||||||
|
"""
|
||||||
|
Perform general health check for datasync watcher threads.
|
||||||
|
"""
|
||||||
|
from rattail.datasync.config import load_profiles
|
||||||
|
from rattail.datasync.util import get_lastrun
|
||||||
|
|
||||||
|
profiles = load_profiles(self.config)
|
||||||
|
session = self.make_session()
|
||||||
|
|
||||||
|
# cutoff is "timeout" minutes before "now"
|
||||||
|
timeout = args.timeout or 15
|
||||||
|
cutoff = make_utc() - datetime.timedelta(seconds=60 * timeout)
|
||||||
|
|
||||||
|
dead = []
|
||||||
|
for key in profiles:
|
||||||
|
|
||||||
|
# looking for watcher "last run" time older than "timeout" minutes
|
||||||
|
lastrun = get_lastrun(self.config, key, tzinfo=False, session=session)
|
||||||
|
if lastrun and lastrun < cutoff:
|
||||||
|
dead.append(key)
|
||||||
|
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
# if we found dead watchers, then "fail" - otherwise we'll "succeed"
|
||||||
|
if dead:
|
||||||
|
self.stderr.write("Found {} watcher threads dead for {} minutes: {}\n".format(len(dead), timeout, ', '.join(dead)))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
log.info("found no watcher threads dead for %s minutes", timeout)
|
2
setup.py
2
setup.py
|
@ -200,7 +200,7 @@ backup = rattail.commands.backup:Backup
|
||||||
bouncer = rattail.commands.core:EmailBouncer
|
bouncer = rattail.commands.core:EmailBouncer
|
||||||
checkdb = rattail.commands.core:CheckDatabase
|
checkdb = rattail.commands.core:CheckDatabase
|
||||||
clonedb = rattail.commands.core:CloneDatabase
|
clonedb = rattail.commands.core:CloneDatabase
|
||||||
datasync = rattail.commands.core:DataSync
|
datasync = rattail.commands.datasync:DataSync
|
||||||
date-organize = rattail.commands.core:DateOrganize
|
date-organize = rattail.commands.core:DateOrganize
|
||||||
execute-batch = rattail.commands.batch:ExecuteBatch
|
execute-batch = rattail.commands.batch:ExecuteBatch
|
||||||
export-csv = rattail.commands.importing:ExportCSV
|
export-csv = rattail.commands.importing:ExportCSV
|
||||||
|
|
Loading…
Reference in a new issue