Add basic import/export handler views, tool to run jobs

This commit is contained in:
Lance Edgar 2021-12-05 17:23:11 -06:00
parent 95da490f9a
commit 282185c5af
7 changed files with 744 additions and 77 deletions

View file

@ -771,6 +771,7 @@ class Form(object):
# TODO: deprecate / remove the latter option here # TODO: deprecate / remove the latter option here
if self.auto_disable_save or self.auto_disable: if self.auto_disable_save or self.auto_disable:
if self.use_buefy: if self.use_buefy:
context['form_kwargs']['ref'] = self.component_studly
context['form_kwargs']['@submit'] = 'submit{}'.format(self.component_studly) context['form_kwargs']['@submit'] = 'submit{}'.format(self.component_studly)
else: else:
context['form_kwargs']['class_'] = 'autodisable' context['form_kwargs']['class_'] = 'autodisable'

View file

@ -0,0 +1,12 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/index.mako" />
<%def name="render_grid_component()">
<p class="block">
${request.rattail_config.get_app().get_title()} can run import / export jobs for the following:
</p>
${parent.render_grid_component()}
</%def>
${parent.body()}

View file

@ -0,0 +1,85 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/form.mako" />
<%def name="extra_styles()">
${parent.extra_styles()}
<style type="text/css">
.tailbone-markdown p {
margin-bottom: 1.5rem;
margin-top: 1rem;
}
</style>
</%def>
<%def name="title()">
Run ${handler.direction.capitalize()}:&nbsp; ${handler.get_generic_title()}
</%def>
<%def name="context_menu_items()">
${parent.context_menu_items()}
% if master.has_perm('view'):
<li>${h.link_to("View this {}".format(model_title), action_url('view', handler_info))}</li>
% endif
</%def>
<%def name="render_this_page()">
% if 'rattail.importing.runjob.notes' in request.session:
<b-notification type="is-info tailbone-markdown">
${request.session['rattail.importing.runjob.notes']|n}
</b-notification>
<% del request.session['rattail.importing.runjob.notes'] %>
% endif
${parent.render_this_page()}
</%def>
<%def name="render_form_buttons()">
<br />
${h.hidden('runjob', **{':value': 'runJob'})}
<div class="buttons">
<once-button tag="a" href="${form.cancel_url or request.get_referrer()}"
text="Cancel">
</once-button>
<b-button type="is-primary"
@click="submitRun()"
:disabled="submittingRun"
icon-pack="fas"
icon-left="arrow-circle-right">
{{ submittingRun ? "Working, please wait..." : "Run this ${handler.direction.capitalize()}" }}
</b-button>
<b-button @click="submitExplain()"
:disabled="submittingExplain"
icon-pack="fas"
icon-left="question-circle">
{{ submittingExplain ? "Working, please wait..." : "Just show me the notes" }}
</b-button>
</div>
</%def>
<%def name="modify_this_page_vars()">
${parent.modify_this_page_vars()}
<script type="text/javascript">
${form.component_studly}Data.submittingRun = false
${form.component_studly}Data.submittingExplain = false
${form.component_studly}Data.runJob = false
${form.component_studly}.methods.submitRun = function() {
this.submittingRun = true
this.runJob = true
this.$nextTick(() => {
this.$refs.${form.component_studly}.submit()
})
}
${form.component_studly}.methods.submitExplain = function() {
this.submittingExplain = true
this.$refs.${form.component_studly}.submit()
}
</script>
</%def>
${parent.body()}

View file

@ -0,0 +1,22 @@
## -*- coding: utf-8; -*-
<%inherit file="/master/view.mako" />
<%def name="object_helpers()">
${parent.object_helpers()}
% if master.has_perm('runjob'):
<nav class="panel">
<p class="panel-heading">Tools</p>
<div class="panel-block buttons">
<once-button type="is-primary"
tag="a" href="${url('{}.runjob'.format(route_prefix), key=handler.get_key())}"
icon-pack="fas"
icon-left="arrow-circle-right"
text="Run ${handler.direction.capitalize()} Job">
</once-button>
</div>
</nav>
% endif
</%def>
${parent.body()}

View file

@ -64,10 +64,6 @@ from tailbone.util import csrf_token
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class EverythingComplete(Exception):
pass
class BatchMasterView(MasterView): class BatchMasterView(MasterView):
""" """
Base class for all "batch master" views. Base class for all "batch master" views.
@ -872,78 +868,6 @@ class BatchMasterView(MasterView):
'cancel_msg': "{} of batch was canceled.".format(batch_action.capitalize()), 'cancel_msg': "{} of batch was canceled.".format(batch_action.capitalize()),
}) })
def progress_thread(self, sock, success_url, progress):
"""
This method is meant to be used as a thread target. Its job is to read
progress data from ``connection`` and update the session progress
accordingly. When a final "process complete" indication is read, the
socket will be closed and the thread will end.
"""
while True:
try:
self.process_progress(sock, progress)
except EverythingComplete:
break
# close server socket
sock.close()
# finalize session progress
progress.session.load()
progress.session['complete'] = True
if callable(success_url):
success_url = success_url()
progress.session['success_url'] = success_url
progress.session.save()
def process_progress(self, sock, progress):
"""
This method will accept a client connection on the given socket, and
then update the given progress object according to data written by the
client.
"""
connection, client_address = sock.accept()
active_progress = None
# TODO: make this configurable?
suffix = "\n\n.".encode('utf_8')
data = b''
# listen for progress info, update session progress as needed
while True:
# accumulate data bytestring until we see the suffix
byte = connection.recv(1)
data += byte
if data.endswith(suffix):
# strip suffix, interpret data as JSON
data = data[:-len(suffix)]
if six.PY3:
data = data.decode('utf_8')
data = json.loads(data)
if data.get('everything_complete'):
if active_progress:
active_progress.finish()
raise EverythingComplete
elif data.get('process_complete'):
active_progress.finish()
active_progress = None
break
elif 'value' in data:
if not active_progress:
active_progress = progress(data['message'], data['maximum'])
active_progress.update(data['value'])
# reset data buffer
data = b''
# close client connection
connection.close()
def launch_subprocess(self, port=None, username=None, def launch_subprocess(self, port=None, username=None,
command='rattail', command_args=None, command='rattail', command_args=None,
subcommand=None, subcommand_args=None): subcommand=None, subcommand_args=None):

543
tailbone/views/importing.py Normal file
View file

@ -0,0 +1,543 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2021 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
View for running arbitrary import/export jobs
"""
from __future__ import unicode_literals, absolute_import
import getpass
import socket
import sys
import logging
import subprocess
import time
import json
import six
from rattail.exceptions import ConfigurationError
from rattail.threads import Thread
import colander
import markdown
from deform import widget as dfwidget
from webhelpers2.html import HTML
from tailbone.views import MasterView
log = logging.getLogger(__name__)
class ImportingView(MasterView):
"""
View for running arbitrary import/export jobs
"""
normalized_model_name = 'importhandler'
model_title = "Import / Export Handler"
model_key = 'key'
route_prefix = 'importing'
url_prefix = '/importing'
index_title = "Importing / Exporting"
creatable = False
editable = False
deletable = False
filterable = False
pageable = False
labels = {
'host_title': "Data Source",
'local_title': "Data Target",
}
grid_columns = [
'host_title',
'local_title',
'handler_spec',
]
form_fields = [
'key',
'local_key',
'host_key',
'handler_spec',
'host_title',
'local_title',
'models',
]
runjob_form_fields = [
'handler_spec',
'host_title',
'local_title',
'models',
'create',
'update',
'delete',
# 'runas',
'versioning',
'dry_run',
'warnings',
]
def get_data(self, session=None):
app = self.get_rattail_app()
data = []
for Handler in app.all_import_handlers():
handler = Handler(self.rattail_config)
data.append(self.normalize(handler))
data.sort(key=lambda handler: (handler['host_title'],
handler['local_title']))
return data
def normalize(self, handler):
Handler = handler.__class__
return {
'_handler': handler,
'key': handler.get_key(),
'generic_title': handler.get_generic_title(),
'host_key': handler.host_key,
'host_title': handler.get_generic_host_title(),
'local_key': handler.local_key,
'local_title': handler.get_generic_local_title(),
'handler_spec': handler.get_spec(),
}
def configure_grid(self, g):
super(ImportingView, self).configure_grid(g)
g.set_link('host_title')
g.set_link('local_title')
def get_instance(self):
"""
Fetch the current model instance by inspecting the route kwargs and
doing a database lookup. If the instance cannot be found, raises 404.
"""
key = self.request.matchdict['key']
app = self.get_rattail_app()
for Handler in app.all_import_handlers():
if Handler.get_key() == key:
return self.normalize(Handler(self.rattail_config))
raise self.notfound()
def get_instance_title(self, handler_info):
handler = handler_info['_handler']
return handler.get_generic_title()
def make_form_schema(self):
return ImportHandlerSchema()
def make_form_kwargs(self, **kwargs):
kwargs = super(ImportingView, self).make_form_kwargs(**kwargs)
# nb. this is set as sort of a hack, to prevent SA model
# inspection logic
kwargs['renderers'] = {}
return kwargs
def configure_form(self, f):
super(ImportingView, self).configure_form(f)
f.set_renderer('models', self.render_models)
def render_models(self, handler, field):
handler = handler['_handler']
items = []
for key in handler.get_importer_keys():
items.append(HTML.tag('li', c=[key]))
return HTML.tag('ul', c=items)
def template_kwargs_view(self, **kwargs):
kwargs = super(ImportingView, self).template_kwargs_view(**kwargs)
handler_info = kwargs['instance']
kwargs['handler'] = handler_info['_handler']
return kwargs
def runjob(self):
"""
View for running an import / export job
"""
handler_info = self.get_instance()
handler = handler_info['_handler']
form = self.make_runjob_form(handler_info)
if self.request.method == 'POST':
if self.validate_form(form):
self.cache_runjob_form_values(handler, form)
try:
return self.do_runjob(handler_info, form)
except Exception as error:
self.request.session.flash(six.text_type(error), 'error')
return self.redirect(self.request.current_route_url())
return self.render_to_response('runjob', {
'handler_info': handler_info,
'handler': handler,
'form': form,
})
def cache_runjob_form_values(self, handler, form):
handler_key = handler.get_key()
def make_key(key):
return 'rattail.importing.{}.{}'.format(handler_key, key)
for field in form.fields:
key = make_key(field)
self.request.session[key] = form.validated[field]
def read_cached_runjob_values(self, handler, form):
handler_key = handler.get_key()
def make_key(key):
return 'rattail.importing.{}.{}'.format(handler_key, key)
for field in form.fields:
key = make_key(field)
if key in self.request.session:
form.set_default(field, self.request.session[key])
def make_runjob_form(self, handler_info, **kwargs):
"""
Creates a new form for the given model class/instance
"""
handler = handler_info['_handler']
factory = self.get_form_factory()
fields = list(self.runjob_form_fields)
schema = RunJobSchema()
kwargs = self.make_runjob_form_kwargs(handler_info, **kwargs)
form = factory(fields, schema, **kwargs)
self.configure_runjob_form(handler, form)
self.read_cached_runjob_values(handler, form)
return form
def make_runjob_form_kwargs(self, handler_info, **kwargs):
route_prefix = self.get_route_prefix()
handler = handler_info['_handler']
defaults = {
'request': self.request,
'use_buefy': self.get_use_buefy(),
'model_instance': handler,
'cancel_url': self.request.route_url('{}.view'.format(route_prefix),
key=handler.get_key()),
# nb. these next 2 are set as sort of a hack, to prevent
# SA model inspection logic
'renderers': {},
'appstruct': handler_info,
}
defaults.update(kwargs)
return defaults
def configure_runjob_form(self, handler, f):
self.set_labels(f)
f.set_readonly('handler_spec')
f.set_renderer('handler_spec', lambda handler, field: handler.get_spec())
f.set_readonly('host_title')
f.set_readonly('local_title')
keys = handler.get_importer_keys()
f.set_widget('models', dfwidget.SelectWidget(values=[(k, k) for k in keys],
multiple=True,
size=len(keys)))
# f.set_default('models', keys)
f.set_default('create', True)
f.set_default('update', True)
f.set_default('delete', False)
# f.set_default('runas', self.rattail_config.get('rattail', 'runas.default') or '')
f.set_default('versioning', True)
f.set_default('dry_run', False)
f.set_default('warnings', False)
def do_runjob(self, handler_info, form):
handler = handler_info['_handler']
handler_key = handler.get_key()
if self.request.POST.get('runjob') == 'true':
# will invoke handler to run job
# TODO: this socket progress business was lifted from
# tailbone.views.batch.core:BatchMasterView.handler_action
# should probably refactor to share somehow
# make progress object
key = 'rattail.importing.{}'.format(handler_key)
progress = self.make_progress(key)
# make socket for progress thread to listen to action thread
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 0))
sock.listen(1)
port = sock.getsockname()[1]
# launch thread to monitor progress
success_url = self.request.current_route_url()
thread = Thread(target=self.progress_thread,
args=(sock, success_url, progress))
thread.start()
true_cmd = self.make_runjob_cmd(handler, form, 'true', port=port)
# launch thread to invoke handler
thread = Thread(target=self.do_runjob_thread,
args=(handler, true_cmd, port, progress))
thread.start()
return self.render_progress(progress, {
'can_cancel': False,
'cancel_url': self.request.current_route_url(),
})
else: # explain only
notes_cmd = self.make_runjob_cmd(handler, form, 'notes')
self.cache_runjob_notes(handler, notes_cmd)
return self.redirect(self.request.current_route_url())
def do_runjob_thread(self, handler, cmd, port, progress):
# invoke handler command via subprocess
try:
result = subprocess.run(cmd, check=True, capture_output=True)
output = result.stderr.decode('utf_8').strip()
except Exception as error:
log.warning("failed to invoke handler cmd: %s", cmd, exc_info=True)
if progress:
progress.session.load()
progress.session['error'] = True
msg = """\
{} failed! Here is the command I tried to run:
```
{}
```
And here is the STDERR output:
```
{}
```
""".format(handler.direction.capitalize(),
' '.join(cmd),
error.stderr.decode('utf_8').strip())
msg = markdown.markdown(msg, extensions=['fenced_code'])
msg = HTML.literal(msg)
msg = HTML.tag('div', class_='tailbone-markdown', c=[msg])
progress.session['error_msg'] = msg
progress.session.save()
else: # success
if progress:
progress.session.load()
msg = self.get_runjob_success_msg(handler, output)
progress.session['complete'] = True
progress.session['success_url'] = self.request.current_route_url()
progress.session['success_msg'] = msg
progress.session.save()
suffix = "\n\n.".encode('utf_8')
cxn = socket.create_connection(('127.0.0.1', port))
data = json.dumps({
'everything_complete': True,
})
if six.PY3:
data = data.encode('utf_8')
cxn.send(data)
cxn.send(suffix)
cxn.close()
def get_runjob_success_msg(self, handler, output):
notes = """\
{} went okay, here is the output:
```
{}
```
""".format(handler.direction.capitalize(), output)
notes = markdown.markdown(notes, extensions=['fenced_code'])
notes = HTML.literal(notes)
return HTML.tag('div', class_='tailbone-markdown', c=[notes])
def make_runjob_cmd(self, handler, form, typ, port=None):
handler_key = handler.get_key()
option = '{}.cmd'.format(handler_key)
cmd = self.rattail_config.getlist('rattail.importing', option)
if not cmd or len(cmd) != 2:
msg = ("Missing or invalid config; please set '{}' in the "
"[rattail.importing] section of your config file".format(option))
raise ConfigurationError(msg)
command, subcommand = cmd
option = '{}.runas'.format(handler_key)
runas = self.rattail_config.require('rattail.importing', option)
data = form.validated
if typ == 'true':
cmd = [
'{}/bin/{}'.format(sys.prefix, command),
'--config={}/app/quiet.conf'.format(sys.prefix),
'--progress',
'--progress-socket=127.0.0.1:{}'.format(port),
'--runas={}'.format(runas),
subcommand,
]
else:
cmd = [
'sudo', '-u', getpass.getuser(),
'bin/{}'.format(command),
'-c', 'app/quiet.conf',
'-P',
'--runas', runas,
subcommand,
]
cmd.extend(data['models'])
if data['create']:
if typ == 'true':
cmd.append('--create')
else:
cmd.append('--no-create')
if data['update']:
if typ == 'true':
cmd.append('--update')
else:
cmd.append('--no-update')
if data['delete']:
cmd.append('--delete')
else:
if typ == 'true':
cmd.append('--no-delete')
if data['versioning']:
if typ == 'true':
cmd.append('--versioning')
else:
cmd.append('--no-versioning')
if data['dry_run']:
cmd.append('--dry-run')
if data['warnings']:
cmd.append('--warnings')
return cmd
def cache_runjob_notes(self, handler, notes_cmd):
notes = """\
You can run this {direction} job manually via command line:
```sh
cd {prefix}
{cmd}
```
""".format(direction=handler.direction,
prefix=sys.prefix,
cmd=' '.join(notes_cmd))
self.request.session['rattail.importing.runjob.notes'] = markdown.markdown(
notes, extensions=['fenced_code', 'codehilite'])
@classmethod
def defaults(cls, config):
cls._defaults(config)
cls._importing_defaults(config)
@classmethod
def _importing_defaults(cls, config):
route_prefix = cls.get_route_prefix()
permission_prefix = cls.get_permission_prefix()
instance_url_prefix = cls.get_instance_url_prefix()
# run job
config.add_tailbone_permission(permission_prefix,
'{}.runjob'.format(permission_prefix),
"Run an arbitrary import / export job")
config.add_route('{}.runjob'.format(route_prefix),
'{}/runjob'.format(instance_url_prefix))
config.add_view(cls, attr='runjob',
route_name='{}.runjob'.format(route_prefix),
permission='{}.runjob'.format(permission_prefix))
class ImportHandlerSchema(colander.MappingSchema):
host_key = colander.SchemaNode(colander.String())
local_key = colander.SchemaNode(colander.String())
host_title = colander.SchemaNode(colander.String())
local_title = colander.SchemaNode(colander.String())
handler_spec = colander.SchemaNode(colander.String())
class RunJobSchema(colander.MappingSchema):
handler_spec = colander.SchemaNode(colander.String())
host_title = colander.SchemaNode(colander.String())
local_title = colander.SchemaNode(colander.String())
models = colander.SchemaNode(colander.List())
create = colander.SchemaNode(colander.Bool())
update = colander.SchemaNode(colander.Bool())
delete = colander.SchemaNode(colander.Bool())
# runas = colander.SchemaNode(colander.String())
versioning = colander.SchemaNode(colander.Bool())
dry_run = colander.SchemaNode(colander.Bool())
warnings = colander.SchemaNode(colander.Bool())
def includeme(config):
ImportingView.defaults(config)

View file

@ -32,6 +32,7 @@ import datetime
import tempfile import tempfile
import logging import logging
import json
import six import six
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import orm from sqlalchemy import orm
@ -65,6 +66,10 @@ from tailbone.config import global_help_url
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class EverythingComplete(Exception):
pass
class MasterView(View): class MasterView(View):
""" """
Base "master" view class. All model master views should derive from this. Base "master" view class. All model master views should derive from this.
@ -1743,6 +1748,78 @@ class MasterView(View):
def get_execute_success_url(self, obj, **kwargs): def get_execute_success_url(self, obj, **kwargs):
return self.get_action_url('view', obj, **kwargs) return self.get_action_url('view', obj, **kwargs)
def progress_thread(self, sock, success_url, progress):
"""
This method is meant to be used as a thread target. Its job is to read
progress data from ``connection`` and update the session progress
accordingly. When a final "process complete" indication is read, the
socket will be closed and the thread will end.
"""
while True:
try:
self.process_progress(sock, progress)
except EverythingComplete:
break
# close server socket
sock.close()
# finalize session progress
progress.session.load()
progress.session['complete'] = True
if callable(success_url):
success_url = success_url()
progress.session['success_url'] = success_url
progress.session.save()
def process_progress(self, sock, progress):
"""
This method will accept a client connection on the given socket, and
then update the given progress object according to data written by the
client.
"""
connection, client_address = sock.accept()
active_progress = None
# TODO: make this configurable?
suffix = "\n\n.".encode('utf_8')
data = b''
# listen for progress info, update session progress as needed
while True:
# accumulate data bytestring until we see the suffix
byte = connection.recv(1)
data += byte
if data.endswith(suffix):
# strip suffix, interpret data as JSON
data = data[:-len(suffix)]
if six.PY3:
data = data.decode('utf_8')
data = json.loads(data)
if data.get('everything_complete'):
if active_progress:
active_progress.finish()
raise EverythingComplete
elif data.get('process_complete'):
active_progress.finish()
active_progress = None
break
elif 'value' in data:
if not active_progress:
active_progress = progress(data['message'], data['maximum'])
active_progress.update(data['value'])
# reset data buffer
data = b''
# close client connection
connection.close()
def get_merge_fields(self): def get_merge_fields(self):
if hasattr(self, 'merge_fields'): if hasattr(self, 'merge_fields'):
return self.merge_fields return self.merge_fields
@ -2287,7 +2364,10 @@ class MasterView(View):
try: try:
mapper = orm.object_mapper(row) mapper = orm.object_mapper(row)
except orm.exc.UnmappedInstanceError: except orm.exc.UnmappedInstanceError:
return {self.model_key: row[self.model_key]} try:
return {self.model_key: row[self.model_key]}
except TypeError:
return {self.model_key: getattr(row, self.model_key)}
else: else:
pkeys = get_primary_keys(row) pkeys = get_primary_keys(row)
keys = list(pkeys) keys = list(pkeys)