appypod-rattail/gen/plone25/wrappers/__init__.py

460 lines
21 KiB
Python
Raw Normal View History

2009-06-29 07:06:01 -05:00
'''This package contains base classes for wrappers that hide to the Appy
developer the real classes used by the underlying web framework.'''
2009-06-29 07:06:01 -05:00
# ------------------------------------------------------------------------------
import os, os.path, time, mimetypes, random
import appy.pod
from appy.gen import Search
from appy.gen.utils import sequenceTypes
from appy.shared.utils import getOsTempFolder, executeCommand, normalizeString
from appy.shared.xml_parser import XmlMarshaller
# Some error messages ----------------------------------------------------------
WRONG_FILE_TUPLE = 'This is not the way to set a file. You can specify a ' \
'2-tuple (fileName, fileContent) or a 3-tuple (fileName, fileContent, ' \
'mimeType).'
2009-06-29 07:06:01 -05:00
# ------------------------------------------------------------------------------
class AbstractWrapper:
'''Any real web framework object has a companion object that is an instance
of this class.'''
def __init__(self, o):
self.__dict__['o'] = o
def _set_file_attribute(self, name, v):
'''Updates the value of a file attribute named p_name with value p_v.
p_v may be:
- a string value containing the path to a file on disk;
- a 2-tuple (fileName, fileContent) where
* fileName = the name of the file (ie "myFile.odt")
* fileContent = the binary or textual content of the file or an
open file handler.
- a 3-tuple (fileName, fileContent, mimeType) where mimeType is the
v MIME type of the file.'''
ploneFileClass = self.o.getProductConfig().File
if isinstance(v, ploneFileClass):
exec "self.o.set%s%s(v)" % (name[0].upper(), name[1:])
elif isinstance(v, FileWrapper):
setattr(self, name, v._atFile)
elif isinstance(v, basestring):
f = file(v)
fileName = os.path.basename(v)
fileId = 'file.%f' % time.time()
ploneFile = ploneFileClass(fileId, fileName, f)
ploneFile.filename = fileName
ploneFile.content_type = mimetypes.guess_type(fileName)[0]
setattr(self, name, ploneFile)
f.close()
elif type(v) in sequenceTypes:
# It should be a 2-tuple or 3-tuple
fileName = None
mimeType = None
if len(v) == 2:
fileName, fileContent = v
elif len(v) == 3:
fileName, fileContent, mimeType = v
else:
raise WRONG_FILE_TUPLE
if fileName:
fileId = 'file.%f' % time.time()
ploneFile = ploneFileClass(fileId, fileName, fileContent)
ploneFile.filename = fileName
if not mimeType:
mimeType = mimetypes.guess_type(fileName)[0]
ploneFile.content_type = mimeType
setattr(self, name, ploneFile)
2009-06-29 07:06:01 -05:00
def __setattr__(self, name, v):
appyType = self.o.getAppyType(name)
if not appyType and (name != 'title'):
raise 'Attribute "%s" does not exist.' % name
if appyType and (appyType['type'] == 'File'):
self._set_file_attribute(name, v)
else:
exec "self.o.set%s%s(v)" % (name[0].upper(), name[1:])
def __repr__(self):
return '<%s wrapper at %s>' % (self.klass.__name__, id(self))
2009-06-29 07:06:01 -05:00
def __cmp__(self, other):
if other: return cmp(self.o, other.o)
else: return 1
def get_tool(self): return self.o.getTool().appy()
2009-06-29 07:06:01 -05:00
tool = property(get_tool)
def get_flavour(self): return self.o.getTool().getFlavour(self.o, appy=True)
flavour = property(get_flavour)
def get_request(self): return self.o.REQUEST
request = property(get_request)
def get_session(self): return self.o.REQUEST.SESSION
2009-06-29 07:06:01 -05:00
session = property(get_session)
def get_typeName(self): return self.__class__.__bases__[-1].__name__
2009-06-29 07:06:01 -05:00
typeName = property(get_typeName)
def get_id(self): return self.o.id
2009-06-29 07:06:01 -05:00
id = property(get_id)
def get_state(self):
return self.o.portal_workflow.getInfoFor(self.o, 'review_state')
state = property(get_state)
def get_stateLabel(self):
appName = self.o.getProductConfig().PROJECTNAME
return self.o.utranslate(self.o.getWorkflowLabel(), domain=appName)
stateLabel = property(get_stateLabel)
def get_klass(self): return self.__class__.__bases__[1]
2009-06-29 07:06:01 -05:00
klass = property(get_klass)
def get_url(self): return self.o.absolute_url()+'/skyn/view'
url = property(get_url)
def get_history(self):
key = self.o.workflow_history.keys()[0]
return self.o.workflow_history[key]
history = property(get_history)
def get_user(self):
return self.o.portal_membership.getAuthenticatedMember()
user = property(get_user)
2009-06-29 07:06:01 -05:00
def link(self, fieldName, obj):
'''This method links p_obj to this one through reference field
p_fieldName.'''
if isinstance(obj, AbstractWrapper):
obj = obj.o
postfix = 'et%s%s' % (fieldName[0].upper(), fieldName[1:])
# Update the Archetypes reference field
exec 'objs = self.o.g%s()' % postfix
if not objs:
objs = []
elif type(objs) not in (list, tuple):
objs = [objs]
objs.append(obj)
exec 'self.o.s%s(objs)' % postfix
# Update the ordered list of references
2010-02-12 03:59:42 -06:00
self.o._appy_getSortedField(fieldName).append(obj.UID())
2009-06-29 07:06:01 -05:00
def sort(self, fieldName, sortKey='title', reverse=False):
'''Sorts referred elements linked to p_self via p_fieldName according
to a given p_sortKey which must be an attribute set on referred
objects ("title", by default).'''
sortedUids = getattr(self.o, '_appy_%s' % fieldName)
c = self.o.uid_catalog
sortedUids.sort(lambda x,y: \
cmp(getattr(c(UID=x)[0].getObject().appy(), sortKey),
getattr(c(UID=y)[0].getObject().appy(), sortKey)))
if reverse:
sortedUids.reverse()
def create(self, fieldNameOrClass, **kwargs):
'''If p_fieldNameOfClass is the name of a field, this method allows to
create an object and link it to the current one (self) through
reference field named p_fieldName.
If p_fieldNameOrClass is a class from the gen-application, it must
correspond to a root class and this method allows to create a
root object in the application folder.'''
isField = isinstance(fieldNameOrClass, basestring)
# Determine the portal type of the object to create
if isField:
fieldName = fieldNameOrClass
idPrefix = fieldName
portalType = self.o.getAppyRefPortalType(fieldName)
else:
theClass = fieldNameOrClass
idPrefix = theClass.__name__
portalType = self.o._appy_getAtType(theClass, self.flavour.o)
# Determine object id
2009-06-29 07:06:01 -05:00
if kwargs.has_key('id'):
objId = kwargs['id']
del kwargs['id']
else:
objId = '%s.%f.%s' % (idPrefix, time.time(),
str(random.random()).split('.')[1])
# Determine if object must be created from external data
externalData = None
if kwargs.has_key('_data'):
externalData = kwargs['_data']
del kwargs['_data']
# Where must I create the object?
if not isField:
folder = self.o.getTool().getAppFolder()
2009-06-29 07:06:01 -05:00
else:
if hasattr(self, 'folder') and self.folder:
folder = self.o
else:
folder = self.o.getParentNode()
2009-06-29 07:06:01 -05:00
# Create the object
folder.invokeFactory(portalType, objId)
ploneObj = getattr(folder, objId)
appyObj = ploneObj.appy()
2009-06-29 07:06:01 -05:00
# Set object attributes
for attrName, attrValue in kwargs.iteritems():
setterName = 'set%s%s' % (attrName[0].upper(), attrName[1:])
if isinstance(attrValue, AbstractWrapper):
try:
refAppyType = getattr(appyObj.__class__.__bases__[-1],
attrName)
appyObj.link(attrName, attrValue.o)
except AttributeError, ae:
pass
else:
getattr(ploneObj, setterName)(attrValue)
if isField:
# Link the object to this one
self.link(fieldName, ploneObj)
self.o.reindexObject()
# Call custom initialization
if externalData: param = externalData
else: param = True
if hasattr(appyObj, 'onEdit'): appyObj.onEdit(param)
2009-06-29 07:06:01 -05:00
ploneObj.reindexObject()
return appyObj
def translate(self, label, mapping={}, domain=None):
'''Check documentation of self.o.translate.'''
return self.o.translate(label, mapping, domain)
2009-06-29 07:06:01 -05:00
def do(self, transition, comment='', doAction=False, doNotify=False,
doHistory=True):
2009-06-29 07:06:01 -05:00
'''This method allows to trigger on p_self a workflow p_transition
programmatically. If p_doAction is False, the action that must
normally be executed after the transition has been triggered will
not be executed. If p_doNotify is False, the notifications
(email,...) that must normally be launched after the transition has
been triggered will not be launched. If p_doHistory is False, there
will be no trace from this transition triggering in the workflow
history.'''
2009-06-29 07:06:01 -05:00
wfTool = self.o.portal_workflow
availableTransitions = [t['id'] for t in \
wfTool.getTransitionsFor(self.o)]
transitionName = transition
if not transitionName in availableTransitions:
# Maybe is is a compound Appy transition. Try to find the
# corresponding DC transition.
state = self.state
transitionPrefix = transition + state[0].upper() + state[1:] + 'To'
for at in availableTransitions:
if at.startswith(transitionPrefix):
transitionName = at
break
# Set in a versatile attribute details about what to execute or not
# (actions, notifications) after the transition has been executed by DC
# workflow.
self.o._v_appy_do = {'doAction': doAction, 'doNotify': doNotify}
if not doHistory:
comment = '_invisible_' # Will not be displayed.
# At first sight, I wanted to remove the entry from
# self.o.workflow_history. But Plone determines the state of an
# object by consulting the target state of the last transition in
# this workflow_history.
2009-06-29 07:06:01 -05:00
wfTool.doActionFor(self.o, transitionName, comment=comment)
del self.o._v_appy_do
def log(self, message, type='info'):
'''Logs a message in the log file. p_logLevel may be "info", "warning"
or "error".'''
logger = self.o.getProductConfig().logger
if type == 'warning': logMethod = logger.warn
elif type == 'error': logMethod = logger.error
else: logMethod = logger.info
logMethod(message)
def say(self, message, type='info'):
'''Prints a message in the user interface. p_logLevel may be "info",
"warning" or "error".'''
mType = type
if mType == 'warning': mType = 'warn'
elif mType == 'error': mType = 'stop'
self.o.plone_utils.addPortalMessage(message, type=mType)
2010-02-12 03:59:42 -06:00
unwantedChars = ('\\', '/', ':', '*', '?', '"', '<', '>', '|', ' ')
def normalize(self, s, usage='fileName'):
'''Returns a version of string p_s whose special chars have been
replaced with normal chars.'''
return normalizeString(s, usage)
2010-02-22 08:28:20 -06:00
def search(self, klass, sortBy='', maxResults=None, noSecurity=False,
**fields):
'''Searches objects of p_klass. p_sortBy must be the name of an indexed
field (declared with indexed=True); every param in p_fields must
take the name of an indexed field and take a possible value of this
field. You can optionally specify a maximum number of results in
p_maxResults. If p_noSecurity is specified, you get all objects,
even if the logged user does not have the permission to view it.'''
# Find the content type corresponding to p_klass
flavour = self.flavour
contentType = flavour.o.getPortalType(klass)
# Create the Search object
search = Search('customSearch', sortBy=sortBy, **fields)
if not maxResults:
maxResults = 'NO_LIMIT'
# If I let maxResults=None, only a subset of the results will be
# returned by method executeResult.
res = self.tool.o.executeQuery(contentType,flavour.number,search=search,
maxResults=maxResults, noSecurity=noSecurity)
return [o.appy() for o in res['objects']]
def count(self, klass, noSecurity=False, **fields):
'''Identical to m_search above, but returns the number of objects that
match the search instead of returning the objects themselves. Use
this method instead of writing len(self.search(...)).'''
flavour = self.flavour
contentType = flavour.o.getPortalType(klass)
search = Search('customSearch', **fields)
res = self.tool.o.executeQuery(contentType,flavour.number,search=search,
brainsOnly=True, noSecurity=noSecurity)
if res: return res._len # It is a LazyMap instance
else: return 0
2010-02-22 08:28:20 -06:00
def compute(self, klass, sortBy='', maxResults=None, context=None,
expression=None, noSecurity=False, **fields):
'''This method, like m_search and m_count above, performs a query on
objects of p_klass. But in this case, instead of returning a list of
matching objects (like m_search) or counting elements (like p_count),
it evaluates, on every matching object, a Python p_expression (which
may be an expression or a statement), and returns, if needed, a
result. The result may be initialized through parameter p_context.
p_expression is evaluated with 2 variables in its context: "obj"
which is the currently walked object, instance of p_klass, and "ctx",
which is the context as initialized (or not) by p_context. p_context
may be used as
2010-02-22 08:28:20 -06:00
(1) a variable or instance that is updated on every call to
produce a result;
(2) an input variable or instance;
(3) both.
The method returns p_context, modified or not by evaluation of
p_expression on every matching object.
When you need to perform an action or computation on a lot of
objects, use this method instead of doing things like
"for obj in self.search(MyClass,...)"
'''
flavour = self.flavour
contentType = flavour.o.getPortalType(klass)
2010-02-22 08:28:20 -06:00
search = Search('customSearch', sortBy=sortBy, **fields)
# Initialize the context variable "ctx"
ctx = context
for brain in self.tool.o.executeQuery(contentType, flavour.number, \
2010-02-22 08:28:20 -06:00
search=search, brainsOnly=True, maxResults=maxResults,
noSecurity=noSecurity):
# Get the Appy object from the brain
obj = brain.getObject().appy()
exec expression
return ctx
def reindex(self):
'''Asks a direct object reindexing. In most cases you don't have to
reindex objects "manually" with this method. When an object is
modified after some user action has been performed, Appy reindexes
this object automatically. But if your code modifies other objects,
Appy may not know that they must be reindexed, too. So use this
method in those cases.'''
self.o.reindexObject()
def export(self, at='string'):
'''Creates an "exportable", XML version of this object. If p_at is
2010-03-31 08:49:54 -05:00
"string", this method returns the XML version, without the XML
prologue. Else, (a) if not p_at, the XML will be exported on disk,
in the OS temp folder, with an ugly name; (b) else, it will be
exported at path p_at.'''
# Determine where to put the result
toDisk = (at != 'string')
if toDisk and not at:
at = getOsTempFolder() + '/' + self.o.UID() + '.xml'
# Create the XML version of the object
2010-03-31 08:49:54 -05:00
marshaller = XmlMarshaller(cdata=True, dumpUnicode=True,
dumpXmlPrologue=toDisk,
rootTag=self.klass.__name__)
xml = marshaller.marshall(self.o, objectType='appy')
# Produce the desired result
if toDisk:
f = file(at, 'w')
f.write(xml.encode('utf-8'))
f.close()
return at
else:
return xml
def historize(self, data):
'''This method allows to add "manually" a "data-change" event into the
object's history. Indeed, data changes are "automatically" recorded
only when an object is edited through the edit form, not when a
setter is called from the code.
p_data must be a dictionary whose keys are field names (strings) and
whose values are the previous field values.'''
self.o.addDataChange(data, labels=False)
# ------------------------------------------------------------------------------
CONVERSION_ERROR = 'An error occurred while executing command "%s". %s'
class FileWrapper:
'''When you get, from an appy object, the value of a File attribute, you
get an instance of this class.'''
def __init__(self, atFile):
'''This constructor is only used by Appy to create a nice File instance
from a Plone/Zope corresponding instance (p_atFile). If you need to
create a new file and assign it to a File attribute, use the
attribute setter, do not create yourself an instance of this
class.'''
d = self.__dict__
d['_atFile'] = atFile # Not for you!
d['name'] = atFile.filename
d['content'] = atFile.data
d['mimeType'] = atFile.content_type
d['size'] = atFile.size # In bytes
def __setattr__(self, name, v):
d = self.__dict__
if name == 'name':
self._atFile.filename = v
d['name'] = v
elif name == 'content':
self._atFile.update_data(v, self.mimeType, len(v))
d['content'] = v
d['size'] = len(v)
elif name == 'mimeType':
self._atFile.content_type = self.mimeType = v
else:
raise 'Impossible to set attribute %s. "Settable" attributes ' \
'are "name", "content" and "mimeType".' % name
def dump(self, filePath=None, format=None, tool=None):
'''Writes the file on disk. If p_filePath is specified, it is the
path name where the file will be dumped; folders mentioned in it
must exist. If not, the file will be dumped in the OS temp folder.
The absolute path name of the dumped file is returned.
If an error occurs, the method returns None. If p_format is
specified, OpenOffice will be called for converting the dumped file
to the desired format. In this case, p_tool, a Appy tool, must be
provided. Indeed, any Appy tool contains parameters for contacting
OpenOffice in server mode.'''
if not filePath:
filePath = '%s/file%f.%s' % (getOsTempFolder(), time.time(),
self.name)
f = file(filePath, 'w')
if self.content.__class__.__name__ == 'Pdata':
# The file content is splitted in several chunks.
f.write(self.content.data)
nextPart = self.content.next
while nextPart:
f.write(nextPart.data)
nextPart = nextPart.next
else:
# Only one chunk
f.write(self.content)
f.close()
if format:
if not tool: return
# Convert the dumped file using OpenOffice
convScript = '%s/converter.py' % os.path.dirname(appy.pod.__file__)
cmd = '%s %s "%s" %s -p%d' % (tool.unoEnabledPython, convScript,
filePath, format, tool.openOfficePort)
errorMessage = executeCommand(cmd)
# Even if we have an "error" message, it could be a simple warning.
# So we will continue here and, as a subsequent check for knowing if
# an error occurred or not, we will test the existence of the
# converted file (see below).
os.remove(filePath)
# Return the name of the converted file.
baseName, ext = os.path.splitext(filePath)
if (ext == '.%s' % format):
filePath = '%s.res.%s' % (baseName, format)
else:
filePath = '%s.%s' % (baseName, format)
if not os.path.exists(filePath):
tool.log(CONVERSION_ERROR % (cmd, errorMessage), type='error')
return
return filePath
2009-06-29 07:06:01 -05:00
# ------------------------------------------------------------------------------