Add versioning workaround support for batch actions

* add `can_cancel` flag for progress page, hide button if set
* overhaul populate/refresh/execute to launch socket/subprocess if necessary
This commit is contained in:
Lance Edgar 2018-06-07 12:40:25 -05:00
parent cc6fa7058b
commit 44ff02b7af
4 changed files with 241 additions and 98 deletions

View file

@ -2,7 +2,7 @@
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2017 Lance Edgar
# Copyright © 2010-2018 Lance Edgar
#
# This file is part of Rattail.
#
@ -28,6 +28,8 @@ from __future__ import unicode_literals, absolute_import
import os
from rattail.progress import ProgressBase
from beaker.session import Session
@ -43,7 +45,7 @@ def get_progress_session(request, key, **kwargs):
return session
class SessionProgress(object):
class SessionProgress(ProgressBase):
"""
Provides a session-based progress bar mechanism.
@ -82,6 +84,3 @@ class SessionProgress(object):
self.session['value'] = value
self.session.save()
return not self.canceled
def destroy(self):
pass

View file

@ -20,6 +20,7 @@
updater = setInterval(function() {update_progress()}, 1000);
% if can_cancel:
$(function() {
$('#cancel button').click(function() {
@ -39,6 +40,7 @@
});
});
% endif
</script>
</head>
@ -60,9 +62,11 @@
</table><!-- #progress -->
</td>
<td id="percentage"></td>
% if can_cancel:
<td id="cancel">
<button type="button" style="display: none;">Cancel</button>
</td>
% endif
</tr>
</table><!-- #progress-wrapper -->
@ -86,10 +90,14 @@
} else if (data.complete || data.maximum) {
$('#message').html(data.message);
$('#total').html('('+data.maximum_display+' total)');
% if can_cancel:
$('#cancel button').show();
% endif
if (data.complete) {
clearInterval(updater);
% if can_cancel:
$('#cancel button').hide();
% endif
$('#total').html('done!');
$('#complete').css('width', '100%');
$('#remaining').hide();

View file

@ -27,18 +27,24 @@ Base views for maintaining "new-style" batches.
from __future__ import unicode_literals, absolute_import
import os
import sys
import datetime
import logging
import socket
import subprocess
import tempfile
from six import StringIO
import json
import six
import sqlalchemy as sa
from sqlalchemy import orm
from rattail.db import model, Session as RattailSession
from rattail.db.util import short_session
from rattail.threads import Thread
from rattail.util import load_object, prettify
from rattail.progress import SocketProgress
import colander
import deform
@ -57,6 +63,10 @@ from tailbone.util import csrf_token
log = logging.getLogger(__name__)
class EverythingComplete(Exception):
pass
class BatchMasterView(MasterView):
"""
Base class for all "batch master" views.
@ -696,43 +706,225 @@ class BatchMasterView(MasterView):
return self.handler.get_execute_title(batch)
return "Execute Batch"
def handler_action(self, batch, action, **kwargs):
"""
View which will attempt to refresh all data for the batch. What
exactly this means will depend on the type of batch etc.
"""
route_prefix = self.get_route_prefix()
permission_prefix = self.get_permission_prefix()
user = self.request.user
user_uuid = user.uuid if user else None
username = user.username if user else None
key = '{}.{}'.format(self.model_class.__tablename__, action)
progress = SessionProgress(self.request, key)
# must ensure versioning is *disabled* during action, if handler says so
allow_versioning = self.handler.allow_versioning(action)
if not allow_versioning and self.rattail_config.versioning_enabled():
can_cancel = False
# make socket for progress thread to listen to action thread
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 0))
sock.listen(1)
port = sock.getsockname()[1]
# launch thread to monitor progress
success_url = self.get_action_url('view', batch)
thread = Thread(target=self.progress_thread, args=(sock, success_url, progress))
thread.start()
# launch thread to invoke handler action
thread = Thread(target=self.action_subprocess_thread, args=(batch.uuid, port, username, action, progress))
thread.start()
else: # either versioning is disabled, or handler doesn't mind
can_cancel = True
# launch thread to populate batch; that will update session progress directly
target = getattr(self, '{}_thread'.format(action))
thread = Thread(target=target, args=(batch.uuid, user_uuid, progress), kwargs=kwargs)
thread.start()
return self.render_progress(progress, {
'can_cancel': can_cancel,
'cancel_url': self.get_action_url('view', batch),
'cancel_msg': "{} of batch was canceled.".format(action.capitalize()),
})
def progress_thread(self, sock, success_url, progress):
"""
This method is meant to be used as a thread target. Its job is to read
progress data from ``connection`` and update the session progress
accordingly. When a final "process complete" indication is read, the
socket will be closed and the thread will end.
"""
while True:
try:
self.process_progress(sock, progress)
except EverythingComplete:
break
# close server socket
sock.close()
# finalize session progress
progress.session.load()
progress.session['complete'] = True
if callable(success_url):
success_url = success_url()
progress.session['success_url'] = success_url
progress.session.save()
def process_progress(self, sock, progress):
"""
This method will accept a client connection on the given socket, and
then update the given progress object according to data written by the
client.
"""
connection, client_address = sock.accept()
active_progress = None
# TODO: make this configurable?
suffix = "\n\n.".encode('utf_8')
data = b''
# listen for progress info, update session progress as needed
while True:
# accumulate data bytestring until we see the suffix
byte = connection.recv(1)
data += byte
if data.endswith(suffix):
# strip suffix, interpret data as JSON
data = data[:-len(suffix)]
data = json.loads(data)
if data.get('everything_complete'):
if active_progress:
active_progress.finish()
raise EverythingComplete
elif data.get('process_complete'):
active_progress.finish()
active_progress = None
break
elif 'value' in data:
if not active_progress:
active_progress = progress(data['message'], data['maximum'])
active_progress.update(data['value'])
# reset data buffer
data = b''
# close client connection
connection.close()
def launch_subprocess(self, port=None, username=None,
command='rattail', command_args=None,
subcommand=None, subcommand_args=None):
# construct command
cmd = [os.path.join(sys.prefix, 'bin/{}'.format(command))]
for path in self.rattail_config.files_read:
cmd.extend(['--config', path])
if username:
cmd.extend(['--runas', username])
if command_args:
cmd.extend(command_args)
cmd.extend([
'--progress',
'--progress-socket', '127.0.0.1:{}'.format(port),
subcommand,
])
if subcommand_args:
cmd.extend(subcommand_args)
# run command in subprocess
subprocess.check_call(cmd)
def action_subprocess_thread(self, batch_uuid, port, username, action, progress):
"""
This method is sort of an alternative thread target for batch actions,
to be used in the event versioning is enabled for the main process but
the handler says versioning must be avoided during the action. It must
launch a separate process with versioning disabled in order to act on
the batch.
"""
# invoke command to act on batch in separate process
try:
self.launch_subprocess(port=port, username=username,
command='rattail',
command_args=[
'--no-versioning',
],
subcommand='{}-batch'.format(action),
subcommand_args=[
self.handler.batch_key,
batch_uuid,
])
except Exception as error:
log.warning("%s of '%s' batch failed: %s", action, self.handler.batch_key, batch_uuid, exc_info=True)
# TODO: write error info to socket
# if progress:
# progress.session.load()
# progress.session['error'] = True
# progress.session['error_msg'] = "Batch population failed: {} - {}".format(error.__class__.__name__, error)
# progress.session.save()
return
models = getattr(self.handler, 'version_catchup_{}'.format(action), None)
if models:
self.catchup_versions(port, batch_uuid, username, *models)
suffix = "\n\n.".encode('utf_8')
cxn = socket.create_connection(('127.0.0.1', port))
cxn.send(json.dumps({
'everything_complete': True,
}))
cxn.send(suffix)
cxn.close()
def catchup_versions(self, port, batch_uuid, username, *models):
with short_session() as s:
batch = s.query(self.model_class).get(batch_uuid)
batch_id = batch.id_str
description = six.text_type(batch)
self.launch_subprocess(
port=port, username=username,
command='rattail',
subcommand='import-versions',
subcommand_args=[
'--comment',
"version catch-up for '{}' batch {}: {}".format(self.handler.batch_key, batch_id, description),
] + list(models))
def prefill(self):
"""
View which will attempt to prefill all data for the batch. What
exactly this means will depend on the type of batch etc.
"""
batch = self.get_instance()
route_prefix = self.get_route_prefix()
permission_prefix = self.get_permission_prefix()
return self.handler_action(batch, 'populate')
# showing progress requires a separate thread; start that first
key = '{}.prefill'.format(route_prefix)
progress = SessionProgress(self.request, key)
thread = Thread(target=self.prefill_thread, args=(batch.uuid, progress))
thread.start()
# Send user to progress page.
kwargs = {
'cancel_url': self.get_action_url('view', batch),
'cancel_msg': "Batch prefill was canceled.",
}
# TODO: This seems hacky...it exists for (only) one specific scenario.
if not self.request.has_perm('{}.view'.format(permission_prefix)):
kwargs['cancel_url'] = self.request.route_url('{}.create'.format(route_prefix))
return self.render_progress(progress, kwargs)
def prefill_thread(self, batch_uuid, progress):
def populate_thread(self, batch_uuid, user_uuid, progress, **kwargs):
"""
Thread target for prefilling batch data with progress indicator.
Thread target for populating batch data with progress indicator.
"""
# mustn't use tailbone web session here
session = RattailSession()
batch = session.query(self.model_class).get(batch_uuid)
try:
self.handler.populate(batch, progress=progress)
self.handler.refresh_batch_status(batch)
self.handler.do_populate(batch, progress=progress)
except Exception as error:
session.rollback()
log.warning("batch population failed: %s", batch, exc_info=True)
@ -761,56 +953,9 @@ class BatchMasterView(MasterView):
exactly this means will depend on the type of batch etc.
"""
batch = self.get_instance()
route_prefix = self.get_route_prefix()
permission_prefix = self.get_permission_prefix()
return self.handler_action(batch, 'refresh')
# TODO: deprecate / remove this
cognizer = self.request.user
if not cognizer:
uuid = self.request.session.pop('late_login_user', None)
cognizer = Session.query(model.User).get(uuid) if uuid else None
# TODO: refresh should probably always imply/use progress
# If handler doesn't declare the need for progress indicator, things
# are nice and simple.
if not getattr(self.handler, 'show_progress', True):
self.refresh_data(Session, batch, cognizer=cognizer)
self.request.session.flash("Batch data has been refreshed.")
# TODO: This seems hacky...it exists for (only) one specific scenario.
if not self.request.has_perm('{}.view'.format(permission_prefix)):
return self.redirect(self.request.route_url('{}.create'.format(route_prefix)))
return self.redirect(self.get_action_url('view', batch))
# Showing progress requires a separate thread; start that first.
key = '{}.refresh'.format(self.model_class.__tablename__)
progress = SessionProgress(self.request, key)
# success_url = self.request.route_url('vendors.scangenius.create') if not self.request.user else None
# TODO: This seems hacky...it exists for (only) one specific scenario.
success_url = None
if not self.request.user:
success_url = self.request.route_url('{}.create'.format(route_prefix))
thread = Thread(target=self.refresh_thread, args=(batch.uuid, progress,
cognizer.uuid if cognizer else None,
success_url))
thread.start()
# Send user to progress page.
kwargs = {
'cancel_url': self.get_action_url('view', batch),
'cancel_msg': "Batch refresh was canceled.",
}
# TODO: This seems hacky...it exists for (only) one specific scenario.
if not self.request.has_perm('{}.view'.format(permission_prefix)):
kwargs['cancel_url'] = self.request.route_url('{}.create'.format(route_prefix))
return self.render_progress(progress, kwargs)
def refresh_data(self, session, batch, cognizer=None, progress=None):
def refresh_data(self, session, batch, user, progress=None):
"""
Instruct the batch handler to refresh all data for the batch.
"""
@ -821,9 +966,10 @@ class BatchMasterView(MasterView):
batch.cognized_by = cognizer or session.merge(self.request.user)
else: # the future
self.handler.refresh(batch, progress=progress)
user = user or session.merge(self.request.user)
self.handler.do_refresh(batch, user, progress=progress)
def refresh_thread(self, batch_uuid, progress=None, cognizer_uuid=None, success_url=None):
def refresh_thread(self, batch_uuid, user_uuid, progress, **kwargs):
"""
Thread target for refreshing batch data with progress indicator.
"""
@ -832,9 +978,9 @@ class BatchMasterView(MasterView):
# transaction binding etc.
session = RattailSession()
batch = session.query(self.model_class).get(batch_uuid)
cognizer = session.query(model.User).get(cognizer_uuid) if cognizer_uuid else None
cognizer = session.query(model.User).get(user_uuid) if user_uuid else None
try:
self.refresh_data(session, batch, cognizer=cognizer, progress=progress)
self.refresh_data(session, batch, cognizer, progress=progress)
except Exception as error:
session.rollback()
log.warning("refreshing data for batch failed: {}".format(batch), exc_info=True)
@ -854,7 +1000,7 @@ class BatchMasterView(MasterView):
if progress:
progress.session.load()
progress.session['complete'] = True
progress.session['success_url'] = success_url or self.get_action_url('view', batch)
progress.session['success_url'] = self.get_action_url('view', batch)
progress.session.save()
########################################
@ -943,16 +1089,7 @@ class BatchMasterView(MasterView):
for key, value in form.validated.items():
self.request.session['batch.{}.execute_option.{}'.format(batch.batch_key, key)] = value
key = '{}.execute'.format(self.model_class.__tablename__)
progress = SessionProgress(self.request, key)
kwargs['progress'] = progress
thread = Thread(target=self.execute_thread, args=(batch.uuid, self.request.user.uuid), kwargs=kwargs)
thread.start()
return self.render_progress(progress, {
'cancel_url': self.get_action_url('view', batch),
'cancel_msg': "Batch execution was canceled.",
})
return self.handler_action(batch, 'execute', **kwargs)
self.request.session.flash("Invalid request: {}".format(form.make_deform_form().error), 'error')
return self.redirect(self.get_action_url('view', batch))
@ -1003,7 +1140,7 @@ class BatchMasterView(MasterView):
def execute_error_message(self, error):
return "Batch execution failed: {}: {}".format(type(error).__name__, error)
def execute_thread(self, batch_uuid, user_uuid, progress=None, **kwargs):
def execute_thread(self, batch_uuid, user_uuid, progress, **kwargs):
"""
Thread target for executing a batch with progress indicator.
"""
@ -1014,7 +1151,7 @@ class BatchMasterView(MasterView):
batch = session.query(self.model_class).get(batch_uuid)
user = session.query(model.User).get(user_uuid)
try:
result = self.handler.execute(batch, user=user, progress=progress, **kwargs)
result = self.handler.do_execute(batch, user=user, progress=progress, **kwargs)
# If anything goes wrong, rollback and log the error etc.
except Exception as error:
@ -1030,8 +1167,6 @@ class BatchMasterView(MasterView):
# If no error, check result flag (false means user canceled).
else:
if result:
batch.executed = datetime.datetime.utcnow()
batch.executed_by = user
session.commit()
# TODO: this doesn't always work...?
self.request.session.flash("{} has been executed: {}".format(
@ -1159,7 +1294,7 @@ class BatchMasterView(MasterView):
# else the perm group label will not display correctly...
config.add_tailbone_permission_group(permission_prefix, model_title_plural, overwrite=False)
# prefill row data
# populate row data
config.add_route('{}.prefill'.format(route_prefix), '{}/{{uuid}}/prefill'.format(url_prefix))
config.add_view(cls, attr='prefill', route_name='{}.prefill'.format(route_prefix),
permission='{}.create'.format(permission_prefix))

View file

@ -100,6 +100,7 @@ class View(object):
if not template:
template = '/progress.mako'
kwargs['progress'] = progress
kwargs.setdefault('can_cancel', True)
return render_to_response(template, kwargs, request=self.request)
def file_response(self, path):