From 282185c5af2f9b6411e2b430a5d3569c12f3bd3a Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Sun, 5 Dec 2021 17:23:11 -0600 Subject: [PATCH] Add basic import/export handler views, tool to run jobs --- tailbone/forms/core.py | 1 + tailbone/templates/importing/index.mako | 12 + tailbone/templates/importing/runjob.mako | 85 ++++ tailbone/templates/importing/view.mako | 22 + tailbone/views/batch/core.py | 76 ---- tailbone/views/importing.py | 543 +++++++++++++++++++++++ tailbone/views/master.py | 82 +++- 7 files changed, 744 insertions(+), 77 deletions(-) create mode 100644 tailbone/templates/importing/index.mako create mode 100644 tailbone/templates/importing/runjob.mako create mode 100644 tailbone/templates/importing/view.mako create mode 100644 tailbone/views/importing.py diff --git a/tailbone/forms/core.py b/tailbone/forms/core.py index 2267b8dc..060e1133 100644 --- a/tailbone/forms/core.py +++ b/tailbone/forms/core.py @@ -771,6 +771,7 @@ class Form(object): # TODO: deprecate / remove the latter option here if self.auto_disable_save or self.auto_disable: if self.use_buefy: + context['form_kwargs']['ref'] = self.component_studly context['form_kwargs']['@submit'] = 'submit{}'.format(self.component_studly) else: context['form_kwargs']['class_'] = 'autodisable' diff --git a/tailbone/templates/importing/index.mako b/tailbone/templates/importing/index.mako new file mode 100644 index 00000000..c2d9c6ec --- /dev/null +++ b/tailbone/templates/importing/index.mako @@ -0,0 +1,12 @@ +## -*- coding: utf-8; -*- +<%inherit file="/master/index.mako" /> + +<%def name="render_grid_component()"> +

+ ${request.rattail_config.get_app().get_title()} can run import / export jobs for the following: +

+ ${parent.render_grid_component()} + + + +${parent.body()} diff --git a/tailbone/templates/importing/runjob.mako b/tailbone/templates/importing/runjob.mako new file mode 100644 index 00000000..2b9642f6 --- /dev/null +++ b/tailbone/templates/importing/runjob.mako @@ -0,0 +1,85 @@ +## -*- coding: utf-8; -*- +<%inherit file="/master/form.mako" /> + +<%def name="extra_styles()"> + ${parent.extra_styles()} + + + +<%def name="title()"> + Run ${handler.direction.capitalize()}:  ${handler.get_generic_title()} + + +<%def name="context_menu_items()"> + ${parent.context_menu_items()} + % if master.has_perm('view'): +
  • ${h.link_to("View this {}".format(model_title), action_url('view', handler_info))}
  • + % endif + + +<%def name="render_this_page()"> + % if 'rattail.importing.runjob.notes' in request.session: + + ${request.session['rattail.importing.runjob.notes']|n} + + <% del request.session['rattail.importing.runjob.notes'] %> + % endif + + ${parent.render_this_page()} + + +<%def name="render_form_buttons()"> +
    + ${h.hidden('runjob', **{':value': 'runJob'})} +
    + + + + {{ submittingRun ? "Working, please wait..." : "Run this ${handler.direction.capitalize()}" }} + + + {{ submittingExplain ? "Working, please wait..." : "Just show me the notes" }} + +
    + + +<%def name="modify_this_page_vars()"> + ${parent.modify_this_page_vars()} + + + +${parent.body()} diff --git a/tailbone/templates/importing/view.mako b/tailbone/templates/importing/view.mako new file mode 100644 index 00000000..3a28737c --- /dev/null +++ b/tailbone/templates/importing/view.mako @@ -0,0 +1,22 @@ +## -*- coding: utf-8; -*- +<%inherit file="/master/view.mako" /> + +<%def name="object_helpers()"> + ${parent.object_helpers()} + % if master.has_perm('runjob'): + + % endif + + + +${parent.body()} diff --git a/tailbone/views/batch/core.py b/tailbone/views/batch/core.py index 821628aa..90614079 100644 --- a/tailbone/views/batch/core.py +++ b/tailbone/views/batch/core.py @@ -64,10 +64,6 @@ from tailbone.util import csrf_token log = logging.getLogger(__name__) -class EverythingComplete(Exception): - pass - - class BatchMasterView(MasterView): """ Base class for all "batch master" views. @@ -872,78 +868,6 @@ class BatchMasterView(MasterView): 'cancel_msg': "{} of batch was canceled.".format(batch_action.capitalize()), }) - def progress_thread(self, sock, success_url, progress): - """ - This method is meant to be used as a thread target. Its job is to read - progress data from ``connection`` and update the session progress - accordingly. When a final "process complete" indication is read, the - socket will be closed and the thread will end. - """ - while True: - try: - self.process_progress(sock, progress) - except EverythingComplete: - break - - # close server socket - sock.close() - - # finalize session progress - progress.session.load() - progress.session['complete'] = True - if callable(success_url): - success_url = success_url() - progress.session['success_url'] = success_url - progress.session.save() - - def process_progress(self, sock, progress): - """ - This method will accept a client connection on the given socket, and - then update the given progress object according to data written by the - client. - """ - connection, client_address = sock.accept() - active_progress = None - - # TODO: make this configurable? - suffix = "\n\n.".encode('utf_8') - data = b'' - - # listen for progress info, update session progress as needed - while True: - - # accumulate data bytestring until we see the suffix - byte = connection.recv(1) - data += byte - if data.endswith(suffix): - - # strip suffix, interpret data as JSON - data = data[:-len(suffix)] - if six.PY3: - data = data.decode('utf_8') - data = json.loads(data) - - if data.get('everything_complete'): - if active_progress: - active_progress.finish() - raise EverythingComplete - - elif data.get('process_complete'): - active_progress.finish() - active_progress = None - break - - elif 'value' in data: - if not active_progress: - active_progress = progress(data['message'], data['maximum']) - active_progress.update(data['value']) - - # reset data buffer - data = b'' - - # close client connection - connection.close() - def launch_subprocess(self, port=None, username=None, command='rattail', command_args=None, subcommand=None, subcommand_args=None): diff --git a/tailbone/views/importing.py b/tailbone/views/importing.py new file mode 100644 index 00000000..23a039cd --- /dev/null +++ b/tailbone/views/importing.py @@ -0,0 +1,543 @@ +# -*- coding: utf-8; -*- +################################################################################ +# +# Rattail -- Retail Software Framework +# Copyright © 2010-2021 Lance Edgar +# +# This file is part of Rattail. +# +# Rattail is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# Rattail. If not, see . +# +################################################################################ +""" +View for running arbitrary import/export jobs +""" + +from __future__ import unicode_literals, absolute_import + +import getpass +import socket +import sys +import logging +import subprocess +import time + +import json +import six + +from rattail.exceptions import ConfigurationError +from rattail.threads import Thread + +import colander +import markdown +from deform import widget as dfwidget +from webhelpers2.html import HTML + +from tailbone.views import MasterView + + +log = logging.getLogger(__name__) + + +class ImportingView(MasterView): + """ + View for running arbitrary import/export jobs + """ + normalized_model_name = 'importhandler' + model_title = "Import / Export Handler" + model_key = 'key' + route_prefix = 'importing' + url_prefix = '/importing' + index_title = "Importing / Exporting" + creatable = False + editable = False + deletable = False + filterable = False + pageable = False + + labels = { + 'host_title': "Data Source", + 'local_title': "Data Target", + } + + grid_columns = [ + 'host_title', + 'local_title', + 'handler_spec', + ] + + form_fields = [ + 'key', + 'local_key', + 'host_key', + 'handler_spec', + 'host_title', + 'local_title', + 'models', + ] + + runjob_form_fields = [ + 'handler_spec', + 'host_title', + 'local_title', + 'models', + 'create', + 'update', + 'delete', + # 'runas', + 'versioning', + 'dry_run', + 'warnings', + ] + + def get_data(self, session=None): + app = self.get_rattail_app() + data = [] + + for Handler in app.all_import_handlers(): + handler = Handler(self.rattail_config) + data.append(self.normalize(handler)) + + data.sort(key=lambda handler: (handler['host_title'], + handler['local_title'])) + return data + + def normalize(self, handler): + Handler = handler.__class__ + return { + '_handler': handler, + 'key': handler.get_key(), + 'generic_title': handler.get_generic_title(), + 'host_key': handler.host_key, + 'host_title': handler.get_generic_host_title(), + 'local_key': handler.local_key, + 'local_title': handler.get_generic_local_title(), + 'handler_spec': handler.get_spec(), + } + + def configure_grid(self, g): + super(ImportingView, self).configure_grid(g) + + g.set_link('host_title') + g.set_link('local_title') + + def get_instance(self): + """ + Fetch the current model instance by inspecting the route kwargs and + doing a database lookup. If the instance cannot be found, raises 404. + """ + key = self.request.matchdict['key'] + app = self.get_rattail_app() + for Handler in app.all_import_handlers(): + if Handler.get_key() == key: + return self.normalize(Handler(self.rattail_config)) + raise self.notfound() + + def get_instance_title(self, handler_info): + handler = handler_info['_handler'] + return handler.get_generic_title() + + def make_form_schema(self): + return ImportHandlerSchema() + + def make_form_kwargs(self, **kwargs): + kwargs = super(ImportingView, self).make_form_kwargs(**kwargs) + + # nb. this is set as sort of a hack, to prevent SA model + # inspection logic + kwargs['renderers'] = {} + + return kwargs + + def configure_form(self, f): + super(ImportingView, self).configure_form(f) + + f.set_renderer('models', self.render_models) + + def render_models(self, handler, field): + handler = handler['_handler'] + items = [] + for key in handler.get_importer_keys(): + items.append(HTML.tag('li', c=[key])) + return HTML.tag('ul', c=items) + + def template_kwargs_view(self, **kwargs): + kwargs = super(ImportingView, self).template_kwargs_view(**kwargs) + handler_info = kwargs['instance'] + kwargs['handler'] = handler_info['_handler'] + return kwargs + + def runjob(self): + """ + View for running an import / export job + """ + handler_info = self.get_instance() + handler = handler_info['_handler'] + form = self.make_runjob_form(handler_info) + + if self.request.method == 'POST': + if self.validate_form(form): + + self.cache_runjob_form_values(handler, form) + + try: + return self.do_runjob(handler_info, form) + except Exception as error: + self.request.session.flash(six.text_type(error), 'error') + return self.redirect(self.request.current_route_url()) + + return self.render_to_response('runjob', { + 'handler_info': handler_info, + 'handler': handler, + 'form': form, + }) + + def cache_runjob_form_values(self, handler, form): + handler_key = handler.get_key() + + def make_key(key): + return 'rattail.importing.{}.{}'.format(handler_key, key) + + for field in form.fields: + key = make_key(field) + self.request.session[key] = form.validated[field] + + def read_cached_runjob_values(self, handler, form): + handler_key = handler.get_key() + + def make_key(key): + return 'rattail.importing.{}.{}'.format(handler_key, key) + + for field in form.fields: + key = make_key(field) + if key in self.request.session: + form.set_default(field, self.request.session[key]) + + def make_runjob_form(self, handler_info, **kwargs): + """ + Creates a new form for the given model class/instance + """ + handler = handler_info['_handler'] + factory = self.get_form_factory() + fields = list(self.runjob_form_fields) + schema = RunJobSchema() + + kwargs = self.make_runjob_form_kwargs(handler_info, **kwargs) + form = factory(fields, schema, **kwargs) + self.configure_runjob_form(handler, form) + + self.read_cached_runjob_values(handler, form) + + return form + + def make_runjob_form_kwargs(self, handler_info, **kwargs): + route_prefix = self.get_route_prefix() + handler = handler_info['_handler'] + defaults = { + 'request': self.request, + 'use_buefy': self.get_use_buefy(), + 'model_instance': handler, + 'cancel_url': self.request.route_url('{}.view'.format(route_prefix), + key=handler.get_key()), + # nb. these next 2 are set as sort of a hack, to prevent + # SA model inspection logic + 'renderers': {}, + 'appstruct': handler_info, + } + defaults.update(kwargs) + return defaults + + def configure_runjob_form(self, handler, f): + self.set_labels(f) + + f.set_readonly('handler_spec') + f.set_renderer('handler_spec', lambda handler, field: handler.get_spec()) + + f.set_readonly('host_title') + f.set_readonly('local_title') + + keys = handler.get_importer_keys() + f.set_widget('models', dfwidget.SelectWidget(values=[(k, k) for k in keys], + multiple=True, + size=len(keys))) + # f.set_default('models', keys) + + f.set_default('create', True) + f.set_default('update', True) + f.set_default('delete', False) + # f.set_default('runas', self.rattail_config.get('rattail', 'runas.default') or '') + f.set_default('versioning', True) + f.set_default('dry_run', False) + f.set_default('warnings', False) + + def do_runjob(self, handler_info, form): + handler = handler_info['_handler'] + handler_key = handler.get_key() + + if self.request.POST.get('runjob') == 'true': + + # will invoke handler to run job + + # TODO: this socket progress business was lifted from + # tailbone.views.batch.core:BatchMasterView.handler_action + # should probably refactor to share somehow + + # make progress object + key = 'rattail.importing.{}'.format(handler_key) + progress = self.make_progress(key) + + # make socket for progress thread to listen to action thread + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('127.0.0.1', 0)) + sock.listen(1) + port = sock.getsockname()[1] + + # launch thread to monitor progress + success_url = self.request.current_route_url() + thread = Thread(target=self.progress_thread, + args=(sock, success_url, progress)) + thread.start() + + true_cmd = self.make_runjob_cmd(handler, form, 'true', port=port) + + # launch thread to invoke handler + thread = Thread(target=self.do_runjob_thread, + args=(handler, true_cmd, port, progress)) + thread.start() + + return self.render_progress(progress, { + 'can_cancel': False, + 'cancel_url': self.request.current_route_url(), + }) + + else: # explain only + notes_cmd = self.make_runjob_cmd(handler, form, 'notes') + self.cache_runjob_notes(handler, notes_cmd) + + return self.redirect(self.request.current_route_url()) + + def do_runjob_thread(self, handler, cmd, port, progress): + + # invoke handler command via subprocess + try: + result = subprocess.run(cmd, check=True, capture_output=True) + output = result.stderr.decode('utf_8').strip() + + except Exception as error: + log.warning("failed to invoke handler cmd: %s", cmd, exc_info=True) + if progress: + progress.session.load() + progress.session['error'] = True + msg = """\ +{} failed! Here is the command I tried to run: + +``` +{} +``` + +And here is the STDERR output: + +``` +{} +``` +""".format(handler.direction.capitalize(), + ' '.join(cmd), + error.stderr.decode('utf_8').strip()) + msg = markdown.markdown(msg, extensions=['fenced_code']) + msg = HTML.literal(msg) + msg = HTML.tag('div', class_='tailbone-markdown', c=[msg]) + progress.session['error_msg'] = msg + progress.session.save() + + else: # success + + if progress: + progress.session.load() + msg = self.get_runjob_success_msg(handler, output) + progress.session['complete'] = True + progress.session['success_url'] = self.request.current_route_url() + progress.session['success_msg'] = msg + progress.session.save() + + suffix = "\n\n.".encode('utf_8') + cxn = socket.create_connection(('127.0.0.1', port)) + data = json.dumps({ + 'everything_complete': True, + }) + if six.PY3: + data = data.encode('utf_8') + cxn.send(data) + cxn.send(suffix) + cxn.close() + + def get_runjob_success_msg(self, handler, output): + notes = """\ +{} went okay, here is the output: + +``` +{} +``` +""".format(handler.direction.capitalize(), output) + + notes = markdown.markdown(notes, extensions=['fenced_code']) + notes = HTML.literal(notes) + return HTML.tag('div', class_='tailbone-markdown', c=[notes]) + + def make_runjob_cmd(self, handler, form, typ, port=None): + handler_key = handler.get_key() + + option = '{}.cmd'.format(handler_key) + cmd = self.rattail_config.getlist('rattail.importing', option) + if not cmd or len(cmd) != 2: + msg = ("Missing or invalid config; please set '{}' in the " + "[rattail.importing] section of your config file".format(option)) + raise ConfigurationError(msg) + + command, subcommand = cmd + + option = '{}.runas'.format(handler_key) + runas = self.rattail_config.require('rattail.importing', option) + + data = form.validated + + if typ == 'true': + cmd = [ + '{}/bin/{}'.format(sys.prefix, command), + '--config={}/app/quiet.conf'.format(sys.prefix), + '--progress', + '--progress-socket=127.0.0.1:{}'.format(port), + '--runas={}'.format(runas), + subcommand, + ] + else: + cmd = [ + 'sudo', '-u', getpass.getuser(), + 'bin/{}'.format(command), + '-c', 'app/quiet.conf', + '-P', + '--runas', runas, + subcommand, + ] + + cmd.extend(data['models']) + + if data['create']: + if typ == 'true': + cmd.append('--create') + else: + cmd.append('--no-create') + + if data['update']: + if typ == 'true': + cmd.append('--update') + else: + cmd.append('--no-update') + + if data['delete']: + cmd.append('--delete') + else: + if typ == 'true': + cmd.append('--no-delete') + + if data['versioning']: + if typ == 'true': + cmd.append('--versioning') + else: + cmd.append('--no-versioning') + + if data['dry_run']: + cmd.append('--dry-run') + + if data['warnings']: + cmd.append('--warnings') + + return cmd + + def cache_runjob_notes(self, handler, notes_cmd): + notes = """\ +You can run this {direction} job manually via command line: + +```sh +cd {prefix} +{cmd} +``` +""".format(direction=handler.direction, + prefix=sys.prefix, + cmd=' '.join(notes_cmd)) + + self.request.session['rattail.importing.runjob.notes'] = markdown.markdown( + notes, extensions=['fenced_code', 'codehilite']) + + @classmethod + def defaults(cls, config): + cls._defaults(config) + cls._importing_defaults(config) + + @classmethod + def _importing_defaults(cls, config): + route_prefix = cls.get_route_prefix() + permission_prefix = cls.get_permission_prefix() + instance_url_prefix = cls.get_instance_url_prefix() + + # run job + config.add_tailbone_permission(permission_prefix, + '{}.runjob'.format(permission_prefix), + "Run an arbitrary import / export job") + config.add_route('{}.runjob'.format(route_prefix), + '{}/runjob'.format(instance_url_prefix)) + config.add_view(cls, attr='runjob', + route_name='{}.runjob'.format(route_prefix), + permission='{}.runjob'.format(permission_prefix)) + + +class ImportHandlerSchema(colander.MappingSchema): + + host_key = colander.SchemaNode(colander.String()) + + local_key = colander.SchemaNode(colander.String()) + + host_title = colander.SchemaNode(colander.String()) + + local_title = colander.SchemaNode(colander.String()) + + handler_spec = colander.SchemaNode(colander.String()) + + +class RunJobSchema(colander.MappingSchema): + + handler_spec = colander.SchemaNode(colander.String()) + + host_title = colander.SchemaNode(colander.String()) + + local_title = colander.SchemaNode(colander.String()) + + models = colander.SchemaNode(colander.List()) + + create = colander.SchemaNode(colander.Bool()) + + update = colander.SchemaNode(colander.Bool()) + + delete = colander.SchemaNode(colander.Bool()) + + # runas = colander.SchemaNode(colander.String()) + + versioning = colander.SchemaNode(colander.Bool()) + + dry_run = colander.SchemaNode(colander.Bool()) + + warnings = colander.SchemaNode(colander.Bool()) + + +def includeme(config): + ImportingView.defaults(config) diff --git a/tailbone/views/master.py b/tailbone/views/master.py index f288ec34..2a3189c4 100644 --- a/tailbone/views/master.py +++ b/tailbone/views/master.py @@ -32,6 +32,7 @@ import datetime import tempfile import logging +import json import six import sqlalchemy as sa from sqlalchemy import orm @@ -65,6 +66,10 @@ from tailbone.config import global_help_url log = logging.getLogger(__name__) +class EverythingComplete(Exception): + pass + + class MasterView(View): """ Base "master" view class. All model master views should derive from this. @@ -1743,6 +1748,78 @@ class MasterView(View): def get_execute_success_url(self, obj, **kwargs): return self.get_action_url('view', obj, **kwargs) + def progress_thread(self, sock, success_url, progress): + """ + This method is meant to be used as a thread target. Its job is to read + progress data from ``connection`` and update the session progress + accordingly. When a final "process complete" indication is read, the + socket will be closed and the thread will end. + """ + while True: + try: + self.process_progress(sock, progress) + except EverythingComplete: + break + + # close server socket + sock.close() + + # finalize session progress + progress.session.load() + progress.session['complete'] = True + if callable(success_url): + success_url = success_url() + progress.session['success_url'] = success_url + progress.session.save() + + def process_progress(self, sock, progress): + """ + This method will accept a client connection on the given socket, and + then update the given progress object according to data written by the + client. + """ + connection, client_address = sock.accept() + active_progress = None + + # TODO: make this configurable? + suffix = "\n\n.".encode('utf_8') + data = b'' + + # listen for progress info, update session progress as needed + while True: + + # accumulate data bytestring until we see the suffix + byte = connection.recv(1) + data += byte + if data.endswith(suffix): + + # strip suffix, interpret data as JSON + data = data[:-len(suffix)] + if six.PY3: + data = data.decode('utf_8') + data = json.loads(data) + + if data.get('everything_complete'): + if active_progress: + active_progress.finish() + raise EverythingComplete + + elif data.get('process_complete'): + active_progress.finish() + active_progress = None + break + + elif 'value' in data: + if not active_progress: + active_progress = progress(data['message'], data['maximum']) + active_progress.update(data['value']) + + # reset data buffer + data = b'' + + # close client connection + connection.close() + def get_merge_fields(self): if hasattr(self, 'merge_fields'): return self.merge_fields @@ -2287,7 +2364,10 @@ class MasterView(View): try: mapper = orm.object_mapper(row) except orm.exc.UnmappedInstanceError: - return {self.model_key: row[self.model_key]} + try: + return {self.model_key: row[self.model_key]} + except TypeError: + return {self.model_key: getattr(row, self.model_key)} else: pkeys = get_primary_keys(row) keys = list(pkeys)