Overhaul logic for launching backfill task

borrowing good patterns from overnight task.  trying to standardize so
existing luigi config is used etc.
This commit is contained in:
Lance Edgar 2023-01-14 02:19:02 -06:00
parent 08956af106
commit 9c0bd8662f

View file

@ -2,7 +2,7 @@
################################################################################ ################################################################################
# #
# Rattail -- Retail Software Framework # Rattail -- Retail Software Framework
# Copyright © 2010-2022 Lance Edgar # Copyright © 2010-2023 Lance Edgar
# #
# This file is part of Rattail. # This file is part of Rattail.
# #
@ -350,57 +350,126 @@ class LuigiHandler(GenericHandler):
if task['key'] == key: if task['key'] == key:
return task return task
def launch_backfill_task(self, task, start_date, end_date, **kwargs): def launch_backfill_task(self, task, start_date, end_date,
keep_config=True,
email_if_empty=True,
email_key=None,
wait=True,
dry_run=False,
**kwargs):
"""
Launch the given backfill task, to run for the given date
range.
:param task: A backfill task info dict, e.g. as obtained from
:meth:`get_backfill_task()`.
:param start_date: Start of date range for which task should
run.
:param end_date: End of date range for which task should run.
This is *inclusive* so the task will be ran for this
``end_date`` value, at the end.
:param keep_config: If true (the default) then the subcommand
will be invoked with the same config file(s) which are
effective in the current/parent process. If false, it will
be invoked only with ``app/silent.conf``.
:param email_if_empty: If true (the default), then email will
be sent when the task command completes, even if it
produces no output. If false, then email is sent only if
the command produces output.
:param email_key: Optional config key for email settings to be
used in determining recipients etc.
:param wait: If true (the default), the task will run
in-process, and so will begin immediately, but caller must
wait for it to complete. If false, the task will be
scheduled via the ``at`` command, to begin within the next
minute. (This lets process control return immediately to
the caller.)
:param dry_run: If true, log the final command for the task
but do not actually run it.
"""
if not start_date or not end_date: if not start_date or not end_date:
raise ValueError("must specify both start_date and end_date") raise ValueError("must specify both start_date and end_date")
appdir = self.config.appdir()
luigi = os.path.join(sys.prefix, 'bin', 'luigi')
logging_conf = os.path.join(appdir, 'luigi', 'logging.conf')
cmd = [luigi, '--logging-conf-file', logging_conf,
'--module', 'rattail.luigi.backfill_runner']
if task['forward']:
cmd.append('ForwardBackfillRange')
else:
cmd.append('BackwardBackfillRange')
if start_date > end_date: if start_date > end_date:
start_date, end_date = end_date, start_date start_date, end_date = end_date, start_date
cmd.extend([ appdir = self.config.appdir()
'--key', task['key'],
'--start-date={}'.format(start_date),
'--end-date={}'.format(end_date),
])
env = { env = {
'RATTAIL_CONFIG_FILES': os.path.join(appdir, 'silent.conf'), # TODO: is this ever needed here?
# 'PYTHONPATH': appdir,
} }
if keep_config:
env['RATTAIL_CONFIG_FILES'] = os.pathsep.join(self.config.files_read)
else:
env['RATTAIL_CONFIG_FILES'] = os.path.join(appdir, 'silent.conf')
# build our command, which will vary per caller request. by
# default we do not use the shell to run command, but in some
# cases must (e.g. when invoking `at`)
shell = False
# luigi
luigi = os.path.join(sys.prefix, 'bin', 'luigi')
cmd = [luigi, '--module=rattail.luigi.backfill_runner',
'{}BackfillRange'.format(
'Forward' if task['forward'] else 'Backward'),
'--key', task['key'],
'--start-date={}'.format(start_date),
'--end-date={}'.format(end_date)]
# rattail run-n-mail luigi
last_date = end_date if task['forward'] else start_date last_date = end_date if task['forward'] else start_date
cmd = [os.path.join(sys.prefix, 'bin', 'rattail'), cmd = [os.path.join(sys.prefix, 'bin', 'rattail'),
'--config', os.path.join(appdir, 'silent.conf'),
'--no-versioning', '--no-versioning',
'run-n-mail', 'run-n-mail',
'-S', 'Backfill thru {}: {}'.format(last_date, task['description']), '--keep-exit-code',
'--subject', 'Backfill thru {}: {}'.format(last_date, task['description']),
shlex_join(cmd)] shlex_join(cmd)]
if email_key:
cmd.extend(['--key', email_key])
if not email_if_empty:
cmd.append('--skip-if-empty')
cmd = ['echo', shlex_join(cmd)] # not waiting means schedule via `at`
cmd = shlex_join(cmd) if not wait:
cmd = "{} | at 'now + 1 minute'".format(cmd) # echo 'rattail run-n-mail luigi' | at now
cmd = ['echo', shlex_join(cmd)]
cmd = shlex_join(cmd)
cmd = "{} | at 'now + 1 minute'".format(cmd)
shell = True # must run command via shell
# log final command
log.debug("env is: %s", env)
log.debug("launching command in subprocess: %s", cmd)
if dry_run:
log.debug("dry-run mode, so aborting")
return
# run command in subprocess # run command in subprocess
log.debug("launching command in subprocess: %s", cmd) curdir = os.getcwd()
try: try:
subprocess.check_output(cmd, shell=True, env=env, # nb. always chdir to luigi folder, even if not needed
os.chdir(os.path.join(appdir, 'luigi'))
subprocess.check_output(cmd, shell=shell, env=env,
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
except subprocess.CalledProcessError as error: except subprocess.CalledProcessError as error:
log.warning("command failed with exit code %s! output was:", log.warning("command failed with exit code %s! output was:",
error.returncode) error.returncode)
log.warning(error.stderr.decode('utf_8')) log.warning(error.output.decode('utf_8'))
raise raise
finally:
# nb. always change back to first dir, in case we're being
# called by some long-running process, e.g. web app
os.chdir(curdir)
def record_backfill_last_date(self, task, date, session=None, **kwargs): def record_backfill_last_date(self, task, date, session=None, **kwargs):
name = 'rattail.luigi.backfill.task.{}.last_date'.format(task['key']) name = 'rattail.luigi.backfill.task.{}.last_date'.format(task['key'])