diff --git a/tailbone/progress.py b/tailbone/progress.py
index 0879025b..90fa21be 100644
--- a/tailbone/progress.py
+++ b/tailbone/progress.py
@@ -2,7 +2,7 @@
################################################################################
#
# Rattail -- Retail Software Framework
-# Copyright © 2010-2017 Lance Edgar
+# Copyright © 2010-2018 Lance Edgar
#
# This file is part of Rattail.
#
@@ -28,6 +28,8 @@ from __future__ import unicode_literals, absolute_import
import os
+from rattail.progress import ProgressBase
+
from beaker.session import Session
@@ -43,7 +45,7 @@ def get_progress_session(request, key, **kwargs):
return session
-class SessionProgress(object):
+class SessionProgress(ProgressBase):
"""
Provides a session-based progress bar mechanism.
@@ -82,6 +84,3 @@ class SessionProgress(object):
self.session['value'] = value
self.session.save()
return not self.canceled
-
- def destroy(self):
- pass
diff --git a/tailbone/templates/progress.mako b/tailbone/templates/progress.mako
index c378a986..2df195ff 100644
--- a/tailbone/templates/progress.mako
+++ b/tailbone/templates/progress.mako
@@ -20,6 +20,7 @@
updater = setInterval(function() {update_progress()}, 1000);
+ % if can_cancel:
$(function() {
$('#cancel button').click(function() {
@@ -39,6 +40,7 @@
});
});
+ % endif
@@ -60,9 +62,11 @@
|
+ % if can_cancel:
|
+ % endif
@@ -86,10 +90,14 @@
} else if (data.complete || data.maximum) {
$('#message').html(data.message);
$('#total').html('('+data.maximum_display+' total)');
+ % if can_cancel:
$('#cancel button').show();
+ % endif
if (data.complete) {
clearInterval(updater);
+ % if can_cancel:
$('#cancel button').hide();
+ % endif
$('#total').html('done!');
$('#complete').css('width', '100%');
$('#remaining').hide();
diff --git a/tailbone/views/batch/core.py b/tailbone/views/batch/core.py
index d7c3794c..150dd560 100644
--- a/tailbone/views/batch/core.py
+++ b/tailbone/views/batch/core.py
@@ -27,18 +27,24 @@ Base views for maintaining "new-style" batches.
from __future__ import unicode_literals, absolute_import
import os
+import sys
import datetime
import logging
+import socket
+import subprocess
import tempfile
from six import StringIO
+import json
import six
import sqlalchemy as sa
from sqlalchemy import orm
from rattail.db import model, Session as RattailSession
+from rattail.db.util import short_session
from rattail.threads import Thread
from rattail.util import load_object, prettify
+from rattail.progress import SocketProgress
import colander
import deform
@@ -57,6 +63,10 @@ from tailbone.util import csrf_token
log = logging.getLogger(__name__)
+class EverythingComplete(Exception):
+ pass
+
+
class BatchMasterView(MasterView):
"""
Base class for all "batch master" views.
@@ -696,43 +706,225 @@ class BatchMasterView(MasterView):
return self.handler.get_execute_title(batch)
return "Execute Batch"
+ def handler_action(self, batch, action, **kwargs):
+ """
+ View which will attempt to refresh all data for the batch. What
+ exactly this means will depend on the type of batch etc.
+ """
+ route_prefix = self.get_route_prefix()
+ permission_prefix = self.get_permission_prefix()
+
+ user = self.request.user
+ user_uuid = user.uuid if user else None
+ username = user.username if user else None
+
+ key = '{}.{}'.format(self.model_class.__tablename__, action)
+ progress = SessionProgress(self.request, key)
+
+ # must ensure versioning is *disabled* during action, if handler says so
+ allow_versioning = self.handler.allow_versioning(action)
+ if not allow_versioning and self.rattail_config.versioning_enabled():
+ can_cancel = False
+
+ # make socket for progress thread to listen to action thread
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind(('127.0.0.1', 0))
+ sock.listen(1)
+ port = sock.getsockname()[1]
+
+ # launch thread to monitor progress
+ success_url = self.get_action_url('view', batch)
+ thread = Thread(target=self.progress_thread, args=(sock, success_url, progress))
+ thread.start()
+
+ # launch thread to invoke handler action
+ thread = Thread(target=self.action_subprocess_thread, args=(batch.uuid, port, username, action, progress))
+ thread.start()
+
+ else: # either versioning is disabled, or handler doesn't mind
+ can_cancel = True
+
+ # launch thread to populate batch; that will update session progress directly
+ target = getattr(self, '{}_thread'.format(action))
+ thread = Thread(target=target, args=(batch.uuid, user_uuid, progress), kwargs=kwargs)
+ thread.start()
+
+ return self.render_progress(progress, {
+ 'can_cancel': can_cancel,
+ 'cancel_url': self.get_action_url('view', batch),
+ 'cancel_msg': "{} of batch was canceled.".format(action.capitalize()),
+ })
+
+ def progress_thread(self, sock, success_url, progress):
+ """
+ This method is meant to be used as a thread target. Its job is to read
+ progress data from ``connection`` and update the session progress
+ accordingly. When a final "process complete" indication is read, the
+ socket will be closed and the thread will end.
+ """
+ while True:
+ try:
+ self.process_progress(sock, progress)
+ except EverythingComplete:
+ break
+
+ # close server socket
+ sock.close()
+
+ # finalize session progress
+ progress.session.load()
+ progress.session['complete'] = True
+ if callable(success_url):
+ success_url = success_url()
+ progress.session['success_url'] = success_url
+ progress.session.save()
+
+ def process_progress(self, sock, progress):
+ """
+ This method will accept a client connection on the given socket, and
+ then update the given progress object according to data written by the
+ client.
+ """
+ connection, client_address = sock.accept()
+ active_progress = None
+
+ # TODO: make this configurable?
+ suffix = "\n\n.".encode('utf_8')
+ data = b''
+
+ # listen for progress info, update session progress as needed
+ while True:
+
+ # accumulate data bytestring until we see the suffix
+ byte = connection.recv(1)
+ data += byte
+ if data.endswith(suffix):
+
+ # strip suffix, interpret data as JSON
+ data = data[:-len(suffix)]
+ data = json.loads(data)
+
+ if data.get('everything_complete'):
+ if active_progress:
+ active_progress.finish()
+ raise EverythingComplete
+
+ elif data.get('process_complete'):
+ active_progress.finish()
+ active_progress = None
+ break
+
+ elif 'value' in data:
+ if not active_progress:
+ active_progress = progress(data['message'], data['maximum'])
+ active_progress.update(data['value'])
+
+ # reset data buffer
+ data = b''
+
+ # close client connection
+ connection.close()
+
+ def launch_subprocess(self, port=None, username=None,
+ command='rattail', command_args=None,
+ subcommand=None, subcommand_args=None):
+
+ # construct command
+ cmd = [os.path.join(sys.prefix, 'bin/{}'.format(command))]
+ for path in self.rattail_config.files_read:
+ cmd.extend(['--config', path])
+ if username:
+ cmd.extend(['--runas', username])
+ if command_args:
+ cmd.extend(command_args)
+ cmd.extend([
+ '--progress',
+ '--progress-socket', '127.0.0.1:{}'.format(port),
+ subcommand,
+ ])
+ if subcommand_args:
+ cmd.extend(subcommand_args)
+
+ # run command in subprocess
+ subprocess.check_call(cmd)
+
+ def action_subprocess_thread(self, batch_uuid, port, username, action, progress):
+ """
+ This method is sort of an alternative thread target for batch actions,
+ to be used in the event versioning is enabled for the main process but
+ the handler says versioning must be avoided during the action. It must
+ launch a separate process with versioning disabled in order to act on
+ the batch.
+ """
+ # invoke command to act on batch in separate process
+ try:
+ self.launch_subprocess(port=port, username=username,
+ command='rattail',
+ command_args=[
+ '--no-versioning',
+ ],
+ subcommand='{}-batch'.format(action),
+ subcommand_args=[
+ self.handler.batch_key,
+ batch_uuid,
+ ])
+ except Exception as error:
+ log.warning("%s of '%s' batch failed: %s", action, self.handler.batch_key, batch_uuid, exc_info=True)
+
+ # TODO: write error info to socket
+
+ # if progress:
+ # progress.session.load()
+ # progress.session['error'] = True
+ # progress.session['error_msg'] = "Batch population failed: {} - {}".format(error.__class__.__name__, error)
+ # progress.session.save()
+
+ return
+
+ models = getattr(self.handler, 'version_catchup_{}'.format(action), None)
+ if models:
+ self.catchup_versions(port, batch_uuid, username, *models)
+
+ suffix = "\n\n.".encode('utf_8')
+ cxn = socket.create_connection(('127.0.0.1', port))
+ cxn.send(json.dumps({
+ 'everything_complete': True,
+ }))
+ cxn.send(suffix)
+ cxn.close()
+
+ def catchup_versions(self, port, batch_uuid, username, *models):
+ with short_session() as s:
+ batch = s.query(self.model_class).get(batch_uuid)
+ batch_id = batch.id_str
+ description = six.text_type(batch)
+
+ self.launch_subprocess(
+ port=port, username=username,
+ command='rattail',
+ subcommand='import-versions',
+ subcommand_args=[
+ '--comment',
+ "version catch-up for '{}' batch {}: {}".format(self.handler.batch_key, batch_id, description),
+ ] + list(models))
+
def prefill(self):
"""
View which will attempt to prefill all data for the batch. What
exactly this means will depend on the type of batch etc.
"""
batch = self.get_instance()
- route_prefix = self.get_route_prefix()
- permission_prefix = self.get_permission_prefix()
+ return self.handler_action(batch, 'populate')
- # showing progress requires a separate thread; start that first
- key = '{}.prefill'.format(route_prefix)
- progress = SessionProgress(self.request, key)
- thread = Thread(target=self.prefill_thread, args=(batch.uuid, progress))
- thread.start()
-
- # Send user to progress page.
- kwargs = {
- 'cancel_url': self.get_action_url('view', batch),
- 'cancel_msg': "Batch prefill was canceled.",
- }
-
- # TODO: This seems hacky...it exists for (only) one specific scenario.
- if not self.request.has_perm('{}.view'.format(permission_prefix)):
- kwargs['cancel_url'] = self.request.route_url('{}.create'.format(route_prefix))
-
- return self.render_progress(progress, kwargs)
-
- def prefill_thread(self, batch_uuid, progress):
+ def populate_thread(self, batch_uuid, user_uuid, progress, **kwargs):
"""
- Thread target for prefilling batch data with progress indicator.
+ Thread target for populating batch data with progress indicator.
"""
# mustn't use tailbone web session here
session = RattailSession()
batch = session.query(self.model_class).get(batch_uuid)
try:
- self.handler.populate(batch, progress=progress)
- self.handler.refresh_batch_status(batch)
+ self.handler.do_populate(batch, progress=progress)
except Exception as error:
session.rollback()
log.warning("batch population failed: %s", batch, exc_info=True)
@@ -761,56 +953,9 @@ class BatchMasterView(MasterView):
exactly this means will depend on the type of batch etc.
"""
batch = self.get_instance()
- route_prefix = self.get_route_prefix()
- permission_prefix = self.get_permission_prefix()
+ return self.handler_action(batch, 'refresh')
- # TODO: deprecate / remove this
- cognizer = self.request.user
- if not cognizer:
- uuid = self.request.session.pop('late_login_user', None)
- cognizer = Session.query(model.User).get(uuid) if uuid else None
-
- # TODO: refresh should probably always imply/use progress
- # If handler doesn't declare the need for progress indicator, things
- # are nice and simple.
- if not getattr(self.handler, 'show_progress', True):
- self.refresh_data(Session, batch, cognizer=cognizer)
- self.request.session.flash("Batch data has been refreshed.")
-
- # TODO: This seems hacky...it exists for (only) one specific scenario.
- if not self.request.has_perm('{}.view'.format(permission_prefix)):
- return self.redirect(self.request.route_url('{}.create'.format(route_prefix)))
-
- return self.redirect(self.get_action_url('view', batch))
-
- # Showing progress requires a separate thread; start that first.
- key = '{}.refresh'.format(self.model_class.__tablename__)
- progress = SessionProgress(self.request, key)
- # success_url = self.request.route_url('vendors.scangenius.create') if not self.request.user else None
-
- # TODO: This seems hacky...it exists for (only) one specific scenario.
- success_url = None
- if not self.request.user:
- success_url = self.request.route_url('{}.create'.format(route_prefix))
-
- thread = Thread(target=self.refresh_thread, args=(batch.uuid, progress,
- cognizer.uuid if cognizer else None,
- success_url))
- thread.start()
-
- # Send user to progress page.
- kwargs = {
- 'cancel_url': self.get_action_url('view', batch),
- 'cancel_msg': "Batch refresh was canceled.",
- }
-
- # TODO: This seems hacky...it exists for (only) one specific scenario.
- if not self.request.has_perm('{}.view'.format(permission_prefix)):
- kwargs['cancel_url'] = self.request.route_url('{}.create'.format(route_prefix))
-
- return self.render_progress(progress, kwargs)
-
- def refresh_data(self, session, batch, cognizer=None, progress=None):
+ def refresh_data(self, session, batch, user, progress=None):
"""
Instruct the batch handler to refresh all data for the batch.
"""
@@ -821,9 +966,10 @@ class BatchMasterView(MasterView):
batch.cognized_by = cognizer or session.merge(self.request.user)
else: # the future
- self.handler.refresh(batch, progress=progress)
+ user = user or session.merge(self.request.user)
+ self.handler.do_refresh(batch, user, progress=progress)
- def refresh_thread(self, batch_uuid, progress=None, cognizer_uuid=None, success_url=None):
+ def refresh_thread(self, batch_uuid, user_uuid, progress, **kwargs):
"""
Thread target for refreshing batch data with progress indicator.
"""
@@ -832,9 +978,9 @@ class BatchMasterView(MasterView):
# transaction binding etc.
session = RattailSession()
batch = session.query(self.model_class).get(batch_uuid)
- cognizer = session.query(model.User).get(cognizer_uuid) if cognizer_uuid else None
+ cognizer = session.query(model.User).get(user_uuid) if user_uuid else None
try:
- self.refresh_data(session, batch, cognizer=cognizer, progress=progress)
+ self.refresh_data(session, batch, cognizer, progress=progress)
except Exception as error:
session.rollback()
log.warning("refreshing data for batch failed: {}".format(batch), exc_info=True)
@@ -854,7 +1000,7 @@ class BatchMasterView(MasterView):
if progress:
progress.session.load()
progress.session['complete'] = True
- progress.session['success_url'] = success_url or self.get_action_url('view', batch)
+ progress.session['success_url'] = self.get_action_url('view', batch)
progress.session.save()
########################################
@@ -943,16 +1089,7 @@ class BatchMasterView(MasterView):
for key, value in form.validated.items():
self.request.session['batch.{}.execute_option.{}'.format(batch.batch_key, key)] = value
- key = '{}.execute'.format(self.model_class.__tablename__)
- progress = SessionProgress(self.request, key)
- kwargs['progress'] = progress
- thread = Thread(target=self.execute_thread, args=(batch.uuid, self.request.user.uuid), kwargs=kwargs)
- thread.start()
-
- return self.render_progress(progress, {
- 'cancel_url': self.get_action_url('view', batch),
- 'cancel_msg': "Batch execution was canceled.",
- })
+ return self.handler_action(batch, 'execute', **kwargs)
self.request.session.flash("Invalid request: {}".format(form.make_deform_form().error), 'error')
return self.redirect(self.get_action_url('view', batch))
@@ -1003,7 +1140,7 @@ class BatchMasterView(MasterView):
def execute_error_message(self, error):
return "Batch execution failed: {}: {}".format(type(error).__name__, error)
- def execute_thread(self, batch_uuid, user_uuid, progress=None, **kwargs):
+ def execute_thread(self, batch_uuid, user_uuid, progress, **kwargs):
"""
Thread target for executing a batch with progress indicator.
"""
@@ -1014,7 +1151,7 @@ class BatchMasterView(MasterView):
batch = session.query(self.model_class).get(batch_uuid)
user = session.query(model.User).get(user_uuid)
try:
- result = self.handler.execute(batch, user=user, progress=progress, **kwargs)
+ result = self.handler.do_execute(batch, user=user, progress=progress, **kwargs)
# If anything goes wrong, rollback and log the error etc.
except Exception as error:
@@ -1030,8 +1167,6 @@ class BatchMasterView(MasterView):
# If no error, check result flag (false means user canceled).
else:
if result:
- batch.executed = datetime.datetime.utcnow()
- batch.executed_by = user
session.commit()
# TODO: this doesn't always work...?
self.request.session.flash("{} has been executed: {}".format(
@@ -1159,7 +1294,7 @@ class BatchMasterView(MasterView):
# else the perm group label will not display correctly...
config.add_tailbone_permission_group(permission_prefix, model_title_plural, overwrite=False)
- # prefill row data
+ # populate row data
config.add_route('{}.prefill'.format(route_prefix), '{}/{{uuid}}/prefill'.format(url_prefix))
config.add_view(cls, attr='prefill', route_name='{}.prefill'.format(route_prefix),
permission='{}.create'.format(permission_prefix))
diff --git a/tailbone/views/core.py b/tailbone/views/core.py
index 42addd76..daa6fc59 100644
--- a/tailbone/views/core.py
+++ b/tailbone/views/core.py
@@ -100,6 +100,7 @@ class View(object):
if not template:
template = '/progress.mako'
kwargs['progress'] = progress
+ kwargs.setdefault('can_cancel', True)
return render_to_response(template, kwargs, request=self.request)
def file_response(self, path):