Add basic support for backfill Luigi tasks

The idea being, sometimes you must import many days' worth of data into
Trainwreck or what-not, and it must be split up because it would take
too long to import all at once (e.g. it might interfere with overnight
tasks).
This commit is contained in:
Lance Edgar 2022-08-23 23:27:47 -05:00
parent 488696cb39
commit 78500770d9
3 changed files with 688 additions and 137 deletions

View file

@ -27,19 +27,29 @@ Views for Luigi
from __future__ import unicode_literals, absolute_import
import json
import logging
import os
import re
import shlex
import six
import sqlalchemy as sa
from rattail.util import simple_error
from tailbone.views import MasterView
class LuigiJobView(MasterView):
log = logging.getLogger(__name__)
class LuigiTaskView(MasterView):
"""
Simple views for Luigi jobs.
Simple views for Luigi tasks.
"""
normalized_model_name = 'luigijobs'
model_key = 'jobname'
model_title = "Luigi Job"
normalized_model_name = 'luigitasks'
model_key = 'key'
model_title = "Luigi Task"
route_prefix = 'luigi'
url_prefix = '/luigi'
@ -50,27 +60,57 @@ class LuigiJobView(MasterView):
configurable = True
def __init__(self, request, context=None):
super(LuigiJobView, self).__init__(request, context=context)
super(LuigiTaskView, self).__init__(request, context=context)
app = self.get_rattail_app()
self.luigi_handler = app.get_luigi_handler()
def index(self):
luigi_url = self.rattail_config.get('luigi', 'url')
luigi_url = self.rattail_config.get('rattail.luigi', 'url')
history_url = '{}/history'.format(luigi_url.rstrip('/')) if luigi_url else None
return self.render_to_response('index', {
'use_buefy': self.get_use_buefy(),
'index_url': None,
'luigi_url': luigi_url,
'luigi_history_url': history_url,
'overnight_tasks': self.luigi_handler.get_all_overnight_tasks(),
'overnight_tasks': self.get_overnight_tasks(),
'backfill_tasks': self.get_backfill_tasks(),
})
def launch(self):
key = self.request.POST['job']
assert key
self.luigi_handler.restart_overnight_task(key)
self.request.session.flash("Scheduled overnight task for immediate launch: {}".format(key))
return self.redirect(self.get_index_url())
def launch_overnight(self):
    """
    View which launches a single overnight task.

    Reads the task ``key`` from the JSON request body and looks up
    the task via the Luigi handler.  The task is launched with the
    value of ``app.yesterday()`` as its date argument.

    Returns a JSON response: ``{'ok': True}`` on success, otherwise
    ``{'error': <message>}`` when the task is not found or the
    launch raises.
    """
    app = self.get_rattail_app()
    payload = self.request.json_body

    # an empty/missing key is treated the same as an unknown task
    task_key = payload.get('key')
    task = None
    if task_key:
        task = self.luigi_handler.get_overnight_task(task_key)
    if not task:
        return self.json_response({'error': "Task not found"})

    try:
        self.luigi_handler.launch_overnight_task(task, app.yesterday())
    except Exception as error:
        # report the failure to the client instead of propagating it
        log.warning("failed to launch overnight task: %s", task,
                    exc_info=True)
        result = {'error': simple_error(error)}
    else:
        result = {'ok': True}
    return self.json_response(result)
def launch_backfill(self):
    """
    View which launches a single backfill task for a date range.

    Reads ``key``, ``start_date`` and ``end_date`` from the JSON
    request body.  The task is looked up via the Luigi handler, the
    dates are parsed with ``app.parse_date()``, and the handler is
    asked to launch the task for that range.

    Returns a JSON response: ``{'ok': True}`` on success, otherwise
    ``{'error': <message>}`` when the task is not found, a date is
    missing or cannot be parsed, or the launch raises.
    """
    app = self.get_rattail_app()
    data = self.request.json_body
    key = data.get('key')
    task = self.luigi_handler.get_backfill_task(key) if key else None
    if not task:
        return self.json_response({'error': "Task not found"})

    # a missing date must yield a JSON error like every other failure
    # here, rather than an unhandled KeyError
    raw_start = data.get('start_date')
    raw_end = data.get('end_date')
    if not raw_start or not raw_end:
        return self.json_response(
            {'error': "Start date and end date are both required"})

    try:
        start_date = app.parse_date(raw_start)
        end_date = app.parse_date(raw_end)
    except ValueError as error:
        # NOTE(review): assumes parse_date() raises ValueError for
        # malformed input -- confirm; other errors still propagate
        return self.json_response({'error': simple_error(error)})

    try:
        self.luigi_handler.launch_backfill_task(task, start_date, end_date)
    except Exception as error:
        # report the failure to the client instead of propagating it
        log.warning("failed to launch backfill task: %s", task,
                    exc_info=True)
        return self.json_response({'error': simple_error(error)})
    return self.json_response({'ok': True})
def restart_scheduler(self):
try:
@ -87,36 +127,120 @@ class LuigiJobView(MasterView):
return [
# luigi proper
{'section': 'luigi',
{'section': 'rattail.luigi',
'option': 'url'},
{'section': 'luigi',
{'section': 'rattail.luigi',
'option': 'scheduler.supervisor_process_name'},
{'section': 'luigi',
{'section': 'rattail.luigi',
'option': 'scheduler.restart_command'},
]
def configure_get_context(self, **kwargs):
context = super(LuigiJobView, self).configure_get_context(**kwargs)
context['overnight_tasks'] = self.luigi_handler.get_all_overnight_tasks()
context = super(LuigiTaskView, self).configure_get_context(**kwargs)
context['overnight_tasks'] = self.get_overnight_tasks()
context['backfill_tasks'] = self.get_backfill_tasks()
return context
def configure_gather_settings(self, data):
settings = super(LuigiJobView, self).configure_gather_settings(data)
def get_overnight_tasks(self):
    """
    Return all overnight tasks from the Luigi handler.

    Any truthy ``last_date`` value is replaced, in place, by its
    text representation; falsy values are left as they are.  The
    same task collection obtained from the handler is returned.
    """
    overnight = self.luigi_handler.get_all_overnight_tasks()
    for info in overnight:
        last_date = info['last_date']
        if last_date:
            info['last_date'] = six.text_type(last_date)
    return overnight
def get_backfill_tasks(self):
    """
    Return all backfill tasks from the Luigi handler.

    Any truthy ``last_date`` or ``target_date`` value is replaced,
    in place, by its text representation; falsy values are left as
    they are.  The same task collection obtained from the handler
    is returned.
    """
    backfills = self.luigi_handler.get_all_backfill_tasks()
    for info in backfills:
        # same field order as the fields are checked one after another
        for field in ('last_date', 'target_date'):
            value = info[field]
            if value:
                info[field] = six.text_type(value)
    return backfills
def configure_gather_settings(self, data):
settings = super(LuigiTaskView, self).configure_gather_settings(data)
app = self.get_rattail_app()
# overnight tasks
keys = []
for task in json.loads(data['overnight_tasks']):
keys.append(task['key'])
key = task['key']
if key.startswith('_new_'):
key = app.make_uuid()
key = task['key']
if key.startswith('_new_'):
cmd = shlex.split(task['script'])
script = os.path.basename(cmd[0])
root, ext = os.path.splitext(script)
key = re.sub(r'\s+', '-', root)
keys.append(key)
settings.extend([
{'name': 'rattail.luigi.overnight.{}.description'.format(key),
'value': task['description']},
{'name': 'rattail.luigi.overnight.{}.script'.format(key),
'value': task['script']},
{'name': 'rattail.luigi.overnight.{}.notes'.format(key),
'value': task['notes']},
])
if keys:
settings.append({'name': 'luigi.overnight_tasks',
settings.append({'name': 'rattail.luigi.overnight_tasks',
'value': ', '.join(keys)})
# backfill tasks
keys = []
for task in json.loads(data['backfill_tasks']):
key = task['key']
if key.startswith('_new_'):
script = os.path.basename(task['script'])
root, ext = os.path.splitext(script)
key = re.sub(r'\s+', '-', root)
keys.append(key)
settings.extend([
{'name': 'rattail.luigi.backfill.{}.description'.format(key),
'value': task['description']},
{'name': 'rattail.luigi.backfill.{}.script'.format(key),
'value': task['script']},
{'name': 'rattail.luigi.backfill.{}.forward'.format(key),
'value': 'true' if task['forward'] else 'false'},
{'name': 'rattail.luigi.backfill.{}.notes'.format(key),
'value': task['notes']},
{'name': 'rattail.luigi.backfill.{}.target_date'.format(key),
'value': six.text_type(task['target_date'])},
])
if keys:
settings.append({'name': 'rattail.luigi.backfill_tasks',
'value': ', '.join(keys)})
return settings
def configure_remove_settings(self):
super(LuigiJobView, self).configure_remove_settings()
self.luigi_handler.purge_luigi_settings(self.Session())
super(LuigiTaskView, self).configure_remove_settings()
app = self.get_rattail_app()
model = self.model
session = self.Session()
to_delete = session.query(model.Setting)\
.filter(sa.or_(
model.Setting.name == 'rattail.luigi.backfill_tasks',
model.Setting.name.like('rattail.luigi.backfill.%.description'),
model.Setting.name.like('rattail.luigi.backfill.%.forward'),
model.Setting.name.like('rattail.luigi.backfill.%.notes'),
model.Setting.name.like('rattail.luigi.backfill.%.script'),
model.Setting.name.like('rattail.luigi.backfill.%.target_date'),
model.Setting.name == 'rattail.luigi.overnight_tasks',
model.Setting.name.like('rattail.luigi.overnight.%.description'),
model.Setting.name.like('rattail.luigi.overnight.%.notes'),
model.Setting.name.like('rattail.luigi.overnight.%.script')))\
.all()
for setting in to_delete:
app.delete_setting(session, setting.name)
@classmethod
def defaults(cls, config):
@ -130,16 +254,27 @@ class LuigiJobView(MasterView):
url_prefix = cls.get_url_prefix()
model_title_plural = cls.get_model_title_plural()
# launch job
# launch overnight
config.add_tailbone_permission(permission_prefix,
'{}.launch'.format(permission_prefix),
label="Launch any Luigi job")
config.add_route('{}.launch'.format(route_prefix),
'{}/launch'.format(url_prefix),
'{}.launch_overnight'.format(permission_prefix),
label="Launch any Overnight Task")
config.add_route('{}.launch_overnight'.format(route_prefix),
'{}/launch-overnight'.format(url_prefix),
request_method='POST')
config.add_view(cls, attr='launch',
route_name='{}.launch'.format(route_prefix),
permission='{}.launch'.format(permission_prefix))
config.add_view(cls, attr='launch_overnight',
route_name='{}.launch_overnight'.format(route_prefix),
permission='{}.launch_overnight'.format(permission_prefix))
# launch backfill
config.add_tailbone_permission(permission_prefix,
'{}.launch_backfill'.format(permission_prefix),
label="Launch any Backfill Task")
config.add_route('{}.launch_backfill'.format(route_prefix),
'{}/launch-backfill'.format(url_prefix),
request_method='POST')
config.add_view(cls, attr='launch_backfill',
route_name='{}.launch_backfill'.format(route_prefix),
permission='{}.launch_backfill'.format(permission_prefix))
# restart luigid scheduler
config.add_tailbone_permission(permission_prefix,
@ -156,8 +291,8 @@ class LuigiJobView(MasterView):
def defaults(config, **kwargs):
base = globals()
LuigiJobView = kwargs.get('LuigiJobView', base['LuigiJobView'])
LuigiJobView.defaults(config)
LuigiTaskView = kwargs.get('LuigiTaskView', base['LuigiTaskView'])
LuigiTaskView.defaults(config)
def includeme(config):