From 78500770d9e1c3089785f9925f7d759986d7774d Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Tue, 23 Aug 2022 23:27:47 -0500 Subject: [PATCH] Add basic support for backfill Luigi tasks idea being, sometimes you must import many days worth of data into Trainwreck or what-not, and it must be split up b/c e.g. it would take too long to import all at once (i.e. might interfere with overnight tasks) --- tailbone/templates/luigi/configure.mako | 341 ++++++++++++++++++++---- tailbone/templates/luigi/index.mako | 279 +++++++++++++++---- tailbone/views/luigi.py | 205 +++++++++++--- 3 files changed, 688 insertions(+), 137 deletions(-) diff --git a/tailbone/templates/luigi/configure.mako b/tailbone/templates/luigi/configure.mako index b8fba490..cf590adb 100644 --- a/tailbone/templates/luigi/configure.mako +++ b/tailbone/templates/luigi/configure.mako @@ -3,61 +3,213 @@ <%def name="form_content()"> ${h.hidden('overnight_tasks', **{':value': 'JSON.stringify(overnightTasks)'})} + ${h.hidden('backfill_tasks', **{':value': 'JSON.stringify(backfillTasks)'})} -

Overnight Tasks

+
+
+
+

Overnight Tasks

+
+
+ + New Task + +
+
+
-
- - New Task - + + - +
+
+
+
+

Backfill Tasks

+
+
+ + New Task + +
+
+ + + + + + + + + +

Luigi Proper

@@ -65,8 +217,8 @@ - @@ -74,8 +226,8 @@ - @@ -83,8 +235,8 @@ - @@ -100,28 +252,113 @@ ThisPageData.overnightTasks = ${json.dumps(overnight_tasks)|n} ThisPageData.overnightTaskShowDialog = false ThisPageData.overnightTask = null + ThisPageData.overnightTaskCounter = 0 ThisPageData.overnightTaskKey = null + ThisPageData.overnightTaskDescription = null + ThisPageData.overnightTaskScript = null + ThisPageData.overnightTaskNotes = null ThisPage.methods.overnightTaskCreate = function() { - this.overnightTask = null + this.overnightTask = {key: null} this.overnightTaskKey = null + this.overnightTaskDescription = null + this.overnightTaskScript = null + this.overnightTaskNotes = null this.overnightTaskShowDialog = true this.$nextTick(() => { - this.$refs.overnightTaskKey.focus() + this.$refs.overnightTaskDescription.focus() }) } + ThisPage.methods.overnightTaskEdit = function(task) { + this.overnightTask = task + this.overnightTaskKey = task.key + this.overnightTaskDescription = task.description + this.overnightTaskScript = task.script + this.overnightTaskNotes = task.notes + this.overnightTaskShowDialog = true + } + ThisPage.methods.overnightTaskSave = function() { - if (this.overnightTask) { - this.overnightTask.key = this.overnightTaskKey - } else { - let task = {key: this.overnightTaskKey} - this.overnightTasks.push(task) + this.overnightTask.description = this.overnightTaskDescription + this.overnightTask.script = this.overnightTaskScript + this.overnightTask.notes = this.overnightTaskNotes + + if (!this.overnightTask.key) { + this.overnightTask.key = `_new_${'$'}{++this.overnightTaskCounter}` + this.overnightTasks.push(this.overnightTask) } + this.overnightTaskShowDialog = false this.settingsNeedSaved = true } + ThisPage.methods.overnightTaskDelete = function(task) { + if (confirm("Really delete this task?")) { + let i = this.overnightTasks.indexOf(task) + this.overnightTasks.splice(i, 1) + this.settingsNeedSaved = true + } + } + + ThisPageData.backfillTasks = ${json.dumps(backfill_tasks)|n} + ThisPageData.backfillTaskShowDialog = false + ThisPageData.backfillTask = null + ThisPageData.backfillTaskCounter = 0 + ThisPageData.backfillTaskKey = null + ThisPageData.backfillTaskDescription = null + ThisPageData.backfillTaskScript = null + ThisPageData.backfillTaskForward = false + ThisPageData.backfillTaskTargetDate = null + ThisPageData.backfillTaskNotes = null + + ThisPage.methods.backfillTaskCreate = function() { + this.backfillTask = {key: null} + this.backfillTaskDescription = null + this.backfillTaskScript = null + this.backfillTaskForward = false + this.backfillTaskTargetDate = null + this.backfillTaskNotes = null + this.backfillTaskShowDialog = true + this.$nextTick(() => { + this.$refs.backfillTaskDescription.focus() + }) + } + + ThisPage.methods.backfillTaskEdit = function(task) { + this.backfillTask = task + this.backfillTaskDescription = task.description + this.backfillTaskScript = task.script + this.backfillTaskForward = task.forward + this.backfillTaskTargetDate = task.target_date + this.backfillTaskNotes = task.notes + this.backfillTaskShowDialog = true + } + + ThisPage.methods.backfillTaskDelete = function(task) { + if (confirm("Really delete this task?")) { + let i = this.backfillTasks.indexOf(task) + this.backfillTasks.splice(i, 1) + this.settingsNeedSaved = true + } + } + + ThisPage.methods.backfillTaskSave = function() { + this.backfillTask.description = this.backfillTaskDescription + this.backfillTask.script = this.backfillTaskScript + this.backfillTask.forward = this.backfillTaskForward + this.backfillTask.target_date = this.backfillTaskTargetDate + this.backfillTask.notes = this.backfillTaskNotes + + if (!this.backfillTask.key) { + this.backfillTask.key = `_new_${'$'}{++this.backfillTaskCounter}` + this.backfillTasks.push(this.backfillTask) + } + + this.backfillTaskShowDialog = false + this.settingsNeedSaved = true + } + diff --git a/tailbone/templates/luigi/index.mako b/tailbone/templates/luigi/index.mako index 16ea3489..c4407ff1 100644 --- a/tailbone/templates/luigi/index.mako +++ b/tailbone/templates/luigi/index.mako @@ -1,7 +1,7 @@ ## -*- coding: utf-8; -*- <%inherit file="/page.mako" /> -<%def name="title()">Luigi Jobs +<%def name="title()">View / Launch Tasks <%def name="page_content()">
@@ -49,13 +49,141 @@ % endif
- % if master.has_perm('launch'): + % if master.has_perm('launch_overnight'): +

Overnight Tasks

- % for task in overnight_tasks: - - - % endfor + + + + + + + % endif + + % if master.has_perm('launch_backfill'): + +

Backfill Tasks

+ + + + + + + + + + % endif
@@ -63,8 +191,9 @@ <%def name="modify_this_page_vars()"> ${parent.modify_this_page_vars()} - % if master.has_perm('restart_scheduler'): - - % endif - + % endif -<%def name="finalize_this_page_vars()"> - ${parent.finalize_this_page_vars()} - % if master.has_perm('launch'): - - % endif - + let url = '${url('{}.launch_overnight'.format(route_prefix))}' + let params = {key: task.key} -<%def name="render_this_page_template()"> - ${parent.render_this_page_template()} - % if master.has_perm('launch'): - - % endif + this.submitForm(url, params, response => { + this.$buefy.toast.open({ + message: "Task has been scheduled for immediate launch!", + type: 'is-success', + duration: 5000, // 5 seconds + }) + this.overnightTaskLaunching = false + }) + } + + % endif + + % if master.has_perm('launch_backfill'): + + ThisPageData.backfillTasks = ${json.dumps(backfill_tasks)|n} + ThisPageData.backfillTask = null + ThisPageData.backfillTaskStartDate = null + ThisPageData.backfillTaskEndDate = null + ThisPageData.backfillTaskShowLaunchDialog = false + ThisPageData.backfillTaskLaunching = false + + ThisPage.methods.backfillTextClass = function(task) { + if (task.target_date) { + if (task.last_date) { + if (task.forward) { + if (task.last_date >= task.target_date) { + return 'has-text-success' + } else { + return 'has-text-warning' + } + } else { + if (task.last_date <= task.target_date) { + return 'has-text-success' + } else { + return 'has-text-warning' + } + } + } + } + } + + ThisPage.methods.backfillTaskLaunch = function(task) { + this.backfillTask = task + this.backfillTaskStartDate = null + this.backfillTaskEndDate = null + this.backfillTaskShowLaunchDialog = true + } + + ThisPage.methods.backfillTaskLaunchSubmit = function() { + this.backfillTaskLaunching = true + + let url = '${url('{}.launch_backfill'.format(route_prefix))}' + let params = { + key: this.backfillTask.key, + start_date: this.backfillTaskStartDate, + end_date: this.backfillTaskEndDate, + } + + this.submitForm(url, params, response => { + this.$buefy.toast.open({ + message: "Task has been scheduled for immediate launch!", + type: 'is-success', + duration: 5000, // 5 seconds + }) + this.backfillTaskLaunching = false + this.backfillTaskShowLaunchDialog = false + }) + } + + % endif + + diff --git a/tailbone/views/luigi.py b/tailbone/views/luigi.py index 6b0b60e3..dfc68d2f 100644 --- a/tailbone/views/luigi.py +++ b/tailbone/views/luigi.py @@ -27,19 +27,29 @@ Views for Luigi from __future__ import unicode_literals, absolute_import import json +import logging +import os +import re +import shlex + +import six +import sqlalchemy as sa from rattail.util import simple_error from tailbone.views import MasterView -class LuigiJobView(MasterView): +log = logging.getLogger(__name__) + + +class LuigiTaskView(MasterView): """ - Simple views for Luigi jobs. + Simple views for Luigi tasks. """ - normalized_model_name = 'luigijobs' - model_key = 'jobname' - model_title = "Luigi Job" + normalized_model_name = 'luigitasks' + model_key = 'key' + model_title = "Luigi Task" route_prefix = 'luigi' url_prefix = '/luigi' @@ -50,27 +60,57 @@ class LuigiJobView(MasterView): configurable = True def __init__(self, request, context=None): - super(LuigiJobView, self).__init__(request, context=context) + super(LuigiTaskView, self).__init__(request, context=context) app = self.get_rattail_app() self.luigi_handler = app.get_luigi_handler() def index(self): - luigi_url = self.rattail_config.get('luigi', 'url') + luigi_url = self.rattail_config.get('rattail.luigi', 'url') history_url = '{}/history'.format(luigi_url.rstrip('/')) if luigi_url else None return self.render_to_response('index', { 'use_buefy': self.get_use_buefy(), 'index_url': None, 'luigi_url': luigi_url, 'luigi_history_url': history_url, - 'overnight_tasks': self.luigi_handler.get_all_overnight_tasks(), + 'overnight_tasks': self.get_overnight_tasks(), + 'backfill_tasks': self.get_backfill_tasks(), }) - def launch(self): - key = self.request.POST['job'] - assert key - self.luigi_handler.restart_overnight_task(key) - self.request.session.flash("Scheduled overnight task for immediate launch: {}".format(key)) - return self.redirect(self.get_index_url()) + def launch_overnight(self): + app = self.get_rattail_app() + data = self.request.json_body + + key = data.get('key') + task = self.luigi_handler.get_overnight_task(key) if key else None + if not task: + return self.json_response({'error': "Task not found"}) + + try: + self.luigi_handler.launch_overnight_task(task, app.yesterday()) + except Exception as error: + log.warning("failed to launch overnight task: %s", task, + exc_info=True) + return self.json_response({'error': simple_error(error)}) + return self.json_response({'ok': True}) + + def launch_backfill(self): + app = self.get_rattail_app() + data = self.request.json_body + + key = data.get('key') + task = self.luigi_handler.get_backfill_task(key) if key else None + if not task: + return self.json_response({'error': "Task not found"}) + + start_date = app.parse_date(data['start_date']) + end_date = app.parse_date(data['end_date']) + try: + self.luigi_handler.launch_backfill_task(task, start_date, end_date) + except Exception as error: + log.warning("failed to launch backfill task: %s", task, + exc_info=True) + return self.json_response({'error': simple_error(error)}) + return self.json_response({'ok': True}) def restart_scheduler(self): try: @@ -87,36 +127,120 @@ class LuigiJobView(MasterView): return [ # luigi proper - {'section': 'luigi', + {'section': 'rattail.luigi', 'option': 'url'}, - {'section': 'luigi', + {'section': 'rattail.luigi', 'option': 'scheduler.supervisor_process_name'}, - {'section': 'luigi', + {'section': 'rattail.luigi', 'option': 'scheduler.restart_command'}, ] def configure_get_context(self, **kwargs): - context = super(LuigiJobView, self).configure_get_context(**kwargs) - context['overnight_tasks'] = self.luigi_handler.get_all_overnight_tasks() + context = super(LuigiTaskView, self).configure_get_context(**kwargs) + context['overnight_tasks'] = self.get_overnight_tasks() + context['backfill_tasks'] = self.get_backfill_tasks() return context - def configure_gather_settings(self, data): - settings = super(LuigiJobView, self).configure_gather_settings(data) + def get_overnight_tasks(self): + tasks = self.luigi_handler.get_all_overnight_tasks() + for task in tasks: + if task['last_date']: + task['last_date'] = six.text_type(task['last_date']) + return tasks + def get_backfill_tasks(self): + tasks = self.luigi_handler.get_all_backfill_tasks() + for task in tasks: + if task['last_date']: + task['last_date'] = six.text_type(task['last_date']) + if task['target_date']: + task['target_date'] = six.text_type(task['target_date']) + return tasks + + def configure_gather_settings(self, data): + settings = super(LuigiTaskView, self).configure_gather_settings(data) + app = self.get_rattail_app() + + # overnight tasks keys = [] for task in json.loads(data['overnight_tasks']): - keys.append(task['key']) + key = task['key'] + if key.startswith('_new_'): + key = app.make_uuid() + + key = task['key'] + if key.startswith('_new_'): + cmd = shlex.split(task['script']) + script = os.path.basename(cmd[0]) + root, ext = os.path.splitext(script) + key = re.sub(r'\s+', '-', root) + + keys.append(key) + settings.extend([ + {'name': 'rattail.luigi.overnight.{}.description'.format(key), + 'value': task['description']}, + {'name': 'rattail.luigi.overnight.{}.script'.format(key), + 'value': task['script']}, + {'name': 'rattail.luigi.overnight.{}.notes'.format(key), + 'value': task['notes']}, + ]) if keys: - settings.append({'name': 'luigi.overnight_tasks', + settings.append({'name': 'rattail.luigi.overnight_tasks', + 'value': ', '.join(keys)}) + + # backfill tasks + keys = [] + for task in json.loads(data['backfill_tasks']): + + key = task['key'] + if key.startswith('_new_'): + script = os.path.basename(task['script']) + root, ext = os.path.splitext(script) + key = re.sub(r'\s+', '-', root) + + keys.append(key) + settings.extend([ + {'name': 'rattail.luigi.backfill.{}.description'.format(key), + 'value': task['description']}, + {'name': 'rattail.luigi.backfill.{}.script'.format(key), + 'value': task['script']}, + {'name': 'rattail.luigi.backfill.{}.forward'.format(key), + 'value': 'true' if task['forward'] else 'false'}, + {'name': 'rattail.luigi.backfill.{}.notes'.format(key), + 'value': task['notes']}, + {'name': 'rattail.luigi.backfill.{}.target_date'.format(key), + 'value': six.text_type(task['target_date'])}, + ]) + if keys: + settings.append({'name': 'rattail.luigi.backfill_tasks', 'value': ', '.join(keys)}) return settings def configure_remove_settings(self): - super(LuigiJobView, self).configure_remove_settings() - self.luigi_handler.purge_luigi_settings(self.Session()) + super(LuigiTaskView, self).configure_remove_settings() + app = self.get_rattail_app() + model = self.model + session = self.Session() + + to_delete = session.query(model.Setting)\ + .filter(sa.or_( + model.Setting.name == 'rattail.luigi.backfill_tasks', + model.Setting.name.like('rattail.luigi.backfill.%.description'), + model.Setting.name.like('rattail.luigi.backfill.%.forward'), + model.Setting.name.like('rattail.luigi.backfill.%.notes'), + model.Setting.name.like('rattail.luigi.backfill.%.script'), + model.Setting.name.like('rattail.luigi.backfill.%.target_date'), + model.Setting.name == 'rattail.luigi.overnight_tasks', + model.Setting.name.like('rattail.luigi.overnight.%.description'), + model.Setting.name.like('rattail.luigi.overnight.%.notes'), + model.Setting.name.like('rattail.luigi.overnight.%.script')))\ + .all() + + for setting in to_delete: + app.delete_setting(session, setting.name) @classmethod def defaults(cls, config): @@ -130,16 +254,27 @@ class LuigiJobView(MasterView): url_prefix = cls.get_url_prefix() model_title_plural = cls.get_model_title_plural() - # launch job + # launch overnight config.add_tailbone_permission(permission_prefix, - '{}.launch'.format(permission_prefix), - label="Launch any Luigi job") - config.add_route('{}.launch'.format(route_prefix), - '{}/launch'.format(url_prefix), + '{}.launch_overnight'.format(permission_prefix), + label="Launch any Overnight Task") + config.add_route('{}.launch_overnight'.format(route_prefix), + '{}/launch-overnight'.format(url_prefix), request_method='POST') - config.add_view(cls, attr='launch', - route_name='{}.launch'.format(route_prefix), - permission='{}.launch'.format(permission_prefix)) + config.add_view(cls, attr='launch_overnight', + route_name='{}.launch_overnight'.format(route_prefix), + permission='{}.launch_overnight'.format(permission_prefix)) + + # launch backfill + config.add_tailbone_permission(permission_prefix, + '{}.launch_backfill'.format(permission_prefix), + label="Launch any Backfill Task") + config.add_route('{}.launch_backfill'.format(route_prefix), + '{}/launch-backfill'.format(url_prefix), + request_method='POST') + config.add_view(cls, attr='launch_backfill', + route_name='{}.launch_backfill'.format(route_prefix), + permission='{}.launch_backfill'.format(permission_prefix)) # restart luigid scheduler config.add_tailbone_permission(permission_prefix, @@ -156,8 +291,8 @@ class LuigiJobView(MasterView): def defaults(config, **kwargs): base = globals() - LuigiJobView = kwargs.get('LuigiJobView', base['LuigiJobView']) - LuigiJobView.defaults(config) + LuigiTaskView = kwargs.get('LuigiTaskView', base['LuigiTaskView']) + LuigiTaskView.defaults(config) def includeme(config):