appy.gen: refactoring due to De-Plonization.

This commit is contained in:
Gaetan Delannay 2011-12-05 10:52:18 +01:00
parent 6733f4c7dc
commit d934f49a99
31 changed files with 1220 additions and 1325 deletions

View file

@ -0,0 +1,74 @@
# ------------------------------------------------------------------------------
from appy.gen.plone25.wrappers import AbstractWrapper
# ------------------------------------------------------------------------------
class GroupWrapper(AbstractWrapper):
def showLogin(self):
'''When must we show the login field?'''
if self.o.isTemporary(): return 'edit'
return 'view'
def validateLogin(self, login):
'''Is this p_login valid?'''
return True
def getGrantableRoles(self):
'''Returns the list of roles that the admin can grant to a user.'''
res = []
for role in self.o.getProductConfig().grantableRoles:
res.append( (role, self.translate('role_%s' % role)) )
return res
def validate(self, new, errors):
'''Inter-field validation.'''
return self._callCustom('validate', new, errors)
def confirm(self, new):
'''Use this method for remembering the previous list of users for this
group.'''
obj = self.o
if hasattr(obj.aq_base, '_oldUsers'): del obj.aq_base._oldUsers
obj._oldUsers = self.users
def addUser(self, user):
'''Adds a p_user to this group.'''
# Update the Ref field.
self.link('users', user)
# Update the group-related info on the Zope user.
zopeUser = user.getZopeUser()
zopeUser.groups[self.login] = self.roles
def removeUser(self, user):
'''Removes a p_user from this group.'''
self.unlink('users', user)
# Update the group-related info on the Zope user.
zopeUser = user.getZopeUser()
del zopeUser.groups[self.login]
def onEdit(self, created):
# Create or update, on every Zope user of this group, group-related
# information.
# 1. Remove reference to this group for users that were removed from it
newUsers = self.users
# The list of previously existing users does not exist when editing a
# group from Python. For updating self.users, it is recommended to use
# methods m_addUser and m_removeUser above.
oldUsers = getattr(self.o.aq_base, '_oldUsers', ())
for user in oldUsers:
if user not in newUsers:
del user.getZopeUser().groups[self.login]
self.log('User "%s" removed from group "%s".' % \
(user.login, self.login))
# 2. Add reference to this group for users that were added to it
for user in newUsers:
zopeUser = user.getZopeUser()
# We refresh group-related info on the Zope user even if the user
# was already in the group.
zopeUser.groups[self.login] = self.roles
if user not in oldUsers:
self.log('User "%s" added to group "%s".' % \
(user.login, self.login))
if hasattr(self.o.aq_base, '_oldUsers'): del self.o._oldUsers
return self._callCustom('onEdit', created)
# ------------------------------------------------------------------------------

137
gen/wrappers/ToolWrapper.py Normal file
View file

@ -0,0 +1,137 @@
# ------------------------------------------------------------------------------
import os.path
import appy
from appy.shared.utils import executeCommand
from appy.gen.plone25.wrappers import AbstractWrapper
# ------------------------------------------------------------------------------
_PY = 'Please specify a file corresponding to a Python interpreter ' \
'(ie "/usr/bin/python").'
FILE_NOT_FOUND = 'Path "%s" was not found.'
VALUE_NOT_FILE = 'Path "%s" is not a file. ' + _PY
NO_PYTHON = "Name '%s' does not starts with 'python'. " + _PY
NOT_UNO_ENABLED_PYTHON = '"%s" is not a UNO-enabled Python interpreter. ' \
'To check if a Python interpreter is UNO-enabled, ' \
'launch it and type "import uno". If you have no ' \
'ImportError exception it is ok.'
# ------------------------------------------------------------------------------
class ToolWrapper(AbstractWrapper):
def validPythonWithUno(self, value):
'''This method represents the validator for field unoEnabledPython.'''
if value:
if not os.path.exists(value):
return FILE_NOT_FOUND % value
if not os.path.isfile(value):
return VALUE_NOT_FILE % value
if not os.path.basename(value).startswith('python'):
return NO_PYTHON % value
if os.system('%s -c "import uno"' % value):
return NOT_UNO_ENABLED_PYTHON % value
return True
podOutputFormats = ('odt', 'pdf', 'doc', 'rtf')
def getPodOutputFormats(self):
'''Gets the available output formats for POD documents.'''
return [(of, self.translate(of)) for of in self.podOutputFormats]
def getInitiator(self):
'''Retrieves the object that triggered the creation of the object
being currently created (if any).'''
nav = self.o.REQUEST.get('nav', '')
if nav: return self.getObject(nav.split('.')[1])
def getObject(self, uid):
'''Allow to retrieve an object from its unique identifier p_uid.'''
return self.o.getObject(uid, appy=True)
def getDiskFolder(self):
'''Returns the disk folder where the Appy application is stored.'''
return self.o.getProductConfig().diskFolder
def getAttributeName(self, attributeType, klass, attrName=None):
'''Some names of Tool attributes are not easy to guess. For example,
the attribute that stores the names of the columns to display in
query results for class A that is in package x.y is
"tool.resultColumnsForx_y_A". Other example: the attribute that
stores the editable default value of field "f1" of class x.y.A is
"tool.defaultValueForx_y_A_f1". This method generates the attribute
name based on p_attributeType, a p_klass from the application, and a
p_attrName (given only if needed, for example if p_attributeType is
"defaultValue"). p_attributeType may be:
"defaultValue"
Stores the editable default value for a given p_attrName of a
given p_klass.
"podTemplate"
Stores the pod template for p_attrName.
"formats"
Stores the output format(s) of a given pod template for
p_attrName.
"resultColumns"
Stores the list of columns that must be shown when displaying
instances of the a given root p_klass.
"enableAdvancedSearch"
Determines if the advanced search screen must be enabled for
p_klass.
"numberOfSearchColumns"
Determines in how many columns the search screen for p_klass
is rendered.
"searchFields"
Determines, among all indexed fields for p_klass, which one will
really be used in the search screen.
"optionalFields"
Stores the list of optional attributes that are in use in the
tool for the given p_klass.
"showWorkflow"
Stores the boolean field indicating if we must show workflow-
related information for p_klass or not.
"showWorkflowCommentField"
Stores the boolean field indicating if we must show the field
allowing to enter a comment every time a transition is triggered.
"showAllStatesInPhase"
Stores the boolean field indicating if we must show all states
linked to the current phase or not. If this field is False, we
simply show the current state, be it linked to the current phase
or not.
'''
fullClassName = self.o.getPortalType(klass)
res = '%sFor%s' % (attributeType, fullClassName)
if attrName: res += '_%s' % attrName
return res
def getAvailableLanguages(self):
'''Returns the list of available languages for this application.'''
return [(t.id, t.title) for t in self.translations]
def convert(self, fileName, format):
'''Launches a UNO-enabled Python interpreter as defined in the self for
converting, using OpenOffice in server mode, a file named p_fileName
into an output p_format.'''
convScript = '%s/pod/converter.py' % os.path.dirname(appy.__file__)
cmd = '%s %s "%s" %s -p%d' % (self.unoEnabledPython, convScript,
fileName, format, self.openOfficePort)
self.log('Executing %s...' % cmd)
return executeCommand(cmd) # The result can contain an error message
def refreshSecurity(self):
'''Refreshes, on every object in the database, security-related,
workflow-managed information.'''
context = {'nb': 0}
for className in self.o.getProductConfig().allClassNames:
self.compute(className, context=context, noSecurity=True,
expression="ctx['nb'] += int(obj.o.refreshSecurity())")
msg = 'Security refresh: %d object(s) updated.' % context['nb']
self.log(msg)
self.say(msg)
# ------------------------------------------------------------------------------

View file

@ -0,0 +1,78 @@
# ------------------------------------------------------------------------------
import os.path
from appy.gen.plone25.wrappers import AbstractWrapper
from appy.gen.po import PoFile, PoMessage
from appy.shared.utils import getOsTempFolder
# ------------------------------------------------------------------------------
class TranslationWrapper(AbstractWrapper):
def label(self, field):
'''The label for a text to translate displays the text of the
corresponding message in the source translation.'''
tool = self.tool
sourceLanguage = self.o.getProductConfig().sourceLanguage
sourceTranslation = getattr(tool.o, sourceLanguage).appy()
# p_field is the Computed field. We need to get the name of the
# corresponding field holding the translation message.
fieldName = field.name[:-6]
# If we are showing the source translation, we do not repeat the message
# in the label.
if self.id == sourceLanguage:
sourceMsg = ''
else:
sourceMsg = getattr(sourceTranslation,fieldName)
# When editing the value, we don't want HTML code to be interpreted.
# This way, the translator sees the HTML tags and can reproduce them
# in the translation.
url = self.request['URL']
if url.endswith('/ui/edit') or url.endswith('/do'):
sourceMsg = sourceMsg.replace('<','&lt;').replace('>','&gt;')
sourceMsg = sourceMsg.replace('\n', '<br/>')
return '<div class="translationLabel"><acronym title="%s">' \
'<img src="ui/help.png"/></acronym>%s</div>' % \
(fieldName, sourceMsg)
def show(self, field):
'''We show a field (or its label) only if the corresponding source
message is not empty.'''
tool = self.tool
if field.type == 'Computed': name = field.name[:-6]
else: name = field.name
# Get the source message
sourceLanguage = self.o.getProductConfig().sourceLanguage
sourceTranslation = getattr(tool.o, sourceLanguage).appy()
sourceMsg = getattr(sourceTranslation, name)
if field.isEmptyValue(sourceMsg): return False
return True
poReplacements = ( ('\r\n', '<br/>'), ('\n', '<br/>'), ('"', '\\"') )
def getPoFile(self):
'''Computes and returns the PO file corresponding to this
translation.'''
tool = self.tool
fileName = os.path.join(getOsTempFolder(),
'%s-%s.po' % (tool.o.getAppName(), self.id))
poFile = PoFile(fileName)
for field in self.fields:
if (field.name == 'title') or (field.type != 'String'): continue
# Adds the PO message corresponding to this field
msg = field.getValue(self.o) or ''
for old, new in self.poReplacements:
msg = msg.replace(old, new)
poFile.addMessage(PoMessage(field.name, msg, ''))
poFile.generate()
return True, file(fileName)
def validate(self, new, errors):
# Call a custom "validate" if any.
return self._callCustom('validate', new, errors)
def onEdit(self, created):
# Call a custom "onEdit" if any.
return self._callCustom('onEdit', created)
def onDelete(self):
# Call a custom "onDelete" if any.
self.log('Translation "%s" deleted by "%s".' % (self.id, self.user.id))
return self._callCustom('onDelete')
# ------------------------------------------------------------------------------

158
gen/wrappers/UserWrapper.py Normal file
View file

@ -0,0 +1,158 @@
# ------------------------------------------------------------------------------
from appy.gen.plone25.wrappers import AbstractWrapper
# ------------------------------------------------------------------------------
class UserWrapper(AbstractWrapper):
def showLogin(self):
'''When must we show the login field?'''
if self.o.isTemporary(): return 'edit'
return 'view'
def validateLogin(self, login):
'''Is this p_login valid?'''
# The login can't be the id of the whole site or "admin"
if login == 'admin':
return self.translate('This username is reserved.')
# Check that no user or group already uses this login.
if self.count('User', login=login) or self.count('Group', login=login):
return self.translate('This login is already in use.')
return True
def validatePassword(self, password):
'''Is this p_password valid?'''
# Password must be at least 5 chars length
if len(password) < 5:
return self.translate('Passwords must contain at least 5 letters.')
return True
def showPassword(self):
'''When must we show the 2 fields for entering a password ?'''
if self.o.isTemporary(): return 'edit'
return False
def getGrantableRoles(self):
'''Returns the list of roles that the admin can grant to a user.'''
res = []
for role in self.o.getProductConfig().grantableRoles:
res.append( (role, self.translate('role_%s' % role)) )
return res
def validate(self, new, errors):
'''Inter-field validation.'''
page = self.request.get('page', 'main')
if page == 'main':
if hasattr(new, 'password1') and (new.password1 != new.password2):
msg = self.translate('Passwords do not match.')
errors.password1 = msg
errors.password2 = msg
return self._callCustom('validate', new, errors)
def onEdit(self, created):
self.title = self.firstName + ' ' + self.name
aclUsers = self.o.acl_users
login = self.login
if created:
# Create the corresponding Zope user
aclUsers._doAddUser(login, self.password1, self.roles, ())
zopeUser = aclUsers.getUser(login)
# Remove our own password copies
self.password1 = self.password2 = ''
from persistent.mapping import PersistentMapping
# The following dict will store, for every group, global roles
# granted to it.
zopeUser.groups = PersistentMapping()
else:
# Updates roles at the Zope level.
zopeUser = aclUsers.getUserById(login)
zopeUser.roles = self.roles
# "self" must be owned by its Zope user
if 'Owner' not in self.o.get_local_roles_for_userid(login):
self.o.manage_addLocalRoles(login, ('Owner',))
return self._callCustom('onEdit', created)
def getZopeUser(self):
'''Gets the Zope user corresponding to this user.'''
return self.o.acl_users.getUser(self.login)
def onDelete(self):
'''Before deleting myself, I must delete the corresponding Zope user.'''
self.o.acl_users._doDelUsers([self.login])
self.log('User "%s" deleted.' % self.login)
# Call a custom "onDelete" if any.
return self._callCustom('onDelete')
# ------------------------------------------------------------------------------
try:
from AccessControl.PermissionRole import _what_not_even_god_should_do, \
rolesForPermissionOn
from Acquisition import aq_base
except ImportError:
pass # For those using Appy without Zope
class ZopeUserPatches:
'''This class is a fake one that defines Appy variants of some of Zope's
AccessControl.User methods. The idea is to implement the notion of group
of users.'''
def getRoles(self):
'''Returns the global roles that this user (or any of its groups)
possesses.'''
res = list(self.roles)
# Add group global roles
if not hasattr(aq_base(self), 'groups'): return res
for roles in self.groups.itervalues():
for role in roles:
if role not in res: res.append(role)
return res
def getRolesInContext(self, object):
'''Return the list of global and local (to p_object) roles granted to
this user (or to any of its groups).'''
object = getattr(object, 'aq_inner', object)
# Start with user global roles
res = self.getRoles()
# Add local roles
localRoles = getattr(object, '__ac_local_roles__', None)
if not localRoles: return res
userId = self.getId()
groups = getattr(self, 'groups', ())
for id, roles in localRoles.iteritems():
if (id != userId) and (id not in groups): continue
for role in roles: res.add(role)
return res
def allowed(self, object, object_roles=None):
'''Checks whether the user has access to p_object. The user (or one of
its groups) must have one of the roles in p_object_roles.'''
if object_roles is _what_not_even_god_should_do: return 0
# If "Anonymous" is among p_object_roles, grant access.
if (object_roles is None) or ('Anonymous' in object_roles): return 1
# If "Authenticated" is among p_object_roles, grant access if the user
# is not anonymous.
if 'Authenticated' in object_roles and \
(self.getUserName() != 'Anonymous User'):
if self._check_context(object): return 1
# Try first to grant access based on global user roles
for role in self.getRoles():
if role not in object_roles: continue
if self._check_context(object): return 1
return
# Try then to grant access based on local roles
innerObject = getattr(object, 'aq_inner', object)
localRoles = getattr(innerObject, '__ac_local_roles__', None)
if not localRoles: return
userId = self.getId()
groups = getattr(self, 'groups', ())
for id, roles in localRoles.iteritems():
if (id != userId) and (id not in groups): continue
for role in roles:
if role not in object_roles: continue
if self._check_context(object): return 1
return
from AccessControl.User import SimpleUser
SimpleUser.getRoles = getRoles
SimpleUser.getRolesInContext = getRolesInContext
SimpleUser.allowed = allowed
# ------------------------------------------------------------------------------

361
gen/wrappers/__init__.py Normal file
View file

@ -0,0 +1,361 @@
'''This package contains base classes for wrappers that hide to the Appy
developer the real classes used by the underlying web framework.'''
# ------------------------------------------------------------------------------
import os, os.path, mimetypes
import appy.pod
from appy.gen import Type, Search, Ref, String
from appy.gen.utils import sequenceTypes, createObject
from appy.shared.utils import getOsTempFolder, executeCommand, normalizeString
from appy.shared.xml_parser import XmlMarshaller
from appy.shared.csv_parser import CsvMarshaller
# Some error messages ----------------------------------------------------------
WRONG_FILE_TUPLE = 'This is not the way to set a file. You can specify a ' \
'2-tuple (fileName, fileContent) or a 3-tuple (fileName, fileContent, ' \
'mimeType).'
FREEZE_ERROR = 'Error while trying to freeze a "%s" file in POD field ' \
'"%s" (%s).'
FREEZE_FATAL_ERROR = 'A server error occurred. Please contact the system ' \
'administrator.'
# ------------------------------------------------------------------------------
class AbstractWrapper(object):
'''Any real Zope object has a companion object that is an instance of this
class.'''
def __init__(self, o): self.__dict__['o'] = o
def appy(self): return self
def __setattr__(self, name, value):
appyType = self.o.getAppyType(name)
if not appyType:
raise 'Attribute "%s" does not exist.' % name
appyType.store(self.o, value)
def __getattribute__(self, name):
'''Gets the attribute named p_name. Lot of cheating here.'''
if name == 'o': return object.__getattribute__(self, name)
elif name == 'tool': return self.o.getTool().appy()
elif name == 'request': return self.o.REQUEST
elif name == 'session': return self.o.REQUEST.SESSION
elif name == 'typeName': return self.__class__.__bases__[-1].__name__
elif name == 'id': return self.o.id
elif name == 'uid': return self.o.UID()
elif name == 'klass': return self.__class__.__bases__[-1]
elif name == 'url': return self.o.absolute_url()
elif name == 'state': return self.o.State()
elif name == 'stateLabel':
o = self.o
appName = o.getProductConfig().PROJECTNAME
return o.translate(o.getWorkflowLabel(), domain=appName)
elif name == 'history':
o = self.o
key = o.workflow_history.keys()[0]
return o.workflow_history[key]
elif name == 'user':
return self.o.getUser()
elif name == 'appyUser':
return self.search('User', login=self.o.getUser().getId())[0]
elif name == 'fields': return self.o.getAllAppyTypes()
# Now, let's try to return a real attribute.
res = object.__getattribute__(self, name)
# If we got an Appy type, return the value of this type for this object
if isinstance(res, Type):
o = self.o
if isinstance(res, Ref):
return res.getValue(o, noListIfSingleObj=True)
else:
return res.getValue(o)
return res
def __repr__(self):
return '<%s appyobj at %s>' % (self.klass.__name__, id(self))
def __cmp__(self, other):
if other: return cmp(self.o, other.o)
return 1
def _callCustom(self, methodName, *args, **kwargs):
'''This wrapper implements some methods like "validate" and "onEdit".
If the user has defined its own wrapper, its methods will not be
called. So this method allows, from the methods here, to call the
user versions.'''
if len(self.__class__.__bases__) > 1:
# There is a custom user class
customUser = self.__class__.__bases__[-1]
if customUser.__dict__.has_key(methodName):
return customUser.__dict__[methodName](self, *args, **kwargs)
def getField(self, name): return self.o.getAppyType(name)
def link(self, fieldName, obj):
'''This method links p_obj (which can be a list of objects) to this one
through reference field p_fieldName.'''
return self.getField(fieldName).linkObject(self.o, obj)
def unlink(self, fieldName, obj):
'''This method unlinks p_obj (which can be a list of objects) from this
one through reference field p_fieldName.'''
return self.getField(fieldName).unlinkObject(self.o, obj)
def sort(self, fieldName, sortKey='title', reverse=False):
'''Sorts referred elements linked to p_self via p_fieldName according
to a given p_sortKey which must be an attribute set on referred
objects ("title", by default).'''
refs = getattr(self.o, fieldName, None)
if not refs: return
tool = self.tool
refs.sort(lambda x,y: cmp(getattr(tool.getObject(x), sortKey),
getattr(tool.getObject(y), sortKey)))
if reverse: refs.reverse()
def create(self, fieldNameOrClass, **kwargs):
'''If p_fieldNameOrClass is the name of a field, this method allows to
create an object and link it to the current one (self) through
reference field named p_fieldName.
If p_fieldNameOrClass is a class from the gen-application, it must
correspond to a root class and this method allows to create a
root object in the application folder.'''
isField = isinstance(fieldNameOrClass, basestring)
tool = self.tool.o
# Determine the portal type of the object to create
if isField:
fieldName = fieldNameOrClass
appyType = self.o.getAppyType(fieldName)
portalType = tool.getPortalType(appyType.klass)
else:
klass = fieldNameOrClass
portalType = tool.getPortalType(klass)
# Determine object id
if kwargs.has_key('id'):
objId = kwargs['id']
del kwargs['id']
else:
objId = tool.generateUid(portalType)
# Determine if object must be created from external data
externalData = None
if kwargs.has_key('_data'):
externalData = kwargs['_data']
del kwargs['_data']
# Where must I create the object?
if not isField:
folder = tool.getPath('/data')
else:
if hasattr(self, 'folder') and self.folder:
folder = self.o
else:
folder = self.o.getParentNode()
# Create the object
zopeObj = createObject(folder, objId,portalType, tool.getAppName())
appyObj = zopeObj.appy()
# Set object attributes
for attrName, attrValue in kwargs.iteritems():
setattr(appyObj, attrName, attrValue)
if isField:
# Link the object to this one
appyType.linkObject(self.o, zopeObj)
zopeObj._appy_managePermissions()
# Call custom initialization
if externalData: param = externalData
else: param = True
if hasattr(appyObj, 'onEdit'): appyObj.onEdit(param)
zopeObj.reindex()
return appyObj
def freeze(self, fieldName, doAction=False):
'''This method freezes a POD document. TODO: allow to freeze Computed
fields.'''
rq = self.request
field = self.o.getAppyType(fieldName)
if field.type != 'Pod': raise 'Cannot freeze non-Pod field.'
# Perform the related action if required.
if doAction: self.request.set('askAction', True)
# Set the freeze format
rq.set('podFormat', field.freezeFormat)
# Generate the document.
doc = field.getValue(self.o)
if isinstance(doc, basestring):
self.log(FREEZE_ERROR % (field.freezeFormat, field.name, doc),
type='error')
if field.freezeFormat == 'odt': raise FREEZE_FATAL_ERROR
self.log('Trying to freeze the ODT version...')
# Try to freeze the ODT version of the document, which does not
# require to call OpenOffice/LibreOffice, so the risk of error is
# smaller.
self.request.set('podFormat', 'odt')
doc = field.getValue(self.o)
if isinstance(doc, basestring):
self.log(FREEZE_ERROR % ('odt', field.name, doc), type='error')
raise FREEZE_FATAL_ERROR
field.store(self.o, doc)
def unFreeze(self, fieldName):
'''This method un freezes a POD document. TODO: allow to unfreeze
Computed fields.'''
rq = self.request
field = self.o.getAppyType(fieldName)
if field.type != 'Pod': raise 'Cannot unFreeze non-Pod field.'
field.store(self.o, None)
def delete(self):
'''Deletes myself.'''
self.o.delete()
def translate(self, label, mapping={}, domain=None, language=None,
format='html'):
'''Check documentation of self.o.translate.'''
return self.o.translate(label, mapping, domain, language=language,
format=format)
def do(self, transition, comment='', doAction=True, doNotify=True,
doHistory=True):
'''This method allows to trigger on p_self a workflow p_transition
programmatically. See doc in self.o.do.'''
return self.o.trigger(transition, comment, doAction=doAction,
doNotify=doNotify, doHistory=doHistory, doSay=False)
def log(self, message, type='info'): return self.o.log(message, type)
def say(self, message, type='info'): return self.o.say(message, type)
def normalize(self, s, usage='fileName'):
'''Returns a version of string p_s whose special chars have been
replaced with normal chars.'''
return normalizeString(s, usage)
def search(self, klass, sortBy='', maxResults=None, noSecurity=False,
**fields):
'''Searches objects of p_klass. p_sortBy must be the name of an indexed
field (declared with indexed=True); every param in p_fields must
take the name of an indexed field and take a possible value of this
field. You can optionally specify a maximum number of results in
p_maxResults. If p_noSecurity is specified, you get all objects,
even if the logged user does not have the permission to view it.'''
# Find the content type corresponding to p_klass
tool = self.tool.o
contentType = tool.getPortalType(klass)
# Create the Search object
search = Search('customSearch', sortBy=sortBy, **fields)
if not maxResults:
maxResults = 'NO_LIMIT'
# If I let maxResults=None, only a subset of the results will be
# returned by method executeResult.
res = tool.executeQuery(contentType, search=search,
maxResults=maxResults, noSecurity=noSecurity)
return [o.appy() for o in res['objects']]
def count(self, klass, noSecurity=False, **fields):
'''Identical to m_search above, but returns the number of objects that
match the search instead of returning the objects themselves. Use
this method instead of writing len(self.search(...)).'''
tool = self.tool.o
contentType = tool.getPortalType(klass)
search = Search('customSearch', **fields)
res = tool.executeQuery(contentType, search=search, brainsOnly=True,
noSecurity=noSecurity)
if res: return res._len # It is a LazyMap instance
else: return 0
def compute(self, klass, sortBy='', maxResults=None, context=None,
expression=None, noSecurity=False, **fields):
'''This method, like m_search and m_count above, performs a query on
objects of p_klass. But in this case, instead of returning a list of
matching objects (like m_search) or counting elements (like p_count),
it evaluates, on every matching object, a Python p_expression (which
may be an expression or a statement), and returns, if needed, a
result. The result may be initialized through parameter p_context.
p_expression is evaluated with 2 variables in its context: "obj"
which is the currently walked object, instance of p_klass, and "ctx",
which is the context as initialized (or not) by p_context. p_context
may be used as
(1) a variable or instance that is updated on every call to
produce a result;
(2) an input variable or instance;
(3) both.
The method returns p_context, modified or not by evaluation of
p_expression on every matching object.
When you need to perform an action or computation on a lot of
objects, use this method instead of doing things like
"for obj in self.search(MyClass,...)"
'''
tool = self.tool.o
contentType = tool.getPortalType(klass)
search = Search('customSearch', sortBy=sortBy, **fields)
# Initialize the context variable "ctx"
ctx = context
for brain in tool.executeQuery(contentType, search=search, \
brainsOnly=True, maxResults=maxResults, noSecurity=noSecurity):
# Get the Appy object from the brain
if noSecurity: method = '_unrestrictedGetObject'
else: method = 'getObject'
exec 'obj = brain.%s().appy()' % method
exec expression
return ctx
def reindex(self):
'''Asks a direct object reindexing. In most cases you don't have to
reindex objects "manually" with this method. When an object is
modified after some user action has been performed, Appy reindexes
this object automatically. But if your code modifies other objects,
Appy may not know that they must be reindexed, too. So use this
method in those cases.'''
self.o.reindex()
def export(self, at='string', format='xml', include=None, exclude=None):
'''Creates an "exportable" version of this object. p_format is "xml" by
default, but can also be "csv". If p_format is:
* "xml", if p_at is "string", this method returns the XML version,
without the XML prologue. Else, (a) if not p_at, the XML
will be exported on disk, in the OS temp folder, with an
ugly name; (b) else, it will be exported at path p_at.
* "csv", if p_at is "string", this method returns the CSV data as a
string. If p_at is an opened file handler, the CSV line will
be appended in it.
If p_include is given, only fields whose names are in it will be
included. p_exclude, if given, contains names of fields that will
not be included in the result.
'''
if format == 'xml':
# Todo: take p_include and p_exclude into account.
# Determine where to put the result
toDisk = (at != 'string')
if toDisk and not at:
at = getOsTempFolder() + '/' + self.o.UID() + '.xml'
# Create the XML version of the object
marshaller = XmlMarshaller(cdata=True, dumpUnicode=True,
dumpXmlPrologue=toDisk,
rootTag=self.klass.__name__)
xml = marshaller.marshall(self.o, objectType='appy')
# Produce the desired result
if toDisk:
f = file(at, 'w')
f.write(xml.encode('utf-8'))
f.close()
return at
else:
return xml
elif format == 'csv':
if isinstance(at, basestring):
marshaller = CsvMarshaller(include=include, exclude=exclude)
return marshaller.marshall(self)
else:
marshaller = CsvMarshaller(at, include=include, exclude=exclude)
marshaller.marshall(self)
def historize(self, data):
'''This method allows to add "manually" a "data-change" event into the
object's history. Indeed, data changes are "automatically" recorded
only when an object is edited through the edit form, not when a
setter is called from the code.
p_data must be a dictionary whose keys are field names (strings) and
whose values are the previous field values.'''
self.o.addDataChange(data)
def formatText(self, text, format='html'):
'''Produces a representation of p_text into the desired p_format, which
is 'html' by default.'''
return self.o.formatText(text, format)
# ------------------------------------------------------------------------------