Initial import
This commit is contained in:
commit
4043163fc4
427 changed files with 18387 additions and 0 deletions
10
shared/__init__.py
Executable file
10
shared/__init__.py
Executable file
|
@ -0,0 +1,10 @@
|
|||
# ------------------------------------------------------------------------------
|
||||
import appy
|
||||
import os.path
|
||||
|
||||
appyPath = os.path.realpath(os.path.dirname(appy.__file__))
|
||||
mimeTypes = {'odt': 'application/vnd.oasis.opendocument.text',
|
||||
'doc': 'application/msword',
|
||||
'rtf': 'text/rtf',
|
||||
'pdf': 'application/pdf'}
|
||||
# ------------------------------------------------------------------------------
|
31
shared/errors.py
Executable file
31
shared/errors.py
Executable file
|
@ -0,0 +1,31 @@
|
|||
# ------------------------------------------------------------------------------
|
||||
# Appy is a framework for building applications in the Python language.
|
||||
# Copyright (C) 2007 Gaetan Delannay
|
||||
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,USA.
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class AppyError(Exception):
|
||||
'''Root Appy exception class.'''
|
||||
pass
|
||||
|
||||
class ValidationError(AppyError):
|
||||
'''Represents an error that occurs on data sent to the Appy server.'''
|
||||
pass
|
||||
|
||||
class InternalError(AppyError):
|
||||
'''Represents a programming error: something that should never occur.'''
|
||||
pass
|
||||
# ------------------------------------------------------------------------------
|
503
shared/rtf.py
Executable file
503
shared/rtf.py
Executable file
|
@ -0,0 +1,503 @@
|
|||
# -*- coding: iso-8859-15 -*-
|
||||
# ------------------------------------------------------------------------------
|
||||
# Appy is a framework for building applications in the Python language.
|
||||
# Copyright (C) 2007 Gaetan Delannay
|
||||
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,USA.
|
||||
|
||||
'''RTF table parser.
|
||||
|
||||
This parser reads RTF documents that conform to the following.
|
||||
- Each table must have a first row with only one cell: the table name.
|
||||
- The other rows must all have the same number of columns. This number must
|
||||
be strictly greater than 1.'''
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
import re, sys, UserList, UserDict
|
||||
from StringIO import StringIO
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class ParserError(Exception): pass
|
||||
class TypeError(Exception): pass
|
||||
|
||||
# ParserError-related constants ------------------------------------------------
|
||||
BAD_PARENT_ROW = 'For table "%s", you specified "%s" as parent ' \
|
||||
'table, but you referred to row number "%s" ' \
|
||||
'within the parent. This value must be a positive ' \
|
||||
'integer or zero (we start counting rows at 0).'
|
||||
PARENT_NOT_FOUND = 'I cannot find table "%s" that you defined as being ' \
|
||||
'parent of "%s".'
|
||||
TABLE_KEY_ERROR = 'Within a row of table "%s", you mention a column named ' \
|
||||
'"%s" which does not exist neither in "%s" itself, ' \
|
||||
'neither in its parent row(s). '
|
||||
PARENT_ROW_NOT_FOUND = 'You specified table "%s" as inheriting from table ' \
|
||||
'"%s", row "%d", but this row does not exist (table ' \
|
||||
'"%s" as a length = %d). Note that we start counting ' \
|
||||
'rows at 0.'
|
||||
PARENT_COLUMN_NOT_FOUND = 'You specified table "%s" as inheriting from table ' \
|
||||
'"%s", column "%s", but this column does not exist ' \
|
||||
'in table "%s" or parents.'
|
||||
PARENT_ROW_COL_NOT_FOUND = 'You specified table "%s" as inheriting from ' \
|
||||
'table "%s", column "%s", value "%s", but it does ' \
|
||||
'not correspond to any row in table "%s".'
|
||||
NO_ROWS_IN_TABLE_YET = 'In first row of table "%s", you use value \' " \' ' \
|
||||
'for referencing the cell value in previous row, ' \
|
||||
'which does not exist.'
|
||||
VALUE_ERROR = 'Value error for column "%s" of table "%s". %s'
|
||||
TYPE_ERROR = 'Type error for column "%s" of table "%s". %s'
|
||||
|
||||
# TypeError-related constants -------------------------------------------------
|
||||
LIST_TYPE_ERROR = 'Maximum number of nested lists is 4.'
|
||||
BASIC_TYPE_ERROR = 'Letter "%s" does not correspond to any valid type. ' \
|
||||
'Valid types are f (float), i (int), g (long) and b (bool).'
|
||||
BASIC_VALUE_ERROR = 'Value "%s" can\'t be converted to type "%s".'
|
||||
LIST_VALUE_ERROR = 'Value "%s" is malformed: within it, %s. You should check ' \
|
||||
'the use of separators ( , : ; - ) to obtain a schema ' \
|
||||
'conform to the type "%s"'
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Type:
|
||||
basicTypes = {'f': float, 'i':int, 'g':long, 'b':bool}
|
||||
separators = ['-', ';', ',', ':']
|
||||
def __init__(self, typeDecl):
|
||||
self.basicType = None # The python basic type
|
||||
self.listNumber = 0
|
||||
# If = 1 : it is a list. If = 2: it is a list of lists. If = 3...
|
||||
self.analyseTypeDecl(typeDecl)
|
||||
if self.listNumber > 4:
|
||||
raise TypeError(LIST_TYPE_ERROR)
|
||||
self.name = self.computeName()
|
||||
def analyseTypeDecl(self, typeDecl):
|
||||
for char in typeDecl:
|
||||
if char == 'l':
|
||||
self.listNumber += 1
|
||||
else:
|
||||
# Get the basic type
|
||||
if not (char in Type.basicTypes.keys()):
|
||||
raise TypeError(BASIC_TYPE_ERROR % char)
|
||||
self.basicType = Type.basicTypes[char]
|
||||
break
|
||||
if not self.basicType:
|
||||
self.basicType = unicode
|
||||
def convertBasicValue(self, value):
|
||||
try:
|
||||
return self.basicType(value.strip())
|
||||
except ValueError:
|
||||
raise TypeError(BASIC_VALUE_ERROR % (value,
|
||||
self.basicType.__name__))
|
||||
def convertValue(self, value):
|
||||
'''Converts a p_value which is a string into a value conform
|
||||
to self.'''
|
||||
if self.listNumber == 0:
|
||||
res = self.convertBasicValue(value)
|
||||
else:
|
||||
# Get separators in their order of appearance
|
||||
separators = []
|
||||
for char in value:
|
||||
if (char in Type.separators) and (char not in separators):
|
||||
separators.append(char)
|
||||
# Remove surplus separators
|
||||
if len(separators) > self.listNumber:
|
||||
nbOfSurplusSeps = len(separators) - self.listNumber
|
||||
separators = separators[nbOfSurplusSeps:]
|
||||
# If not enough separators, create corresponding empty lists.
|
||||
res = None
|
||||
innerList = None
|
||||
resIsComplete = False
|
||||
if len(separators) < self.listNumber:
|
||||
if not value:
|
||||
res = []
|
||||
resIsComplete = True
|
||||
else:
|
||||
# Begin with empty list(s)
|
||||
nbOfMissingSeps = self.listNumber - len(separators)
|
||||
res = []
|
||||
innerList = res
|
||||
for i in range(nbOfMissingSeps-1):
|
||||
newInnerList = []
|
||||
innerList.append(newInnerList)
|
||||
innerList = newInnerList
|
||||
# We can now convert the value
|
||||
separators.reverse()
|
||||
if innerList != None:
|
||||
innerList.append(self.convertListItem(value, separators))
|
||||
elif not resIsComplete:
|
||||
try:
|
||||
res = self.convertListItem(value, separators)
|
||||
except TypeError, te:
|
||||
raise TypeError(LIST_VALUE_ERROR % (value, te, self.name))
|
||||
return res
|
||||
def convertListItem(self, stringItem, remainingSeps):
|
||||
if not remainingSeps:
|
||||
res = self.convertBasicValue(stringItem)
|
||||
else:
|
||||
curSep = remainingSeps[0]
|
||||
tempRes = stringItem.split(curSep)
|
||||
if (len(tempRes) == 1) and (not tempRes[0]):
|
||||
# There was no value within value, so we produce an empty list.
|
||||
res = []
|
||||
else:
|
||||
res = []
|
||||
for tempItem in tempRes:
|
||||
res.append(self.convertListItem(tempItem,
|
||||
remainingSeps[1:]))
|
||||
return res
|
||||
def computeName(self):
|
||||
prefix = 'list of ' * self.listNumber
|
||||
return '<%s%s>' % (prefix, self.basicType.__name__)
|
||||
def __repr__(self):
|
||||
return self.name
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Table(UserList.UserList):
|
||||
def __init__(self):
|
||||
UserList.UserList.__init__(self)
|
||||
self.name = None
|
||||
self.parent = None
|
||||
self.parentRow = None
|
||||
# Either ~i~ (the ith row in table self.parent, index starts at 0) or
|
||||
# ~(s_columnName:s_columnValue)~ (identifies the 1st row that have
|
||||
# s_columnValue for the column named s_columnName)
|
||||
def dump(self, withContent=True):
|
||||
res = 'Table "%s"' % self.name
|
||||
if self.parent:
|
||||
res += ' extends table "%s"' % self.parent.name
|
||||
if isinstance(self.parentRow, int):
|
||||
res += '(%d)' % self.parentRow
|
||||
else:
|
||||
res += '(%s=%s)' % self.parentRow
|
||||
if withContent:
|
||||
res += '\n'
|
||||
for line in self:
|
||||
res += str(line)
|
||||
return res
|
||||
def instanceOf(self, tableName):
|
||||
res = False
|
||||
if self.parent:
|
||||
if self.parent.name == tableName:
|
||||
res = True
|
||||
else:
|
||||
res = self.parent.instanceOf(tableName)
|
||||
return res
|
||||
def asDict(self):
|
||||
'''If this table as only 2 columns named "key" and "value", it can be
|
||||
represented as a Python dict. This method produces this dict.'''
|
||||
infoDict = {}
|
||||
if self.parent:
|
||||
for info in self.parent:
|
||||
infoDict[info["key"]] = info["value"]
|
||||
for info in self:
|
||||
infoDict[info["key"]] = info["value"]
|
||||
return infoDict
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class TableRow(UserDict.UserDict):
|
||||
def __init__(self, table):
|
||||
UserDict.UserDict.__init__(self)
|
||||
self.table = table
|
||||
def __getitem__(self, key):
|
||||
'''This method "implements" row inheritance: if the current row does
|
||||
not have an element with p_key, it looks in the parent row of this row,
|
||||
via the parent table self.table.'''
|
||||
keyError = False
|
||||
t = self.table
|
||||
if self.has_key(key):
|
||||
res = UserDict.UserDict.__getitem__(self, key)
|
||||
else:
|
||||
# Get the parent row
|
||||
if t.parent:
|
||||
if isinstance(t.parentRow, int):
|
||||
if t.parentRow < len(t.parent):
|
||||
try:
|
||||
res = t.parent[t.parentRow][key]
|
||||
except KeyError:
|
||||
keyError = True
|
||||
else:
|
||||
raise ParserError(PARENT_ROW_NOT_FOUND %
|
||||
(t.name, t.parent.name, t.parentRow,
|
||||
t.parent.name, len(t.parent)))
|
||||
else:
|
||||
tColumn, tValue = t.parentRow
|
||||
# Get the 1st row having tColumn = tValue
|
||||
rowFound = False
|
||||
for row in t.parent:
|
||||
try:
|
||||
curVal = row[tColumn]
|
||||
except KeyError:
|
||||
raise ParserError(PARENT_COLUMN_NOT_FOUND %
|
||||
(t.name, t.parent.name, tColumn,
|
||||
t.parent.name))
|
||||
if curVal == tValue:
|
||||
rowFound = True
|
||||
try:
|
||||
res = row[key]
|
||||
except KeyError:
|
||||
keyError = True
|
||||
break
|
||||
if not rowFound:
|
||||
raise ParserError(PARENT_ROW_COL_NOT_FOUND %
|
||||
(t.name, t.parent.name, tColumn,
|
||||
tValue, t.parent.name))
|
||||
else:
|
||||
keyError = True
|
||||
if keyError:
|
||||
raise KeyError(TABLE_KEY_ERROR % (t.name, key, t.name))
|
||||
return res
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class NameResolver:
|
||||
def resolveNames(self, tables):
|
||||
for tableName, table in tables.iteritems():
|
||||
if table.parent:
|
||||
if not tables.has_key(table.parent):
|
||||
raise ParserError(PARENT_NOT_FOUND %
|
||||
(table.parent, table.name))
|
||||
table.parent = tables[table.parent]
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class TableParser:
|
||||
# Parser possible states
|
||||
IGNORE = 0
|
||||
READING_CONTROL_WORD = 1
|
||||
READING_CONTENT = 2
|
||||
READING_SPECIAL_CHAR = 3
|
||||
def __init__(self, fileName):
|
||||
self.input = open(fileName)
|
||||
self.state = None
|
||||
# RTF character types
|
||||
self.alpha = re.compile('[a-zA-Z_\-\*]')
|
||||
self.numeric = re.compile('[0-9]')
|
||||
self.whiteSpaces = (' ', '\t', '\n', '\r', '\f', '\v')
|
||||
self.specialChars = {91:"'", 92:"'", 93:'"', 94:'"', 85:'...', 81:'<EFBFBD>',
|
||||
4:'', 5:''}
|
||||
# Parser state
|
||||
self.state = TableParser.READING_CONTENT
|
||||
# Parser buffers
|
||||
self.controlWordBuffer = ''
|
||||
self.contentBuffer = StringIO()
|
||||
self.specialCharBuffer = ''
|
||||
# Resulting RTF output tables
|
||||
self.rtfTables = {}
|
||||
# Attributes needed by onRow and onColumn
|
||||
self.nbOfColumns = 0
|
||||
self.currentRow = []
|
||||
self.previousRow = []
|
||||
self.currentTable = Table()
|
||||
self.currentTableName = None
|
||||
self.currentColumnNames = None # ~[]~
|
||||
self.currentColumnTypes = None # ~[]~
|
||||
self.rowIsHeader = False
|
||||
# Table name regular expression
|
||||
self.tableNameRex = re.compile('([^\(]+)(?:\((.*)\))?')
|
||||
def isGroupDelimiter(self, char):
|
||||
return (char == '{') or (char == '}')
|
||||
def isControlWordStart(self, char):
|
||||
return (char == '\\')
|
||||
def isAlpha(self, char):
|
||||
return self.alpha.match(char)
|
||||
def isNumeric(self, char):
|
||||
return self.numeric.match(char)
|
||||
def isWhiteSpace(self, char):
|
||||
return (char in self.whiteSpaces)
|
||||
def isQuote(self, char):
|
||||
return char == "'"
|
||||
def manageControlWord(self):
|
||||
self.state = TableParser.READING_CONTENT
|
||||
cWord = self.controlWordBuffer
|
||||
if cWord == 'trowd':
|
||||
self.contentBuffer.truncate(0)
|
||||
elif cWord == 'row':
|
||||
self.onRow()
|
||||
self.contentBuffer.truncate(0)
|
||||
elif cWord == 'cell':
|
||||
self.onColumn(self.contentBuffer.getvalue().strip())
|
||||
self.contentBuffer.truncate(0)
|
||||
elif cWord in ('bkmkstart', 'bkmkend'):
|
||||
self.state = TableParser.IGNORE
|
||||
self.controlWordBuffer = ''
|
||||
def manageSpecialChar(self):
|
||||
specialChar = int(self.specialCharBuffer)
|
||||
self.specialCharBuffer = ''
|
||||
if self.specialChars.has_key(specialChar):
|
||||
self.contentBuffer.write(self.specialChars[specialChar])
|
||||
else:
|
||||
print 'Warning: char %d not known.' % specialChar
|
||||
self.state = TableParser.READING_CONTENT
|
||||
def bufferize(self, char):
|
||||
if self.state == TableParser.READING_CONTROL_WORD:
|
||||
self.controlWordBuffer += char
|
||||
elif self.state == TableParser.READING_CONTENT:
|
||||
self.contentBuffer.write(char)
|
||||
elif self.state == TableParser.READING_SPECIAL_CHAR:
|
||||
self.specialCharBuffer += char
|
||||
def parse(self):
|
||||
for line in self.input:
|
||||
for char in line:
|
||||
if self.isGroupDelimiter(char):
|
||||
if self.state == TableParser.READING_SPECIAL_CHAR:
|
||||
self.manageSpecialChar()
|
||||
self.state = TableParser.READING_CONTENT
|
||||
elif self.isControlWordStart(char):
|
||||
if self.state == TableParser.READING_CONTROL_WORD:
|
||||
self.manageControlWord()
|
||||
elif self.state == TableParser.READING_SPECIAL_CHAR:
|
||||
self.manageSpecialChar()
|
||||
self.controlWordBuffer = ''
|
||||
self.state = TableParser.READING_CONTROL_WORD
|
||||
elif self.isAlpha(char):
|
||||
if self.state == TableParser.READING_SPECIAL_CHAR:
|
||||
self.manageSpecialChar()
|
||||
self.bufferize(char)
|
||||
elif self.isNumeric(char):
|
||||
self.bufferize(char)
|
||||
elif self.isWhiteSpace(char):
|
||||
if self.state == TableParser.READING_CONTROL_WORD:
|
||||
self.manageControlWord()
|
||||
elif self.state == TableParser.READING_CONTENT:
|
||||
if char not in ['\n', '\r']:
|
||||
self.contentBuffer.write(char)
|
||||
elif self.state == TableParser.READING_SPECIAL_CHAR:
|
||||
self.manageSpecialChar()
|
||||
if char not in ['\n', '\r']:
|
||||
self.contentBuffer.write(char)
|
||||
elif self.isQuote(char):
|
||||
if (self.state == TableParser.READING_CONTROL_WORD) and \
|
||||
not self.controlWordBuffer:
|
||||
self.state = TableParser.READING_SPECIAL_CHAR
|
||||
elif self.state == TableParser.READING_SPECIAL_CHAR:
|
||||
self.manageSpecialChar()
|
||||
self.bufferize(char)
|
||||
else:
|
||||
self.bufferize(char)
|
||||
else:
|
||||
if self.state == TableParser.READING_CONTENT:
|
||||
self.contentBuffer.write(char)
|
||||
elif self.state == TableParser.READING_SPECIAL_CHAR:
|
||||
self.manageSpecialChar()
|
||||
self.contentBuffer.write(char)
|
||||
if self.controlWordBuffer:
|
||||
self.manageControlWord()
|
||||
if self.currentTableName:
|
||||
self.addTable(self.currentTableName, self.currentTable)
|
||||
return self.rtfTables
|
||||
def getColumnInfos(self, columnHeaders):
|
||||
'''Get, from the column headers, column names and types.'''
|
||||
columnNames = []
|
||||
columnTypes = []
|
||||
for header in columnHeaders:
|
||||
if header.find(':') != -1:
|
||||
# We have a type declaration
|
||||
name, typeDecl = header.split(':')
|
||||
columnNames.append(name.strip())
|
||||
try:
|
||||
columnTypes.append(Type(typeDecl.strip()))
|
||||
except TypeError, te:
|
||||
raise ParserError(TYPE_ERROR %
|
||||
(header, self.currentTableName, te))
|
||||
else:
|
||||
# No type declaration: implicitly it is a string
|
||||
columnNames.append(header)
|
||||
columnTypes.append(None)
|
||||
return columnNames, columnTypes
|
||||
def onRow(self):
|
||||
if (self.nbOfColumns == 0) or not self.currentRow:
|
||||
pass
|
||||
else:
|
||||
if self.rowIsHeader:
|
||||
self.currentColumnNames, self.currentColumnTypes = \
|
||||
self.getColumnInfos(self.currentRow)
|
||||
self.rowIsHeader = False
|
||||
elif self.nbOfColumns == 1:
|
||||
self.rowIsHeader = True
|
||||
if self.currentTableName:
|
||||
self.addTable(self.currentTableName, self.currentTable)
|
||||
self.currentTable = Table()
|
||||
self.currentTableName = self.currentRow[0]
|
||||
else:
|
||||
self.addRow()
|
||||
del self.currentRow[:]
|
||||
self.nbOfColumns = 0
|
||||
def onColumn(self, content):
|
||||
self.currentRow.append(content)
|
||||
self.nbOfColumns += 1
|
||||
def addRow(self):
|
||||
i = 0
|
||||
row = TableRow(self.currentTable)
|
||||
for columnName in self.currentColumnNames:
|
||||
columnValue = self.currentRow[i]
|
||||
if columnValue == '"':
|
||||
if len(self.currentTable) == 0:
|
||||
raise ParserError(
|
||||
NO_ROWS_IN_TABLE_YET % self.currentTableName)
|
||||
else:
|
||||
lastRow = self.currentTable[len(self.currentTable)-1]
|
||||
columnValue = lastRow[columnName]
|
||||
else:
|
||||
columnType = self.currentColumnTypes[i]
|
||||
if columnType:
|
||||
try:
|
||||
columnValue = columnType.convertValue(columnValue)
|
||||
except TypeError, te:
|
||||
raise ParserError(VALUE_ERROR %
|
||||
(columnName, self.currentTableName,
|
||||
te))
|
||||
row[columnName] = columnValue
|
||||
i += 1
|
||||
self.currentTable.append(row)
|
||||
def addTable(self, tableName, table):
|
||||
res = self.tableNameRex.search(tableName)
|
||||
tName, parentSpec = res.groups()
|
||||
table.name = tName
|
||||
if parentSpec:
|
||||
res = parentSpec.split(':')
|
||||
if len(res) == 1:
|
||||
table.parent = parentSpec.strip()
|
||||
table.parentRow = 0
|
||||
else:
|
||||
table.parent = res[0].strip()
|
||||
res = res[1].split('=')
|
||||
if len(res) == 1:
|
||||
try:
|
||||
table.parentRow = int(res[0])
|
||||
except ValueError:
|
||||
raise ParserError(BAD_PARENT_ROW %
|
||||
(table.name, table.parent,
|
||||
res[0]))
|
||||
if table.parentRow < 0:
|
||||
raise ParserError(BAD_PARENT_ROW %
|
||||
(table.name, table.parent,
|
||||
res[0]))
|
||||
else:
|
||||
table.parentRow = (res[0].strip(), res[1].strip())
|
||||
self.rtfTables[table.name] = table
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class RtfTablesParser:
|
||||
def __init__(self, fileName):
|
||||
self.tableParser = TableParser(fileName)
|
||||
self.nameResolver = NameResolver()
|
||||
def parse(self):
|
||||
tables = self.tableParser.parse()
|
||||
self.nameResolver.resolveNames(tables)
|
||||
return tables
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ =='__main__':
|
||||
tables = RtfTablesParser("Tests.rtf").parse()
|
||||
for key, item in tables.iteritems():
|
||||
print 'Table %s' % key
|
||||
print item
|
||||
print
|
||||
# -----------------------------------------------------------------------------
|
401
shared/test.py
Executable file
401
shared/test.py
Executable file
|
@ -0,0 +1,401 @@
|
|||
# ------------------------------------------------------------------------------
|
||||
# Appy is a framework for building applications in the Python language.
|
||||
# Copyright (C) 2007 Gaetan Delannay
|
||||
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,USA.
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
import os, os.path, sys, difflib, time, xml.sax
|
||||
from xml.sax.handler import ContentHandler
|
||||
from optparse import OptionParser
|
||||
from appy.shared.utils import FolderDeleter, Traceback
|
||||
from appy.shared.errors import InternalError
|
||||
from appy.shared.rtf import RtfTablesParser
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class TesterError(Exception): pass
|
||||
|
||||
# TesterError-related constants
|
||||
WRONG_TEST_PLAN = 'The test plan you specified does not correspond to an ' \
|
||||
'existing RTF file.'
|
||||
_FLAVOUR = 'A flavour represents a test configuration.'
|
||||
FLAVOURS_NOT_LIST = 'The flavours specified must be a list or tuple of ' \
|
||||
'string. ' + _FLAVOUR
|
||||
FLAVOUR_NOT_STRING = 'Each specified flavour must be a string. ' + _FLAVOUR
|
||||
WRONG_TEST_FACTORY = 'You must give a test factory that inherits from the ' \
|
||||
'abstract "appy.shared.test.TestFactory" class.'
|
||||
CREATE_TEST_NOT_OVERRIDDEN = 'The appy.shared.test.TestFactory.createTest ' \
|
||||
'method must be overridden in your concrete ' \
|
||||
'TestFactory.'
|
||||
MAIN_TABLE_NOT_FOUND = 'No table "TestSuites" found in test plan "%s".'
|
||||
MAIN_TABLE_MALFORMED = 'The "TestSuites" table must have at least two ' \
|
||||
'columns, named "Name" and "Description".'
|
||||
TEST_SUITE_NOT_FOUND = 'Table "%s.descriptions" and/or "%s.data" were not ' \
|
||||
'found.'
|
||||
TEST_SUITE_MALFORMED = 'Tables "%s.descriptions" and "%s.data" do not have ' \
|
||||
'the same length. For each test in "%s.data", You ' \
|
||||
'should have one line in "%s.descriptions" describing ' \
|
||||
'the test.'
|
||||
FILE_NOT_FOUND = 'File to compare "%s" was not found.'
|
||||
WRONG_ARGS = 'You must specify as unique argument the configuration flavour ' \
|
||||
'you want, which may be one of %s.'
|
||||
WRONG_FLAVOUR = 'Wrong flavour "%s". Flavour must be one of %s.'
|
||||
|
||||
# InternalError-related constants
|
||||
TEST_REPORT_SINGLETON_ERROR = 'You can only use the TestReport constructor ' \
|
||||
'once. After that you can access the single ' \
|
||||
'TestReport instance via the TestReport.' \
|
||||
'instance static member.'
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class XmlHandler(ContentHandler):
|
||||
'''This handler is used for producing a readable XML (with carriage returns)
|
||||
and for removing some tags that always change (like dates) from a file
|
||||
that need to be compared to another file.'''
|
||||
def __init__(self, xmlTagsToIgnore, xmlAttrsToIgnore):
|
||||
ContentHandler.__init__(self)
|
||||
self.res = u'<?xml version="1.0" encoding="UTF-8"?>'
|
||||
self.namespaces = {} # ~{s_namespaceUri:s_namespaceName}~
|
||||
self.indentLevel = -1
|
||||
self.tabWidth = 3
|
||||
self.tagsToIgnore = xmlTagsToIgnore
|
||||
self.attrsToIgnore = xmlAttrsToIgnore
|
||||
self.ignoring = False # Some content must be ignored, and not dumped
|
||||
# into the result.
|
||||
def isIgnorable(self, elem):
|
||||
'''Is p_elem an ignorable element ?'''
|
||||
res = False
|
||||
for nsUri, elemName in self.tagsToIgnore:
|
||||
elemFullName = ''
|
||||
try:
|
||||
nsName = self.ns(nsUri)
|
||||
elemFullName = '%s:%s' % (nsName, elemName)
|
||||
except KeyError:
|
||||
pass
|
||||
if elemFullName == elem:
|
||||
res = True
|
||||
break
|
||||
return res
|
||||
def setDocumentLocator(self, locator):
|
||||
self.locator = locator
|
||||
def endDocument(self):
|
||||
pass
|
||||
def dumpSpaces(self):
|
||||
self.res += '\n' + (' ' * self.indentLevel * self.tabWidth)
|
||||
def manageNamespaces(self, attrs):
|
||||
'''Manage namespaces definitions encountered in attrs'''
|
||||
for attrName, attrValue in attrs.items():
|
||||
if attrName.startswith('xmlns:'):
|
||||
self.namespaces[attrValue] = attrName[6:]
|
||||
def ns(self, nsUri):
|
||||
return self.namespaces[nsUri]
|
||||
def startElement(self, elem, attrs):
|
||||
self.manageNamespaces(attrs)
|
||||
# Do we enter into a ignorable element ?
|
||||
if self.isIgnorable(elem):
|
||||
self.ignoring = True
|
||||
else:
|
||||
if not self.ignoring:
|
||||
self.indentLevel += 1
|
||||
self.dumpSpaces()
|
||||
self.res += '<%s' % elem
|
||||
attrsNames = attrs.keys()
|
||||
attrsNames.sort()
|
||||
for attrToIgnore in self.attrsToIgnore:
|
||||
if attrToIgnore in attrsNames:
|
||||
attrsNames.remove(attrToIgnore)
|
||||
for attrName in attrsNames:
|
||||
self.res += ' %s="%s"' % (attrName, attrs[attrName])
|
||||
self.res += '>'
|
||||
def endElement(self, elem):
|
||||
if self.isIgnorable(elem):
|
||||
self.ignoring = False
|
||||
else:
|
||||
if not self.ignoring:
|
||||
self.dumpSpaces()
|
||||
self.indentLevel -= 1
|
||||
self.res += '</%s>' % elem
|
||||
def characters(self, content):
|
||||
if not self.ignoring:
|
||||
self.res += content.replace('\n', '')
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class TestReport:
|
||||
instance = None
|
||||
def __init__(self, testReportFileName, verbose):
|
||||
if TestReport.instance == None:
|
||||
self.report = open(testReportFileName, 'w')
|
||||
self.verbose = verbose
|
||||
TestReport.instance = self
|
||||
else:
|
||||
raise InternalError(TEST_REPORT_SINGLETON_ERROR)
|
||||
def say(self, msg, force=False, encoding=None):
|
||||
if self.verbose or force:
|
||||
print msg
|
||||
if encoding:
|
||||
self.report.write(msg.encode(encoding))
|
||||
else:
|
||||
self.report.write(msg)
|
||||
self.report.write('\n')
|
||||
def close(self):
|
||||
self.report.close()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class Test:
|
||||
'''Abstract test class.'''
|
||||
def __init__(self, testData, testDescription, testFolder, config, flavour):
|
||||
self.data = testData
|
||||
self.description = testDescription
|
||||
self.testFolder = testFolder
|
||||
self.tempFolder = None
|
||||
self.report = TestReport.instance
|
||||
self.errorDump = None
|
||||
self.config = config
|
||||
self.flavour = flavour
|
||||
def compareFiles(self, expected, actual, areXml=False, xmlTagsToIgnore=(),
|
||||
xmlAttrsToIgnore=(), encoding=None):
|
||||
'''Compares 2 files. r_ is True if files are different. The differences
|
||||
are written in the test report.'''
|
||||
for f in expected, actual:
|
||||
assert os.path.exists(f), TesterError(FILE_NOT_FOUND % f)
|
||||
# Expected result (may be different according to flavour)
|
||||
if self.flavour:
|
||||
expectedFlavourSpecific = '%s.%s' % (expected, self.flavour)
|
||||
if os.path.exists(expectedFlavourSpecific):
|
||||
expected = expectedFlavourSpecific
|
||||
differ = difflib.Differ()
|
||||
if areXml:
|
||||
f = file(expected)
|
||||
contentA = f.read()
|
||||
f.close()
|
||||
# Actual result
|
||||
f = file(actual)
|
||||
contentB = f.read()
|
||||
f.close()
|
||||
xmlHandler = XmlHandler(xmlTagsToIgnore, xmlAttrsToIgnore)
|
||||
xml.sax.parseString(contentA, xmlHandler)
|
||||
contentA = xmlHandler.res.split('\n')
|
||||
xmlHandler = XmlHandler(xmlTagsToIgnore, xmlAttrsToIgnore)
|
||||
xml.sax.parseString(contentB, xmlHandler)
|
||||
contentB = xmlHandler.res.split('\n')
|
||||
else:
|
||||
f = file(expected)
|
||||
contentA = f.readlines()
|
||||
f.close()
|
||||
# Actual result
|
||||
f = file(actual)
|
||||
contentB = f.readlines()
|
||||
f.close()
|
||||
diffResult = list(differ.compare(contentA, contentB))
|
||||
atLeastOneDiff = False
|
||||
lastLinePrinted = False
|
||||
i = -1
|
||||
for line in diffResult:
|
||||
i += 1
|
||||
if line and (line[0] != ' '):
|
||||
if not atLeastOneDiff:
|
||||
self.report.say('Difference(s) detected between files ' \
|
||||
'%s and %s:' % (expected, actual),
|
||||
encoding='utf-8')
|
||||
atLeastOneDiff = True
|
||||
if not lastLinePrinted:
|
||||
self.report.say('...')
|
||||
if areXml:
|
||||
self.report.say(line, encoding=encoding)
|
||||
else:
|
||||
self.report.say(line[:-1], encoding=encoding)
|
||||
lastLinePrinted = True
|
||||
else:
|
||||
lastLinePrinted = False
|
||||
return atLeastOneDiff
|
||||
def run(self):
|
||||
self.report.say('-' * 79)
|
||||
self.report.say('- Test %s.' % self.data['Name'])
|
||||
self.report.say('- %s\n' % self.description)
|
||||
# Prepare test data
|
||||
self.tempFolder = os.path.join(self.testFolder, 'temp')
|
||||
if os.path.exists(self.tempFolder):
|
||||
time.sleep(0.3) # Sometimes I can't remove it, so I wait
|
||||
FolderDeleter.delete(self.tempFolder)
|
||||
os.mkdir(self.tempFolder)
|
||||
try:
|
||||
self.do()
|
||||
self.report.say('Checking result...')
|
||||
testFailed = self.checkResult()
|
||||
except:
|
||||
testFailed = self.onError()
|
||||
self.finalize()
|
||||
return testFailed
|
||||
def do(self):
|
||||
'''Concrete part of the test. Must be overridden.'''
|
||||
def checkResult(self):
|
||||
'''r_ is False if the test succeeded.'''
|
||||
return True
|
||||
def onError(self):
|
||||
'''What must happen when an exception is raised during test
|
||||
execution? Returns True if the test failed.'''
|
||||
self.errorDump = Traceback.get()
|
||||
self.report.say('Exception occurred:')
|
||||
self.report.say(self.errorDump)
|
||||
return True
|
||||
def finalize(self):
|
||||
'''Performs sme cleaning actions after test execution.'''
|
||||
pass
|
||||
def isExpectedError(self, expectedMessage):
|
||||
'''An exception was thrown. So check if the actual error message
|
||||
(stored in self.errorDump) corresponds to the p_expectedMessage.'''
|
||||
res = True
|
||||
for line in expectedMessage:
|
||||
if (self.errorDump.find(line) == -1):
|
||||
res = False
|
||||
self.report.say('"%s" not found among error dump.' % line)
|
||||
break
|
||||
return res
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class TestFactory:
|
||||
def createTest(testData, testDescription, testFolder, config, flavour):
|
||||
'''This method allows you to create tests that are instances of classes
|
||||
that you create. Those classes must be children of
|
||||
appy.shared.test.Test. m_createTest must return a Test instance and
|
||||
is called every time a test definition is encountered in the test
|
||||
plan.'''
|
||||
raise TesterError(CREATE_TEST_NOT_OVERRIDDEN)
|
||||
createTest = staticmethod(createTest)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class Tester:
|
||||
def __init__(self, testPlan, flavours, testFactory):
|
||||
# Check test plan
|
||||
if (not os.path.exists(testPlan)) or (not os.path.isfile(testPlan)) \
|
||||
or (not testPlan.endswith('.rtf')):
|
||||
raise TesterError(WRONG_TEST_PLAN)
|
||||
self.testPlan = testPlan
|
||||
self.testFolder = os.path.abspath(os.path.dirname(testPlan))
|
||||
# Check flavours
|
||||
if (not isinstance(flavours, list)) and \
|
||||
(not isinstance(flavours, tuple)):
|
||||
raise TesterError(FLAVOURS_NOT_LIST)
|
||||
for flavour in flavours:
|
||||
if not isinstance(flavour, basestring):
|
||||
raise TesterError(FLAVOUR_NOT_STRING)
|
||||
self.flavours = flavours
|
||||
self.flavour = None
|
||||
# Check test factory
|
||||
if not issubclass(testFactory, TestFactory):
|
||||
raise TesterError(WRONG_TEST_FACTORY)
|
||||
self.testFactory = testFactory
|
||||
self.getOptions()
|
||||
self.report = TestReport('%s/Tester.report.txt' % self.testFolder,
|
||||
self.verbose)
|
||||
self.report.say('Parsing RTF file... ')
|
||||
t1 = time.time()
|
||||
self.tables = RtfTablesParser(testPlan).parse()
|
||||
t2 = time.time() - t1
|
||||
self.report.say('Done in %d seconds' % t2)
|
||||
self.config = None
|
||||
ext = ''
|
||||
if self.flavour:
|
||||
ext = '.%s' % self.flavour
|
||||
configTableName = 'Configuration%s' % ext
|
||||
if self.tables.has_key(configTableName):
|
||||
self.config = self.tables[configTableName].asDict()
|
||||
self.tempFolder = os.path.join(self.testFolder, 'temp')
|
||||
if os.path.exists(self.tempFolder):
|
||||
FolderDeleter.delete(self.tempFolder)
|
||||
self.nbOfTests = 0
|
||||
self.nbOfSuccesses = 0
|
||||
self.nbOfIgnoredTests = 0
|
||||
def getOptions(self):
|
||||
optParser = OptionParser()
|
||||
optParser.add_option("-v", "--verbose", action="store_true",
|
||||
help="Dumps the whole test report on stdout")
|
||||
optParser.add_option("-k", "--keepTemp", action="store_true", help = \
|
||||
"Keep the temp folder, in order to be able to " \
|
||||
"copy some results and make them expected " \
|
||||
"results when needed.")
|
||||
(options, args) = optParser.parse_args()
|
||||
if self.flavours:
|
||||
if len(args) != 1:
|
||||
raise TesterError(WRONG_ARGS % self.flavours)
|
||||
self.flavour = args[0]
|
||||
if not self.flavour in self.flavours:
|
||||
raise TesterError(WRONG_FLAVOUR % (self.flavour, self.flavours))
|
||||
self.verbose = options.verbose == True
|
||||
self.keepTemp = options.keepTemp == True
|
||||
def runSuite(self, suite):
|
||||
self.report.say('*' * 79)
|
||||
self.report.say('* Suite %s.' % suite['Name'])
|
||||
self.report.say('* %s\n' % suite['Description'])
|
||||
i = -1
|
||||
for testData in self.tables['%s.data' % suite['Name']]:
|
||||
self.nbOfTests += 1
|
||||
i += 1
|
||||
if testData['Name'].startswith('_'):
|
||||
self.nbOfIgnoredTests += 1
|
||||
else:
|
||||
description = self.tables['%s.descriptions' % \
|
||||
suite['Name']][i]['Description']
|
||||
test = self.testFactory.createTest(
|
||||
testData, description, self.testFolder, self.config,
|
||||
self.flavour)
|
||||
testFailed = test.run()
|
||||
if not self.verbose:
|
||||
sys.stdout.write('.')
|
||||
sys.stdout.flush()
|
||||
if testFailed:
|
||||
self.report.say('Test failed.\n')
|
||||
else:
|
||||
self.report.say('Test successful.\n')
|
||||
self.nbOfSuccesses += 1
|
||||
def run(self):
|
||||
assert self.tables.has_key('TestSuites'), \
|
||||
TesterError(MAIN_TABLE_NOT_FOUND % self.testPlan)
|
||||
for testSuite in self.tables['TestSuites']:
|
||||
if (not testSuite.has_key('Name')) or \
|
||||
(not testSuite.has_key('Description')):
|
||||
raise TesterError(MAIN_TABLE_MALFORMED)
|
||||
if testSuite['Name'].startswith('_'):
|
||||
tsName = testSuite['Name'][1:]
|
||||
tsIgnored = True
|
||||
else:
|
||||
tsName = testSuite['Name']
|
||||
tsIgnored = False
|
||||
assert self.tables.has_key('%s.descriptions' % tsName) \
|
||||
and self.tables.has_key('%s.data' % tsName), \
|
||||
TesterError(TEST_SUITE_NOT_FOUND % (tsName, tsName))
|
||||
assert len(self.tables['%s.descriptions' % tsName]) == \
|
||||
len(self.tables['%s.data' % tsName]), \
|
||||
TesterError(TEST_SUITE_MALFORMED % ((tsName,)*4))
|
||||
if tsIgnored:
|
||||
nbOfIgnoredTests = len(self.tables['%s.data' % tsName])
|
||||
self.nbOfIgnoredTests += nbOfIgnoredTests
|
||||
self.nbOfTests += nbOfIgnoredTests
|
||||
else:
|
||||
self.runSuite(testSuite)
|
||||
self.finalize()
|
||||
def finalize(self):
|
||||
msg = '%d/%d successful test(s)' % \
|
||||
(self.nbOfSuccesses, (self.nbOfTests-self.nbOfIgnoredTests))
|
||||
if self.nbOfIgnoredTests >0:
|
||||
msg += ', but %d ignored test(s) not counted' % \
|
||||
self.nbOfIgnoredTests
|
||||
msg += '.'
|
||||
self.report.say(msg, force=True)
|
||||
self.report.close()
|
||||
if not self.keepTemp:
|
||||
if os.path.exists(self.tempFolder):
|
||||
FolderDeleter.delete(self.tempFolder)
|
||||
# ------------------------------------------------------------------------------
|
48
shared/utils.py
Executable file
48
shared/utils.py
Executable file
|
@ -0,0 +1,48 @@
|
|||
# ------------------------------------------------------------------------------
|
||||
# Appy is a framework for building applications in the Python language.
|
||||
# Copyright (C) 2007 Gaetan Delannay
|
||||
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,USA.
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
import os, os.path, sys, traceback
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class FolderDeleter:
|
||||
def delete(dirName):
|
||||
'''Recursively deletes p_dirName.'''
|
||||
dirName = os.path.abspath(dirName)
|
||||
for root, dirs, files in os.walk(dirName, topdown=False):
|
||||
for name in files:
|
||||
os.remove(os.path.join(root, name))
|
||||
for name in dirs:
|
||||
os.rmdir(os.path.join(root, name))
|
||||
os.rmdir(dirName)
|
||||
delete = staticmethod(delete)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class Traceback:
|
||||
'''Dumps the last traceback into a string.'''
|
||||
def get():
|
||||
res = ''
|
||||
excType, excValue, tb = sys.exc_info()
|
||||
tbLines = traceback.format_tb(tb)
|
||||
for tbLine in tbLines:
|
||||
res += ' %s' % tbLine
|
||||
res += ' %s: %s' % (str(excType), str(excValue))
|
||||
return res
|
||||
get = staticmethod(get)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
1
shared/version.txt
Executable file
1
shared/version.txt
Executable file
|
@ -0,0 +1 @@
|
|||
0.2dev unreleased
|
130
shared/xml_parser.py
Executable file
130
shared/xml_parser.py
Executable file
|
@ -0,0 +1,130 @@
|
|||
# ------------------------------------------------------------------------------
|
||||
# Appy is a framework for building applications in the Python language.
|
||||
# Copyright (C) 2007 Gaetan Delannay
|
||||
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,USA.
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
import xml.sax
|
||||
from xml.sax.handler import ContentHandler, ErrorHandler
|
||||
from xml.sax.xmlreader import InputSource
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
class XmlElement:
|
||||
'''Representgs an XML tag.'''
|
||||
def __init__(self, elem, attrs=None, nsUri=None):
|
||||
'''An XmlElement instance may represent:
|
||||
- an already parsed tag (in this case, p_elem may be prefixed with a
|
||||
namespace);
|
||||
- the definition of an XML element (in this case, no namespace can be
|
||||
found in p_elem; but a namespace URI may be defined in p_nsUri).'''
|
||||
self.elem = elem
|
||||
self.attrs = attrs
|
||||
if elem.find(':') != -1:
|
||||
self.ns, self.name = elem.split(':')
|
||||
else:
|
||||
self.ns = ''
|
||||
self.name = elem
|
||||
self.nsUri = nsUri
|
||||
def equalsTo(self, other, namespaces=None):
|
||||
'''Does p_elem == p_other? If a p_namespaces dict is given, p_other must
|
||||
define a nsUri.'''
|
||||
res = None
|
||||
if namespaces:
|
||||
res = self.elem == ('%s:%s' % (namespaces[other.nsUri], other.name))
|
||||
else:
|
||||
res = self.elem == other.elem
|
||||
return res
|
||||
def __repr__(self):
|
||||
res = self.elem
|
||||
if self.attrs:
|
||||
res += '('
|
||||
for attrName, attrValue in self.attrs.items():
|
||||
res += '%s="%s"' % (attrName, attrValue)
|
||||
res += ')'
|
||||
return res
|
||||
def getFullName(self, namespaces=None):
|
||||
'''Gets the name of the element including the namespace prefix.'''
|
||||
if not namespaces:
|
||||
res = self.elem
|
||||
else:
|
||||
res = '%s:%s' % (namespaces[self.nsUri], self.name)
|
||||
return res
|
||||
|
||||
class XmlEnvironment:
|
||||
'''An XML environment remembers a series of elements during a SAX parsing.
|
||||
This class is an abstract class that gathers basic things like
|
||||
namespaces.'''
|
||||
def __init__(self):
|
||||
# This dict contains the xml namespace declarations encountered so far
|
||||
self.namespaces = {} # ~{s_namespaceUri:s_namespaceName}~
|
||||
self.currentElem = None # The currently parsed element
|
||||
self.parser = None
|
||||
def manageNamespaces(self, attrs):
|
||||
'''Manages namespaces definitions encountered in p_attrs.'''
|
||||
for attrName, attrValue in attrs.items():
|
||||
if attrName.startswith('xmlns:'):
|
||||
self.namespaces[attrValue] = attrName[6:]
|
||||
def ns(self, nsUri):
|
||||
'''Returns the namespace corresponding to o_nsUri.'''
|
||||
return self.namespaces[nsUri]
|
||||
|
||||
class XmlParser(ContentHandler, ErrorHandler):
|
||||
'''Basic XML content handler that does things like :
|
||||
- remembering the currently parsed element;
|
||||
- managing namespace declarations.'''
|
||||
def __init__(self, env, caller=None):
|
||||
'''p_env should be an instance of a class that inherits from
|
||||
XmlEnvironment: it specifies the environment to use for this SAX
|
||||
parser.'''
|
||||
ContentHandler.__init__(self)
|
||||
self.env = env
|
||||
self.env.parser = self
|
||||
self.caller = caller # The class calling this parser
|
||||
self.parser = xml.sax.make_parser() # Fast, standard expat parser
|
||||
def setDocumentLocator(self, locator):
|
||||
self.locator = locator
|
||||
return self.env
|
||||
def endDocument(self):
|
||||
return self.env
|
||||
def startElement(self, elem, attrs):
|
||||
self.env.manageNamespaces(attrs)
|
||||
if self.env.currentElem == None:
|
||||
self.env.currentElem = XmlElement(elem, attrs=attrs)
|
||||
else:
|
||||
# Reuse the exiting instance in order to avoid creating one instance
|
||||
# every time an elem is met in the XML file.
|
||||
self.env.currentElem.__init__(elem, attrs)
|
||||
return self.env
|
||||
def endElement(self, elem):
|
||||
self.env.currentElem.__init__(elem)
|
||||
return self.env
|
||||
def characters(self, content):
|
||||
return self.env
|
||||
def parse(self, xmlContent, source='string'):
|
||||
'''Parsers the XML file or string p_xmlContent.'''
|
||||
try:
|
||||
from cStringIO import StringIO
|
||||
except ImportError:
|
||||
from StringIO import StringIO
|
||||
self.parser.setContentHandler(self)
|
||||
self.parser.setErrorHandler(self)
|
||||
inputSource = InputSource()
|
||||
if source == 'string':
|
||||
inputSource.setByteStream(StringIO(xmlContent))
|
||||
else:
|
||||
inputSource.setByteStream(xmlContent)
|
||||
self.parser.parse(inputSource)
|
||||
# ------------------------------------------------------------------------------
|
Loading…
Add table
Add a link
Reference in a new issue