From 44ff02b7af057fc08098ccd3284848e841d8b77f Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Thu, 7 Jun 2018 12:40:25 -0500 Subject: [PATCH] Add versioning workaround support for batch actions * add `can_cancel` flag for progress page, hide button if set * overhaul populate/refresh/execute to launch socket/subprocess if necessary --- tailbone/progress.py | 9 +- tailbone/templates/progress.mako | 8 + tailbone/views/batch/core.py | 321 ++++++++++++++++++++++--------- tailbone/views/core.py | 1 + 4 files changed, 241 insertions(+), 98 deletions(-) diff --git a/tailbone/progress.py b/tailbone/progress.py index 0879025b..90fa21be 100644 --- a/tailbone/progress.py +++ b/tailbone/progress.py @@ -2,7 +2,7 @@ ################################################################################ # # Rattail -- Retail Software Framework -# Copyright © 2010-2017 Lance Edgar +# Copyright © 2010-2018 Lance Edgar # # This file is part of Rattail. # @@ -28,6 +28,8 @@ from __future__ import unicode_literals, absolute_import import os +from rattail.progress import ProgressBase + from beaker.session import Session @@ -43,7 +45,7 @@ def get_progress_session(request, key, **kwargs): return session -class SessionProgress(object): +class SessionProgress(ProgressBase): """ Provides a session-based progress bar mechanism. @@ -82,6 +84,3 @@ class SessionProgress(object): self.session['value'] = value self.session.save() return not self.canceled - - def destroy(self): - pass diff --git a/tailbone/templates/progress.mako b/tailbone/templates/progress.mako index c378a986..2df195ff 100644 --- a/tailbone/templates/progress.mako +++ b/tailbone/templates/progress.mako @@ -20,6 +20,7 @@ updater = setInterval(function() {update_progress()}, 1000); + % if can_cancel: $(function() { $('#cancel button').click(function() { @@ -39,6 +40,7 @@ }); }); + % endif @@ -60,9 +62,11 @@ + % if can_cancel: + % endif @@ -86,10 +90,14 @@ } else if (data.complete || data.maximum) { $('#message').html(data.message); $('#total').html('('+data.maximum_display+' total)'); + % if can_cancel: $('#cancel button').show(); + % endif if (data.complete) { clearInterval(updater); + % if can_cancel: $('#cancel button').hide(); + % endif $('#total').html('done!'); $('#complete').css('width', '100%'); $('#remaining').hide(); diff --git a/tailbone/views/batch/core.py b/tailbone/views/batch/core.py index d7c3794c..150dd560 100644 --- a/tailbone/views/batch/core.py +++ b/tailbone/views/batch/core.py @@ -27,18 +27,24 @@ Base views for maintaining "new-style" batches. from __future__ import unicode_literals, absolute_import import os +import sys import datetime import logging +import socket +import subprocess import tempfile from six import StringIO +import json import six import sqlalchemy as sa from sqlalchemy import orm from rattail.db import model, Session as RattailSession +from rattail.db.util import short_session from rattail.threads import Thread from rattail.util import load_object, prettify +from rattail.progress import SocketProgress import colander import deform @@ -57,6 +63,10 @@ from tailbone.util import csrf_token log = logging.getLogger(__name__) +class EverythingComplete(Exception): + pass + + class BatchMasterView(MasterView): """ Base class for all "batch master" views. @@ -696,43 +706,225 @@ class BatchMasterView(MasterView): return self.handler.get_execute_title(batch) return "Execute Batch" + def handler_action(self, batch, action, **kwargs): + """ + View which will attempt to refresh all data for the batch. What + exactly this means will depend on the type of batch etc. + """ + route_prefix = self.get_route_prefix() + permission_prefix = self.get_permission_prefix() + + user = self.request.user + user_uuid = user.uuid if user else None + username = user.username if user else None + + key = '{}.{}'.format(self.model_class.__tablename__, action) + progress = SessionProgress(self.request, key) + + # must ensure versioning is *disabled* during action, if handler says so + allow_versioning = self.handler.allow_versioning(action) + if not allow_versioning and self.rattail_config.versioning_enabled(): + can_cancel = False + + # make socket for progress thread to listen to action thread + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('127.0.0.1', 0)) + sock.listen(1) + port = sock.getsockname()[1] + + # launch thread to monitor progress + success_url = self.get_action_url('view', batch) + thread = Thread(target=self.progress_thread, args=(sock, success_url, progress)) + thread.start() + + # launch thread to invoke handler action + thread = Thread(target=self.action_subprocess_thread, args=(batch.uuid, port, username, action, progress)) + thread.start() + + else: # either versioning is disabled, or handler doesn't mind + can_cancel = True + + # launch thread to populate batch; that will update session progress directly + target = getattr(self, '{}_thread'.format(action)) + thread = Thread(target=target, args=(batch.uuid, user_uuid, progress), kwargs=kwargs) + thread.start() + + return self.render_progress(progress, { + 'can_cancel': can_cancel, + 'cancel_url': self.get_action_url('view', batch), + 'cancel_msg': "{} of batch was canceled.".format(action.capitalize()), + }) + + def progress_thread(self, sock, success_url, progress): + """ + This method is meant to be used as a thread target. Its job is to read + progress data from ``connection`` and update the session progress + accordingly. When a final "process complete" indication is read, the + socket will be closed and the thread will end. + """ + while True: + try: + self.process_progress(sock, progress) + except EverythingComplete: + break + + # close server socket + sock.close() + + # finalize session progress + progress.session.load() + progress.session['complete'] = True + if callable(success_url): + success_url = success_url() + progress.session['success_url'] = success_url + progress.session.save() + + def process_progress(self, sock, progress): + """ + This method will accept a client connection on the given socket, and + then update the given progress object according to data written by the + client. + """ + connection, client_address = sock.accept() + active_progress = None + + # TODO: make this configurable? + suffix = "\n\n.".encode('utf_8') + data = b'' + + # listen for progress info, update session progress as needed + while True: + + # accumulate data bytestring until we see the suffix + byte = connection.recv(1) + data += byte + if data.endswith(suffix): + + # strip suffix, interpret data as JSON + data = data[:-len(suffix)] + data = json.loads(data) + + if data.get('everything_complete'): + if active_progress: + active_progress.finish() + raise EverythingComplete + + elif data.get('process_complete'): + active_progress.finish() + active_progress = None + break + + elif 'value' in data: + if not active_progress: + active_progress = progress(data['message'], data['maximum']) + active_progress.update(data['value']) + + # reset data buffer + data = b'' + + # close client connection + connection.close() + + def launch_subprocess(self, port=None, username=None, + command='rattail', command_args=None, + subcommand=None, subcommand_args=None): + + # construct command + cmd = [os.path.join(sys.prefix, 'bin/{}'.format(command))] + for path in self.rattail_config.files_read: + cmd.extend(['--config', path]) + if username: + cmd.extend(['--runas', username]) + if command_args: + cmd.extend(command_args) + cmd.extend([ + '--progress', + '--progress-socket', '127.0.0.1:{}'.format(port), + subcommand, + ]) + if subcommand_args: + cmd.extend(subcommand_args) + + # run command in subprocess + subprocess.check_call(cmd) + + def action_subprocess_thread(self, batch_uuid, port, username, action, progress): + """ + This method is sort of an alternative thread target for batch actions, + to be used in the event versioning is enabled for the main process but + the handler says versioning must be avoided during the action. It must + launch a separate process with versioning disabled in order to act on + the batch. + """ + # invoke command to act on batch in separate process + try: + self.launch_subprocess(port=port, username=username, + command='rattail', + command_args=[ + '--no-versioning', + ], + subcommand='{}-batch'.format(action), + subcommand_args=[ + self.handler.batch_key, + batch_uuid, + ]) + except Exception as error: + log.warning("%s of '%s' batch failed: %s", action, self.handler.batch_key, batch_uuid, exc_info=True) + + # TODO: write error info to socket + + # if progress: + # progress.session.load() + # progress.session['error'] = True + # progress.session['error_msg'] = "Batch population failed: {} - {}".format(error.__class__.__name__, error) + # progress.session.save() + + return + + models = getattr(self.handler, 'version_catchup_{}'.format(action), None) + if models: + self.catchup_versions(port, batch_uuid, username, *models) + + suffix = "\n\n.".encode('utf_8') + cxn = socket.create_connection(('127.0.0.1', port)) + cxn.send(json.dumps({ + 'everything_complete': True, + })) + cxn.send(suffix) + cxn.close() + + def catchup_versions(self, port, batch_uuid, username, *models): + with short_session() as s: + batch = s.query(self.model_class).get(batch_uuid) + batch_id = batch.id_str + description = six.text_type(batch) + + self.launch_subprocess( + port=port, username=username, + command='rattail', + subcommand='import-versions', + subcommand_args=[ + '--comment', + "version catch-up for '{}' batch {}: {}".format(self.handler.batch_key, batch_id, description), + ] + list(models)) + def prefill(self): """ View which will attempt to prefill all data for the batch. What exactly this means will depend on the type of batch etc. """ batch = self.get_instance() - route_prefix = self.get_route_prefix() - permission_prefix = self.get_permission_prefix() + return self.handler_action(batch, 'populate') - # showing progress requires a separate thread; start that first - key = '{}.prefill'.format(route_prefix) - progress = SessionProgress(self.request, key) - thread = Thread(target=self.prefill_thread, args=(batch.uuid, progress)) - thread.start() - - # Send user to progress page. - kwargs = { - 'cancel_url': self.get_action_url('view', batch), - 'cancel_msg': "Batch prefill was canceled.", - } - - # TODO: This seems hacky...it exists for (only) one specific scenario. - if not self.request.has_perm('{}.view'.format(permission_prefix)): - kwargs['cancel_url'] = self.request.route_url('{}.create'.format(route_prefix)) - - return self.render_progress(progress, kwargs) - - def prefill_thread(self, batch_uuid, progress): + def populate_thread(self, batch_uuid, user_uuid, progress, **kwargs): """ - Thread target for prefilling batch data with progress indicator. + Thread target for populating batch data with progress indicator. """ # mustn't use tailbone web session here session = RattailSession() batch = session.query(self.model_class).get(batch_uuid) try: - self.handler.populate(batch, progress=progress) - self.handler.refresh_batch_status(batch) + self.handler.do_populate(batch, progress=progress) except Exception as error: session.rollback() log.warning("batch population failed: %s", batch, exc_info=True) @@ -761,56 +953,9 @@ class BatchMasterView(MasterView): exactly this means will depend on the type of batch etc. """ batch = self.get_instance() - route_prefix = self.get_route_prefix() - permission_prefix = self.get_permission_prefix() + return self.handler_action(batch, 'refresh') - # TODO: deprecate / remove this - cognizer = self.request.user - if not cognizer: - uuid = self.request.session.pop('late_login_user', None) - cognizer = Session.query(model.User).get(uuid) if uuid else None - - # TODO: refresh should probably always imply/use progress - # If handler doesn't declare the need for progress indicator, things - # are nice and simple. - if not getattr(self.handler, 'show_progress', True): - self.refresh_data(Session, batch, cognizer=cognizer) - self.request.session.flash("Batch data has been refreshed.") - - # TODO: This seems hacky...it exists for (only) one specific scenario. - if not self.request.has_perm('{}.view'.format(permission_prefix)): - return self.redirect(self.request.route_url('{}.create'.format(route_prefix))) - - return self.redirect(self.get_action_url('view', batch)) - - # Showing progress requires a separate thread; start that first. - key = '{}.refresh'.format(self.model_class.__tablename__) - progress = SessionProgress(self.request, key) - # success_url = self.request.route_url('vendors.scangenius.create') if not self.request.user else None - - # TODO: This seems hacky...it exists for (only) one specific scenario. - success_url = None - if not self.request.user: - success_url = self.request.route_url('{}.create'.format(route_prefix)) - - thread = Thread(target=self.refresh_thread, args=(batch.uuid, progress, - cognizer.uuid if cognizer else None, - success_url)) - thread.start() - - # Send user to progress page. - kwargs = { - 'cancel_url': self.get_action_url('view', batch), - 'cancel_msg': "Batch refresh was canceled.", - } - - # TODO: This seems hacky...it exists for (only) one specific scenario. - if not self.request.has_perm('{}.view'.format(permission_prefix)): - kwargs['cancel_url'] = self.request.route_url('{}.create'.format(route_prefix)) - - return self.render_progress(progress, kwargs) - - def refresh_data(self, session, batch, cognizer=None, progress=None): + def refresh_data(self, session, batch, user, progress=None): """ Instruct the batch handler to refresh all data for the batch. """ @@ -821,9 +966,10 @@ class BatchMasterView(MasterView): batch.cognized_by = cognizer or session.merge(self.request.user) else: # the future - self.handler.refresh(batch, progress=progress) + user = user or session.merge(self.request.user) + self.handler.do_refresh(batch, user, progress=progress) - def refresh_thread(self, batch_uuid, progress=None, cognizer_uuid=None, success_url=None): + def refresh_thread(self, batch_uuid, user_uuid, progress, **kwargs): """ Thread target for refreshing batch data with progress indicator. """ @@ -832,9 +978,9 @@ class BatchMasterView(MasterView): # transaction binding etc. session = RattailSession() batch = session.query(self.model_class).get(batch_uuid) - cognizer = session.query(model.User).get(cognizer_uuid) if cognizer_uuid else None + cognizer = session.query(model.User).get(user_uuid) if user_uuid else None try: - self.refresh_data(session, batch, cognizer=cognizer, progress=progress) + self.refresh_data(session, batch, cognizer, progress=progress) except Exception as error: session.rollback() log.warning("refreshing data for batch failed: {}".format(batch), exc_info=True) @@ -854,7 +1000,7 @@ class BatchMasterView(MasterView): if progress: progress.session.load() progress.session['complete'] = True - progress.session['success_url'] = success_url or self.get_action_url('view', batch) + progress.session['success_url'] = self.get_action_url('view', batch) progress.session.save() ######################################## @@ -943,16 +1089,7 @@ class BatchMasterView(MasterView): for key, value in form.validated.items(): self.request.session['batch.{}.execute_option.{}'.format(batch.batch_key, key)] = value - key = '{}.execute'.format(self.model_class.__tablename__) - progress = SessionProgress(self.request, key) - kwargs['progress'] = progress - thread = Thread(target=self.execute_thread, args=(batch.uuid, self.request.user.uuid), kwargs=kwargs) - thread.start() - - return self.render_progress(progress, { - 'cancel_url': self.get_action_url('view', batch), - 'cancel_msg': "Batch execution was canceled.", - }) + return self.handler_action(batch, 'execute', **kwargs) self.request.session.flash("Invalid request: {}".format(form.make_deform_form().error), 'error') return self.redirect(self.get_action_url('view', batch)) @@ -1003,7 +1140,7 @@ class BatchMasterView(MasterView): def execute_error_message(self, error): return "Batch execution failed: {}: {}".format(type(error).__name__, error) - def execute_thread(self, batch_uuid, user_uuid, progress=None, **kwargs): + def execute_thread(self, batch_uuid, user_uuid, progress, **kwargs): """ Thread target for executing a batch with progress indicator. """ @@ -1014,7 +1151,7 @@ class BatchMasterView(MasterView): batch = session.query(self.model_class).get(batch_uuid) user = session.query(model.User).get(user_uuid) try: - result = self.handler.execute(batch, user=user, progress=progress, **kwargs) + result = self.handler.do_execute(batch, user=user, progress=progress, **kwargs) # If anything goes wrong, rollback and log the error etc. except Exception as error: @@ -1030,8 +1167,6 @@ class BatchMasterView(MasterView): # If no error, check result flag (false means user canceled). else: if result: - batch.executed = datetime.datetime.utcnow() - batch.executed_by = user session.commit() # TODO: this doesn't always work...? self.request.session.flash("{} has been executed: {}".format( @@ -1159,7 +1294,7 @@ class BatchMasterView(MasterView): # else the perm group label will not display correctly... config.add_tailbone_permission_group(permission_prefix, model_title_plural, overwrite=False) - # prefill row data + # populate row data config.add_route('{}.prefill'.format(route_prefix), '{}/{{uuid}}/prefill'.format(url_prefix)) config.add_view(cls, attr='prefill', route_name='{}.prefill'.format(route_prefix), permission='{}.create'.format(permission_prefix)) diff --git a/tailbone/views/core.py b/tailbone/views/core.py index 42addd76..daa6fc59 100644 --- a/tailbone/views/core.py +++ b/tailbone/views/core.py @@ -100,6 +100,7 @@ class View(object): if not template: template = '/progress.mako' kwargs['progress'] = progress + kwargs.setdefault('can_cancel', True) return render_to_response(template, kwargs, request=self.request) def file_response(self, path):