appy.gen: workflows are now implemented as full Appy worlflows. Plone (DC) workflow are not generated anymore. From the Appy user point of view (=developer), no change has occurred: it is a pure implementation concern. This is one more step towards Appy independence from Plone.

This commit is contained in:
Gaetan Delannay 2011-07-26 22:15:04 +02:00
parent 93eb16670b
commit ddec7cd62c
14 changed files with 378 additions and 806 deletions

View file

@ -598,122 +598,4 @@ class TranslationClassDescriptor(ClassDescriptor):
params['format'] = String.TEXT
params['height'] = height
self.addField(messageId, String(**params))
class WorkflowDescriptor(appy.gen.descriptors.WorkflowDescriptor):
'''Represents a workflow.'''
# How to map Appy permissions to Plone permissions ?
appyToPlonePermissions = {
'read': ('View', 'Access contents information'),
'write': ('Modify portal content',),
'delete': ('Delete objects',),
}
def getPlonePermissions(self, permission):
'''Returns the Plone permission(s) that correspond to
Appy p_permission.'''
if self.appyToPlonePermissions.has_key(permission):
res = self.appyToPlonePermissions[permission]
elif isinstance(permission, basestring):
res = [permission]
else:
# Permission if an Appy permission declaration
className, fieldName = permission.fieldDescriptor.rsplit('.', 1)
if className.find('.') == -1:
# The related class resides in the same module as the workflow
fullClassName = '%s_%s' % (
self.klass.__module__.replace('.', '_'), className)
else:
# className contains the full package name of the class
fullClassName = className.replace('.', '_')
# Read or Write ?
if permission.__class__.__name__ == 'ReadPermission':
access = 'Read'
else:
access = 'Write'
permName = '%s: %s %s %s' % (self.generator.applicationName,
access, fullClassName, fieldName)
res = [permName]
return res
def getWorkflowName(klass):
'''Generates the name of the corresponding Archetypes workflow.'''
res = klass.__module__.replace('.', '_') + '_' + klass.__name__
return res.lower()
getWorkflowName = staticmethod(getWorkflowName)
def getStatesInfo(self, asDumpableCode=False):
'''Gets, in a dict, information for configuring states of the workflow.
If p_asDumpableCode is True, instead of returning a dict, this
method will return a string containing the dict that can be dumped
into a Python code file.'''
res = {}
transitions = self.getTransitions()
for state in self.getStates():
stateName = self.getNameOf(state)
# We need the list of transitions that start from this state
outTransitions = state.getTransitions(transitions,
selfIsFromState=True)
tNames = self.getTransitionNames(outTransitions,
limitToFromState=state)
# Compute the permissions/roles mapping for this state
permissionsMapping = {}
for permission, roles in state.getPermissions().iteritems():
for plonePerm in self.getPlonePermissions(permission):
permissionsMapping[plonePerm] = [r.name for r in roles]
# Add 'Review portal content' to anyone; this is not a security
# problem because we limit the triggering of every transition
# individually.
allRoles = [r.name for r in self.generator.getAllUsedRoles()]
if 'Manager' not in allRoles: allRoles.append('Manager')
permissionsMapping['Review portal content'] = allRoles
res[stateName] = (tNames, permissionsMapping)
if not asDumpableCode:
return res
# We must create the "Python code" version of this dict
newRes = '{'
for stateName, stateInfo in res.iteritems():
transitions = ','.join(['"%s"' % tn for tn in stateInfo[0]])
# Compute permissions
permissions = ''
for perm, roles in stateInfo[1].iteritems():
theRoles = ','.join(['"%s"' % r for r in roles])
permissions += '"%s": [%s],' % (perm, theRoles)
newRes += '\n "%s": ([%s], {%s}),' % \
(stateName, transitions, permissions)
return newRes + '}'
def getTransitionsInfo(self, asDumpableCode=False):
'''Gets, in a dict, information for configuring transitions of the
workflow. If p_asDumpableCode is True, instead of returning a dict,
this method will return a string containing the dict that can be
dumped into a Python code file.'''
res = {}
for tName in self.getTransitionNames():
res[tName] = self.getEndStateName(tName)
if not asDumpableCode:
return res
# We must create the "Python code" version of this dict
newRes = '{'
for transitionName, endStateName in res.iteritems():
newRes += '\n "%s": "%s",' % (transitionName, endStateName)
return newRes + '}'
def getManagedPermissions(self):
'''Returns the Plone permissions of all Appy permissions managed by this
workflow.'''
res = set()
res.add('Review portal content')
for state in self.getStates():
for permission in state.permissions.iterkeys():
for plonePerm in self.getPlonePermissions(permission):
res.add(plonePerm)
return res
def getScripts(self):
res = ''
wfName = WorkflowDescriptor.getWorkflowName(self.klass)
for tName in self.getTransitionNames():
scriptName = '%s_do%s%s' % (wfName, tName[0].upper(), tName[1:])
res += 'def %s(self, stateChange, **kw): do("%s", ' \
'stateChange, logger)\n' % (scriptName, tName)
return res
# ------------------------------------------------------------------------------

View file

@ -8,9 +8,9 @@ from appy.gen import *
from appy.gen.po import PoMessage, PoFile, PoParser
from appy.gen.generator import Generator as AbstractGenerator
from appy.gen.utils import getClassName
from descriptors import ClassDescriptor, WorkflowDescriptor, \
ToolClassDescriptor, UserClassDescriptor, \
TranslationClassDescriptor
from appy.gen.descriptors import WorkflowDescriptor
from descriptors import ClassDescriptor, ToolClassDescriptor, \
UserClassDescriptor, TranslationClassDescriptor
from model import ModelClass, User, Tool, Translation
# Common methods that need to be defined on every Archetype class --------------
@ -38,7 +38,6 @@ class Generator(AbstractGenerator):
AbstractGenerator.__init__(self, *args, **kwargs)
# Set our own Descriptor classes
self.descriptorClasses['class'] = ClassDescriptor
self.descriptorClasses['workflow'] = WorkflowDescriptor
# Create our own Tool, User and Translation instances
self.tool = ToolClassDescriptor(Tool, self)
self.user = UserClassDescriptor(User, self)
@ -159,7 +158,6 @@ class Generator(AbstractGenerator):
# Create basic files (config.py, Install.py, etc)
self.generateTool()
self.generateInit()
self.generateWorkflows()
self.generateTests()
if self.config.frontPage:
self.generateFrontPage()
@ -368,33 +366,6 @@ class Generator(AbstractGenerator):
catalogMap += "catalogMap['%s']['black'] = " \
"['portal_catalog']\n" % blackClass
repls['catalogMap'] = catalogMap
# Compute workflows
workflows = ''
for classDescr in classesAll:
if hasattr(classDescr.klass, 'workflow'):
wfName = WorkflowDescriptor.getWorkflowName(
classDescr.klass.workflow)
workflows += '\n "%s":"%s",' % (classDescr.name, wfName)
repls['workflows'] = workflows
# Compute workflow instances initialisation
wfInit = ''
for workflowDescr in self.workflows:
k = workflowDescr.klass
className = '%s.%s' % (k.__module__, k.__name__)
wfInit += 'wf = %s()\n' % className
wfInit += 'wf._transitionsMapping = {}\n'
for transition in workflowDescr.getTransitions():
tName = workflowDescr.getNameOf(transition)
tNames = workflowDescr.getTransitionNamesOf(tName, transition)
for trName in tNames:
wfInit += 'wf._transitionsMapping["%s"] = wf.%s\n' % \
(trName, tName)
# We need a new attribute that stores states in order
wfInit += 'wf._states = []\n'
for stateName in workflowDescr.getStateNames(ordered=True):
wfInit += 'wf._states.append("%s")\n' % stateName
wfInit += 'workflowInstances[%s] = wf\n' % className
repls['workflowInstancesInit'] = wfInit
# Compute the list of ordered attributes (forward and backward,
# inherited included) for every Appy class.
attributes = []
@ -463,40 +434,6 @@ class Generator(AbstractGenerator):
repls['totalNumberOfTests'] = self.totalNumberOfTests
self.copyFile('__init__.py', repls)
def generateWorkflows(self):
'''Generates the file that contains one function by workflow.
Those functions are called by Plone for registering the workflows.'''
workflows = ''
for wfDescr in self.workflows:
# Compute state names & info, transition names & infos, managed
# permissions
stateNames=','.join(['"%s"' % sn for sn in wfDescr.getStateNames()])
stateInfos = wfDescr.getStatesInfo(asDumpableCode=True)
transitionNames = ','.join(['"%s"' % tn for tn in \
wfDescr.getTransitionNames()])
transitionInfos = wfDescr.getTransitionsInfo(asDumpableCode=True)
managedPermissions = ','.join(['"%s"' % tn for tn in \
wfDescr.getManagedPermissions()])
wfName = WorkflowDescriptor.getWorkflowName(wfDescr.klass)
workflows += '%s\ndef create_%s(self, id):\n ' \
'stateNames = [%s]\n ' \
'stateInfos = %s\n ' \
'transitionNames = [%s]\n ' \
'transitionInfos = %s\n ' \
'managedPermissions = [%s]\n ' \
'return WorkflowCreator("%s", DCWorkflowDefinition, ' \
'stateNames, "%s", stateInfos, transitionNames, ' \
'transitionInfos, managedPermissions, PROJECTNAME, ' \
'ExternalMethod).run()\n' \
'addWorkflowFactory(create_%s,\n id="%s",\n ' \
'title="%s")\n\n' % (wfDescr.getScripts(), wfName, stateNames,
stateInfos, transitionNames, transitionInfos,
managedPermissions, wfName, wfDescr.getInitialStateName(),
wfName, wfName, wfName)
repls = self.repls.copy()
repls['workflows'] = workflows
self.copyFile('workflows.py', repls, destFolder='Extensions')
def generateWrapperProperty(self, name, type):
'''Generates the getter for attribute p_name.'''
res = ' def get_%s(self):\n ' % name
@ -519,18 +456,17 @@ class Generator(AbstractGenerator):
* "custom" it includes descriptors for the config-related classes
for which the user has created a sub-class.'''
if not include: return self.classes
else:
res = self.classes[:]
configClasses = [self.tool, self.user, self.translation]
if include == 'all':
res += configClasses
elif include == 'allButTool':
res += configClasses[1:]
elif include == 'custom':
res += [c for c in configClasses if c.customized]
elif include == 'predefined':
res = configClasses
return res
res = self.classes[:]
configClasses = [self.tool, self.user, self.translation]
if include == 'all':
res += configClasses
elif include == 'allButTool':
res += configClasses[1:]
elif include == 'custom':
res += [c for c in configClasses if c.customized]
elif include == 'predefined':
res = configClasses
return res
def getClassesInOrder(self, allClasses):
'''When generating wrappers, classes mut be dumped in order (else, it
@ -793,44 +729,39 @@ class Generator(AbstractGenerator):
self.copyFile('Class.py', repls, destName=fileName)
def generateWorkflow(self, wfDescr):
'''This method does not generate the workflow definition, which is done
in self.generateWorkflows. This method just creates the i18n labels
related to the workflow described by p_wfDescr.'''
'''This method creates the i18n labels related to the workflow described
by p_wfDescr.'''
k = wfDescr.klass
print 'Generating %s.%s (gen-workflow)...' % (k.__module__, k.__name__)
# Identify Plone workflow name
# Identify workflow name
wfName = WorkflowDescriptor.getWorkflowName(wfDescr.klass)
# Add i18n messages for states and transitions
for sName in wfDescr.getStateNames():
poMsg = PoMessage('%s_%s' % (wfName, sName), '', sName)
# Add i18n messages for states
for name in dir(wfDescr.klass):
if not isinstance(getattr(wfDescr.klass, name), State): continue
poMsg = PoMessage('%s_%s' % (wfName, name), '', name)
poMsg.produceNiceDefault()
self.labels.append(poMsg)
for tName, tLabel in wfDescr.getTransitionNames(withLabels=True):
poMsg = PoMessage('%s_%s' % (wfName, tName), '', tLabel)
# Add i18n messages for transitions
for name in dir(wfDescr.klass):
transition = getattr(wfDescr.klass, name)
if not isinstance(transition, Transition): continue
poMsg = PoMessage('%s_%s' % (wfName, name), '', name)
poMsg.produceNiceDefault()
self.labels.append(poMsg)
for transition in wfDescr.getTransitions():
# Get the Appy transition name
tName = wfDescr.getNameOf(transition)
# Get the names of the corresponding DC transition(s)
tNames = wfDescr.getTransitionNamesOf(tName, transition)
if transition.confirm:
# We need to generate a label for the message that will be shown
# in the confirm popup.
for tn in tNames:
label = '%s_%s_confirm' % (wfName, tn)
poMsg = PoMessage(label, '', PoMessage.CONFIRM)
self.labels.append(poMsg)
label = '%s_%s_confirm' % (wfName, name)
poMsg = PoMessage(label, '', PoMessage.CONFIRM)
self.labels.append(poMsg)
if transition.notify:
# Appy will send a mail when this transition is triggered.
# So we need 2 i18n labels for every DC transition corresponding
# to this Appy transition: one for the mail subject and one for
# So we need 2 i18n labels: one for the mail subject and one for
# the mail body.
for tn in tNames:
subjectLabel = '%s_%s_mail_subject' % (wfName, tn)
poMsg = PoMessage(subjectLabel, '', PoMessage.EMAIL_SUBJECT)
self.labels.append(poMsg)
bodyLabel = '%s_%s_mail_body' % (wfName, tn)
poMsg = PoMessage(bodyLabel, '', PoMessage.EMAIL_BODY)
self.labels.append(poMsg)
subjectLabel = '%s_%s_mail_subject' % (wfName, name)
poMsg = PoMessage(subjectLabel, '', PoMessage.EMAIL_SUBJECT)
self.labels.append(poMsg)
bodyLabel = '%s_%s_mail_body' % (wfName, name)
poMsg = PoMessage(bodyLabel, '', PoMessage.EMAIL_BODY)
self.labels.append(poMsg)
# ------------------------------------------------------------------------------

View file

@ -34,7 +34,6 @@ class PloneInstaller:
self.catalogMap = cfg.catalogMap
self.applicationRoles = cfg.applicationRoles # Roles defined in the app
self.defaultAddRoles = cfg.defaultAddRoles
self.workflows = cfg.workflows
self.appFrontPage = cfg.appFrontPage
self.showPortlet = cfg.showPortlet
self.languages = cfg.languages
@ -378,22 +377,6 @@ class PloneInstaller:
site.portal_groups.setRolesForGroup(group, [role])
site.__ac_roles__ = tuple(data)
def installWorkflows(self):
'''Creates or updates the workflows defined in the application.'''
wfTool = self.ploneSite.portal_workflow
for contentType, workflowName in self.workflows.iteritems():
# Register the workflow if needed
if workflowName not in wfTool.listWorkflows():
wfMethod = self.config.ExternalMethod('temp', 'temp',
self.productName + '.workflows', 'create_%s' % workflowName)
workflow = wfMethod(self, workflowName)
wfTool._setObject(workflowName, workflow)
else:
self.appyTool.log('%s already in workflows.' % workflowName)
# Link the workflow to the current content type
wfTool.setChainForPortalTypes([contentType], workflowName)
return wfTool
def installStyleSheet(self):
'''Registers In Plone the stylesheet linked to this application.'''
cssName = self.productName + '.css'
@ -495,7 +478,6 @@ class PloneInstaller:
self.installTool()
self.installTranslations()
self.installRolesAndGroups()
self.installWorkflows()
self.installStyleSheet()
self.managePortlets()
self.manageIndexes()

View file

@ -5,9 +5,11 @@
# ------------------------------------------------------------------------------
import os, os.path, sys, types, mimetypes, urllib, cgi
import appy.gen
from appy.gen import Type, String, Selection, Role, No
from appy.gen import Type, String, Selection, Role, No, WorkflowAnonymous, \
Transition
from appy.gen.utils import *
from appy.gen.layout import Table, defaultPageLayouts
from appy.gen.descriptors import WorkflowDescriptor
from appy.gen.plone25.descriptors import ClassDescriptor
from appy.gen.plone25.utils import updateRolesForPermission,checkTransitionGuard
@ -300,6 +302,27 @@ class BaseMixin:
else: logMethod = logger.info
logMethod(msg)
def getState(self, name=True):
'''Returns state information about this object. If p_name is True, the
returned info is the state name. Else, it is the State instance.'''
if hasattr(self.aq_base, 'workflow_history'):
key = self.workflow_history.keys()[0]
stateName = self.workflow_history[key][-1]['review_state']
if name: return stateName
else: return getattr(self.getWorkflow(), stateName)
else:
# No workflow information is available (yet) on this object. So
# return the workflow initial state.
wf = self.getWorkflow()
initStateName = 'active'
for elem in dir(wf):
attr = getattr(wf, elem)
if (attr.__class__.__name__ == 'State') and attr.initial:
initStateName = elem
break
if name: return initStateName
else: return getattr(wf, initStateName)
def rememberPreviousData(self):
'''This method is called before updating an object and remembers, for
every historized field, the previous value. Result is a dict
@ -327,11 +350,10 @@ class BaseMixin:
else:
changes[fieldName] = (changes[fieldName], appyType.labelId)
# Create the event to record in the history
DateTime = self.getProductConfig().DateTime
state = self.portal_workflow.getInfoFor(self, 'review_state')
from DateTime import DateTime
user = self.portal_membership.getAuthenticatedMember()
event = {'action': '_datachange_', 'changes': changes,
'review_state': state, 'actor': user.id,
'review_state': self.getState(), 'actor': user.id,
'time': DateTime(), 'comments': ''}
# Add the event to the history
histKey = self.workflow_history.keys()[0]
@ -583,62 +605,48 @@ class BaseMixin:
'''Returns information about the states that are related to p_phase.
If p_currentOnly is True, we return the current state, even if not
related to p_phase.'''
res = []
dcWorkflow = self.getWorkflow(appy=False)
if not dcWorkflow: return res
currentState = self.portal_workflow.getInfoFor(self, 'review_state')
currentState = self.getState()
if currentOnly:
return [StateDescr(currentState,'current').get()]
workflow = self.getWorkflow(appy=True)
if workflow:
stateStatus = 'done'
for stateName in workflow._states:
if stateName == currentState:
stateStatus = 'current'
elif stateStatus != 'done':
stateStatus = 'future'
state = getattr(workflow, stateName)
if (state.phase == phase) and \
(self._appy_showState(workflow, state.show)):
res.append(StateDescr(stateName, stateStatus).get())
return [StateDescr(currentState, 'current').get()]
res = []
workflow = self.getWorkflow()
stateStatus = 'done'
for stateName in dir(workflow):
if getattr(workflow, stateName).__class__.__name__ != 'State':
continue
if stateName == currentState:
stateStatus = 'current'
elif stateStatus != 'done':
stateStatus = 'future'
state = getattr(workflow, stateName)
if (state.phase == phase) and \
(self._appy_showState(workflow, state.show)):
res.append(StateDescr(stateName, stateStatus).get())
return res
def getAppyTransitions(self, includeFake=True, includeNotShowable=False):
'''This method is similar to portal_workflow.getTransitionsFor, but:
* is able (or not, depending on boolean p_includeFake) to retrieve
transitions that the user can't trigger, but for which he needs to
know for what reason he can't trigger it;
* is able (or not, depending on p_includeNotShowable) to include
transitions for which show=False at the Appy level. Indeed, because
"showability" is only a GUI concern, and not a security concern,
in some cases it has sense to set includeNotShowable=True, because
those transitions are triggerable from a security point of view;
* the transition-info is richer: it contains fake-related info (as
described above) and confirm-related info (ie, when clicking on
the button, do we ask the user to confirm via a popup?)'''
'''This method returns info about transitions that one can trigger from
the user interface.
* if p_includeFake is True, it retrieves transitions that the user
can't trigger, but for which he needs to know for what reason he
can't trigger it;
* if p_includeNotShowable is True, it includes transitions for which
show=False. Indeed, because "showability" is only a GUI concern,
and not a security concern, in some cases it has sense to set
includeNotShowable=True, because those transitions are triggerable
from a security point of view.
'''
res = []
# Get some Plone stuff from the Plone-level config.py
TRIGGER_USER_ACTION = self.getProductConfig().TRIGGER_USER_ACTION
sm = self.getProductConfig().getSecurityManager
# Get the workflow definition for p_obj.
workflow = self.getWorkflow(appy=False)
if not workflow: return res
appyWorkflow = self.getWorkflow(appy=True)
# What is the current state for this object?
currentState = workflow._getWorkflowStateOf(self)
if not currentState: return res
# Analyse all the transitions that start from this state.
for transitionId in currentState.transitions:
transition = workflow.transitions.get(transitionId, None)
appyTr = appyWorkflow._transitionsMapping[transitionId]
if not transition or (transition.trigger_type!=TRIGGER_USER_ACTION)\
or not transition.actbox_name: continue
# We have a possible candidate for a user-triggerable transition
if transition.guard is None:
mayTrigger = True
else:
mayTrigger = checkTransitionGuard(transition.guard, sm(),
workflow, self)
wf = self.getWorkflow()
currentState = self.getState(name=False)
# Loop on every transition
for name in dir(wf):
transition = getattr(wf, name)
if (transition.__class__.__name__ != 'Transition'): continue
# Filter transitions that do not have currentState as start state
if not transition.hasState(currentState, True): continue
# Check if the transition can be triggered
mayTrigger = transition.isTriggerable(self, wf)
# Compute the condition that will lead to including or not this
# transition
if not includeFake:
@ -646,19 +654,15 @@ class BaseMixin:
else:
includeIt = mayTrigger or isinstance(mayTrigger, No)
if not includeNotShowable:
includeIt = includeIt and appyTr.isShowable(appyWorkflow, self)
includeIt = includeIt and transition.isShowable(wf, self)
if not includeIt: continue
# Add transition-info to the result.
tInfo = {'id': transition.id, 'title': transition.title,
'title_or_id': transition.title_or_id(),
'description': transition.description, 'confirm': '',
'name': transition.actbox_name, 'may_trigger': True,
'url': transition.actbox_url %
{'content_url': self.absolute_url(),
'portal_url' : '', 'folder_url' : ''}}
if appyTr.confirm:
label = '%s_confirm' % tInfo['name']
tInfo['confirm'] = self.translate(label, format='js')
label = self.getWorkflowLabel(name)
tInfo = {'name': name, 'title': self.translate(label),
'confirm': '', 'may_trigger': True}
if transition.confirm:
cLabel = '%s_confirm' % label
tInfo['confirm'] = self.translate(cLabel, format='js')
if not mayTrigger:
tInfo['may_trigger'] = False
tInfo['reason'] = mayTrigger.msg
@ -793,39 +797,32 @@ class BaseMixin:
reverse = rq.get('reverse') == 'True'
self.appy().sort(fieldName, sortKey=sortKey, reverse=reverse)
def getWorkflow(self, appy=True):
'''Returns the Appy workflow instance that is relevant for this
object. If p_appy is False, it returns the DC workflow.'''
res = None
if appy:
# Get the workflow class first
workflowClass = None
if self.wrapperClass:
appyClass = self.wrapperClass.__bases__[-1]
if hasattr(appyClass, 'workflow'):
workflowClass = appyClass.workflow
if workflowClass:
# Get the corresponding prototypical workflow instance
res = self.getProductConfig().workflowInstances[workflowClass]
else:
dcWorkflows = self.portal_workflow.getWorkflowsFor(self)
if dcWorkflows:
res = dcWorkflows[0]
return res
def notifyWorkflowCreated(self):
'''This method is called by Zope/CMF every time an object is created,
be it temp or not. The objective here is to initialise workflow-
related data on the object.'''
wf = self.getWorkflow()
# Get the initial workflow state
initialState = self.getState(name=False)
# Create a Transition instance representing the initial transition.
initialTransition = Transition((initialState, initialState))
initialTransition.trigger('_init_', self, wf, '')
def getWorkflow(self, name=False):
'''Returns the workflow applicable for p_self (or its name, if p_name
is True).'''
appyClass = self.wrapperClass.__bases__[-1]
if hasattr(appyClass, 'workflow'): wf = appyClass.workflow
else: wf = WorkflowAnonymous
if not name: return wf
return WorkflowDescriptor.getWorkflowName(wf)
def getWorkflowLabel(self, stateName=None):
'''Gets the i18n label for the workflow current state. If no p_stateName
is given, workflow label is given for the current state.'''
res = ''
wf = self.getWorkflow(appy=False)
if wf:
res = stateName
if not res:
res = self.portal_workflow.getInfoFor(self, 'review_state')
appyWf = self.getWorkflow(appy=True)
if appyWf:
res = '%s_%s' % (wf.id, res)
return res
'''Gets the i18n label for p_stateName, or for the current object state
if p_stateName is not given. Note that if p_stateName is given, it
can also represent the name of a transition.'''
stateName = stateName or self.getState()
return '%s_%s' % (self.getWorkflow(name=True), stateName)
def hasHistory(self):
'''Has this object an history?'''
@ -848,39 +845,6 @@ class BaseMixin:
return {'events': history[startNumber:startNumber+batchSize],
'totalNumber': len(history)}
def may(self, transitionName):
'''May the user execute transition named p_transitionName?'''
# Get the Appy workflow instance
workflow = self.getWorkflow()
res = False
if workflow:
# Get the corresponding Appy transition
transition = workflow._transitionsMapping[transitionName]
user = self.portal_membership.getAuthenticatedMember()
if isinstance(transition.condition, Role):
# It is a role. Transition may be triggered if the user has this
# role.
res = user.has_role(transition.condition.name, self)
elif type(transition.condition) == types.FunctionType:
res = transition.condition(workflow, self.appy())
elif type(transition.condition) in (tuple, list):
# It is a list of roles and or functions. Transition may be
# triggered if user has at least one of those roles and if all
# functions return True.
hasRole = None
for roleOrFunction in transition.condition:
if isinstance(roleOrFunction, basestring):
if hasRole == None:
hasRole = False
if user.has_role(roleOrFunction, self):
hasRole = True
elif type(roleOrFunction) == types.FunctionType:
if not roleOrFunction(workflow, self.appy()):
return False
if hasRole != False:
res = True
return res
def mayNavigate(self):
'''May the currently logged user see the navigation panel linked to
this object?'''
@ -946,16 +910,28 @@ class BaseMixin:
# the user.
return self.goto(msg)
def onTriggerTransition(self):
def do(self, transitionName, comment='', doAction=True, doNotify=True,
doHistory=True, doSay=True):
'''Triggers transition named p_transitionName.'''
# Check that this transition exists.
wf = self.getWorkflow()
if not hasattr(wf, transitionName) or \
getattr(wf, transitionName).__class__.__name__ != 'Transition':
raise 'Transition "%s" was not found.' % transitionName
# Is this transition triggerable?
transition = getattr(wf, transitionName)
if not transition.isTriggerable(self, wf):
raise 'Transition "%s" can\'t be triggered' % transitionName
# Trigger the transition
transition.trigger(transitionName, self, wf, comment, doAction=doAction,
doNotify=doNotify, doHistory=doHistory, doSay=doSay)
def onDo(self):
'''This method is called whenever a user wants to trigger a workflow
transition on an object.'''
rq = self.REQUEST
self.portal_workflow.doActionFor(self, rq['workflow_action'],
comment = rq.get('comment', ''))
self.do(rq['workflow_action'], comment=rq.get('comment', ''))
self.reindexObject()
# Where to redirect the user back ?
# TODO (?): remove the "phase" param for redirecting the user to the
# next phase when relevant.
return self.goto(self.getUrl(rq['HTTP_REFERER']))
def fieldValueSelected(self, fieldName, vocabValue, dbValue):

View file

@ -34,10 +34,10 @@ SENDMAIL_ERROR = 'Error while sending mail: %s.'
ENCODING_ERROR = 'Encoding error while sending mail: %s.'
from appy.gen.utils import sequenceTypes
from appy.gen.plone25.descriptors import WorkflowDescriptor
from appy.gen.descriptors import WorkflowDescriptor
import socket
def sendMail(obj, transition, transitionName, workflow, logger):
def sendMail(obj, transition, transitionName, workflow):
'''Sends mail about p_transition that has been triggered on p_obj that is
controlled by p_workflow.'''
wfName = WorkflowDescriptor.getWorkflowName(workflow.__class__)
@ -94,12 +94,12 @@ def sendMail(obj, transition, transitionName, workflow, logger):
recipient.encode(enc), fromAddress.encode(enc),
mailSubject.encode(enc), mcc=cc, charset='utf-8')
except socket.error, sg:
logger.warn(SENDMAIL_ERROR % str(sg))
obj.log(SENDMAIL_ERROR % str(sg), type='warning')
break
except UnicodeDecodeError, ue:
logger.warn(ENCODING_ERROR % str(ue))
obj.log(ENCODING_ERROR % str(ue), type='warning')
break
except Exception, e:
logger.warn(SENDMAIL_ERROR % str(e))
obj.log(SENDMAIL_ERROR % str(e), type='warning')
break
# ------------------------------------------------------------------------------

View file

@ -515,7 +515,7 @@
tal:condition="transitions">
<form id="triggerTransitionForm" method="post"
tal:attributes="action python: contextObj.absolute_url() + '/skyn/do'">
<input type="hidden" name="action" value="TriggerTransition"/>
<input type="hidden" name="action" value="Do"/>
<input type="hidden" name="workflow_action"/>
<table>
<tr valign="middle">
@ -530,11 +530,11 @@
<td align="right" tal:repeat="transition transitions">
<tal:comment replace="nothing">Real button</tal:comment>
<input type="button" class="appyButton" tal:condition="transition/may_trigger"
tal:attributes="value python: tool.translate(transition['name']);
onClick python: 'triggerTransition(\'%s\',\'%s\')' % (transition['id'],transition['confirm']);"/>
tal:attributes="value transition/title;
onClick python: 'triggerTransition(\'%s\',\'%s\')' % (transition['name'],transition['confirm']);"/>
<tal:comment replace="nothing">Fake button, explaining why the transition can't be triggered</tal:comment>
<div class="appyButton fakeButton" tal:condition="not: transition/may_trigger">
<acronym tal:content="python: tool.translate(transition['name'])"
<acronym tal:content="transition/title"
tal:attributes="title transition/reason"></acronym>
</div>
</td>

View file

@ -27,7 +27,6 @@ from Products.CMFPlone.utils import ToolInit
from Products.CMFPlone.interfaces import IPloneSiteRoot
from Products.CMFCore import DirectoryView
from Products.CMFCore.DirectoryView import manage_addDirectoryView
from Products.DCWorkflow.Transitions import TRIGGER_USER_ACTION
from Products.ExternalMethod.ExternalMethod import ExternalMethod
from Products.Archetypes.Extensions.utils import installTypes
from Products.Archetypes.Extensions.utils import install_subskin
@ -57,17 +56,6 @@ allClassNames = [<!allClassNames!>]
catalogMap = {}
<!catalogMap!>
# Dict whose keys are class names and whose values are workflow names (=the
# workflow used by the content type)
workflows = {<!workflows!>}
# In the following dict, we keep one instance for every Appy workflow defined
# in the application. Those prototypical instances will be used for executing
# user-defined actions and transitions. For each instance, we add a special
# attribute "_transitionsMapping" that allows to get Appy transitions from the
# names of DC transitions.
workflowInstances = {}
<!workflowInstancesInit!>
# In the following dict, we store, for every Appy class, the ordered list of
# appy types (included inherited ones).
attributes = {<!attributes!>}

View file

@ -1,11 +0,0 @@
<!codeHeader!>
from Products.CMFCore.WorkflowTool import addWorkflowFactory
from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
from appy.gen.plone25.workflow import WorkflowCreator
from Products.<!applicationName!>.config import PROJECTNAME
from Products.ExternalMethod.ExternalMethod import ExternalMethod
import logging
logger = logging.getLogger('<!applicationName!>')
from appy.gen.plone25.workflow import do
<!workflows!>

View file

@ -1,186 +0,0 @@
'''This package contains functions for managing workflow events.'''
# ------------------------------------------------------------------------------
class WorkflowCreator:
'''This class allows to construct the Plone workflow that corresponds to a
Appy workflow.'''
def __init__(self, wfName, ploneWorkflowClass, stateNames, initialState,
stateInfos, transitionNames, transitionInfos, managedPermissions,
productName, externalMethodClass):
self.wfName = wfName
self.ploneWorkflowClass = ploneWorkflowClass
self.stateNames = stateNames
self.initialState = initialState # Name of the initial state
self.stateInfos = stateInfos
# stateInfos is a dict giving information about every state. Keys are
# state names, values are lists? Every list contains (in this order):
# - the list of transitions (names) going out from this state;
# - a dict of permissions, whose keys are permission names and whose
# values are lists of roles that are granted this permission. In
# short: ~{s_stateName: ([transitions], {s_permissionName:
# (roleNames)})}~.
self.transitionNames = transitionNames
self.transitionInfos = transitionInfos
# transitionInfos is a dict giving information avout every transition.
# Keys are transition names, values are end states of the transitions.
self.variableInfos = {
'review_history': ("Provides access to workflow history",
'state_change/getHistory', 0, 0, {'guard_permissions':\
'Request review; Review portal content'}),
'comments': ("Comments about the last transition",
'python:state_change.kwargs.get("comment", "")', 1, 1, None),
'time': ("Time of the last transition", "state_change/getDateTime",
1, 1, None),
'actor': ("The ID of the user who performed the last transition",
"user/getId", 1, 1, None),
'action': ("The last transition", "transition/getId|nothing",
1, 1, None)
}
self.managedPermissions = managedPermissions
self.ploneWf = None # The Plone DC workflow definition
self.productName = productName
self.externalMethodClass = externalMethodClass
def createWorkflowDefinition(self):
'''Creates the Plone instance corresponding to this workflow.'''
self.ploneWf = self.ploneWorkflowClass(self.wfName)
self.ploneWf.setProperties(title=self.wfName)
def createWorkflowElements(self):
'''Creates states, transitions, variables and managed permissions and
sets the initial state.'''
wf = self.ploneWf
# Create states
for s in self.stateNames:
try:
wf.states[s]
except KeyError, k:
# It does not exist, so we create it!
wf.states.addState(s)
# Create transitions
for t in self.transitionNames:
try:
wf.transitions[t]
except KeyError, k:
wf.transitions.addTransition(t)
# Create variables
for v in self.variableInfos.iterkeys():
try:
wf.variables[v]
except KeyError, k:
wf.variables.addVariable(v)
# Create managed permissions
for mp in self.managedPermissions:
try:
wf.addManagedPermission(mp)
except ValueError, va:
pass # Already a managed permission
# Set initial state
if not wf.initial_state: wf.states.setInitialState(self.initialState)
def getTransitionScriptName(self, transitionName):
'''Gets the name of the script corresponding to DC p_transitionName.'''
return '%s_do%s%s' % (self.wfName, transitionName[0].upper(),
transitionName[1:])
def configureStatesAndTransitions(self):
'''Configures states and transitions of the Plone workflow.'''
wf = self.ploneWf
# Configure states
for stateName, stateInfo in self.stateInfos.iteritems():
state = wf.states[stateName]
stateTitle = '%s_%s' % (self.wfName, stateName)
state.setProperties(title=stateTitle, description="",
transitions=stateInfo[0])
for permissionName, roles in stateInfo[1].iteritems():
state.setPermission(permissionName, 0, roles)
# Configure transitions
for transitionName, endStateName in self.transitionInfos.iteritems():
# Define the script to call when the transition has been triggered.
scriptName = self.getTransitionScriptName(transitionName)
if not scriptName in wf.scripts.objectIds():
sn = scriptName
wf.scripts._setObject(sn, self.externalMethodClass(
sn, sn, self.productName + '.workflows', sn))
# Configure the transition in itself
transition = wf.transitions[transitionName]
transition.setProperties(
title=transitionName, new_state_id=endStateName, trigger_type=1,
script_name="", after_script_name=scriptName,
actbox_name='%s_%s' % (self.wfName, transitionName),
actbox_url="",
props={'guard_expr': 'python:here.may("%s")' % transitionName})
def configureVariables(self):
'''Configures the variables defined in this workflow.'''
wf = self.ploneWf
# Set the name of the state variable
wf.variables.setStateVar('review_state')
# Configure the variables
for variableName, info in self.variableInfos.iteritems():
var = wf.variables[variableName]
var.setProperties(description=info[0], default_value='',
default_expr=info[1], for_catalog=0, for_status=info[2],
update_always=info[3], props=info[4])
def run(self):
self.createWorkflowDefinition()
self.createWorkflowElements()
self.configureStatesAndTransitions()
self.configureVariables()
return self.ploneWf
# ------------------------------------------------------------------------------
import notifier
def do(transitionName, stateChange, logger):
'''This function is called by a Plone workflow every time a transition named
p_transitionName has been triggered. p_stateChange.objet is the Plone
object on which the transition has been triggered; p_logger is the Zope
logger allowing to dump information, warnings or errors in the log file
or object.'''
ploneObj = stateChange.object
workflow = ploneObj.getWorkflow()
transition = workflow._transitionsMapping[transitionName]
msg = ''
# Must I execute transition-related actions and notifications?
doAction = False
if transition.action:
doAction = True
if hasattr(ploneObj, '_appy_do') and \
not ploneObj._appy_do['doAction']:
doAction = False
doNotify = False
if transition.notify:
doNotify = True
if hasattr(ploneObj, '_appy_do') and \
not ploneObj._appy_do['doNotify']:
doNotify = False
elif not getattr(ploneObj.getTool().appy(), 'enableNotifications'):
# We do not notify if the "notify" flag in the tool is disabled.
doNotify = False
if doAction or doNotify:
obj = ploneObj.appy()
if doAction:
msg = ''
if type(transition.action) in (tuple, list):
# We need to execute a list of actions
for act in transition.action:
msgPart = act(workflow, obj)
if msgPart: msg += msgPart
else: # We execute a single action only.
msgPart = transition.action(workflow, obj)
if msgPart: msg += msgPart
if doNotify:
notifier.sendMail(obj, transition, transitionName, workflow, logger)
# Produce a message to the user
if hasattr(ploneObj, '_appy_do') and not ploneObj._appy_do['doSay']:
# We do not produce any message if the transition was triggered
# programmatically.
return
# Produce a default message if no transition has given a custom one.
if not msg:
msg = ploneObj.translate(u'Your content\'s status has been modified.',
domain='plone')
ploneObj.say(msg)
# ------------------------------------------------------------------------------

View file

@ -37,7 +37,7 @@ class AbstractWrapper:
def __cmp__(self, other):
if other: return cmp(self.o, other.o)
else: return 1
return 1
def _callCustom(self, methodName, *args, **kwargs):
'''This wrapper implements some methods like "validate" and "onEdit".
@ -68,8 +68,13 @@ class AbstractWrapper:
def get_uid(self): return self.o.UID()
uid = property(get_uid)
def get_state(self):
return self.o.portal_workflow.getInfoFor(self.o, 'review_state')
def get_klass(self): return self.__class__.__bases__[-1]
klass = property(get_klass)
def get_url(self): return self.o.absolute_url()
url = property(get_url)
def get_state(self): return self.o.getState()
state = property(get_state)
def get_stateLabel(self):
@ -77,12 +82,6 @@ class AbstractWrapper:
return self.o.translate(self.o.getWorkflowLabel(), domain=appName)
stateLabel = property(get_stateLabel)
def get_klass(self): return self.__class__.__bases__[-1]
klass = property(get_klass)
def get_url(self): return self.o.absolute_url()
url = property(get_url)
def get_history(self):
key = self.o.workflow_history.keys()[0]
return self.o.workflow_history[key]
@ -256,42 +255,12 @@ class AbstractWrapper:
return self.o.translate(label, mapping, domain, language=language,
format=format)
def do(self, transition, comment='', doAction=False, doNotify=False,
def do(self, transition, comment='', doAction=True, doNotify=True,
doHistory=True):
'''This method allows to trigger on p_self a workflow p_transition
programmatically. If p_doAction is False, the action that must
normally be executed after the transition has been triggered will
not be executed. If p_doNotify is False, the notifications
(email,...) that must normally be launched after the transition has
been triggered will not be launched. If p_doHistory is False, there
will be no trace from this transition triggering in the workflow
history.'''
wfTool = self.o.portal_workflow
availableTransitions = [t['id'] for t in self.o.getAppyTransitions(\
includeFake=False, includeNotShowable=True)]
transitionName = transition
if not transitionName in availableTransitions:
# Maybe is it a compound Appy transition. Try to find the
# corresponding DC transition.
state = self.state
transitionPrefix = transition + state[0].upper() + state[1:] + 'To'
for at in availableTransitions:
if at.startswith(transitionPrefix):
transitionName = at
break
# Set in a versatile attribute details about what to execute or not
# (actions, notifications) after the transition has been executed by DC
# workflow.
self.o._appy_do = {'doAction': doAction, 'doNotify': doNotify,
'doSay': False}
if not doHistory:
comment = '_invisible_' # Will not be displayed.
# At first sight, I wanted to remove the entry from
# self.o.workflow_history. But Plone determines the state of an
# object by consulting the target state of the last transition in
# this workflow_history.
wfTool.doActionFor(self.o, transitionName, comment=comment)
del self.o._appy_do
programmatically. See doc in self.o.do.'''
return self.o.do(transition, comment, doAction=doAction,
doNotify=doNotify, doHistory=doHistory, doSay=False)
def log(self, message, type='info'): return self.o.log(message, type)
def say(self, message, type='info'): return self.o.say(message, type)