sqlbase7-sa/sqlbase7_sa/sqlbase7.py

158 lines
5.8 KiB
Python
Raw Normal View History

2010-04-19 10:29:37 -05:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
################################################################################
#
# SQLBase7-SA -- SQLAlchemy driver/dialect for Centura SQLBase v7
2010-04-19 10:29:37 -05:00
# Copyright © 2010 Lance Edgar
#
# This file is part of SQLBase7-SA.
2010-04-19 10:29:37 -05:00
#
# SQLBase7-SA is free software: you can redistribute it and/or modify
2010-04-19 10:29:37 -05:00
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SQLBase7-SA is distributed in the hope that it will be useful,
2010-04-19 10:29:37 -05:00
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SQLBase7-SA. If not, see <http://www.gnu.org/licenses/>.
2010-04-19 10:29:37 -05:00
#
################################################################################
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy import types, and_
from sqlalchemy.sql.expression import Join
from sqlalchemy.sql import visitors, operators, ClauseElement
2010-04-19 10:29:37 -05:00
import sqlalchemy
from pkg_resources import parse_version
if parse_version(sqlalchemy.__version__) <= parse_version('0.5.99'):
from sqlalchemy.sql.compiler import DefaultCompiler as CompilerBase
else:
from sqlalchemy.sql.compiler import SQLCompiler as CompilerBase
class LimitClauseNotSupported(Exception):
def __init__(self, limit, offset):
self.limit = limit
self.offset = offset
def __str__(self):
return "Centura SQLBase 7.5.1 doesn't support a LIMIT clause for the SELECT statement (received: limit = %u, offset = %u)" % (self.limit, self.offset)
class SQLBase7Compiler(CompilerBase):
# Most of the code below was copied from the Oracle dialect. Thanks to Michael Bayer
# for pointing that out. Oh, and for writing SQLAlchemy; that was pretty cool.
def visit_join(self, join, **kwargs):
kwargs['asfrom'] = True
return self.process(join.left, **kwargs) + ", " + self.process(join.right, **kwargs)
def visit_select(self, select, **kwargs):
if self.stack and 'from' in self.stack[-1]:
existingfroms = self.stack[-1]['from']
else:
existingfroms = None
froms = select._get_display_froms(existingfroms)
whereclause = self._get_join_whereclause(froms)
if whereclause is not None:
select = select.where(whereclause)
if select._limit is not None or select._offset is not None:
raise LimitClauseNotSupported(select._limit, select._offset)
kwargs['iswrapper'] = getattr(select, '_is_wrapper', False)
return super(SQLBase7Compiler, self).visit_select(select, **kwargs)
def _get_join_whereclause(self, froms):
clauses = []
def visit_join(join):
if join.isouter:
def visit_binary(binary):
if binary.operator == operators.eq:
if binary.left.table is join.right:
binary.left = _OuterJoinColumn(binary.left)
elif binary.right.table is join.right:
binary.right = _OuterJoinColumn(binary.right)
clauses.append(visitors.cloned_traverse(join.onclause, {}, {'binary':visit_binary}))
else:
clauses.append(join.onclause)
for j in join.left, join.right:
if isinstance(j, Join):
visit_join(j)
for f in froms:
if isinstance(f, Join):
visit_join(f)
if clauses:
return and_(*clauses)
return None
def visit_outer_join_column(self, vc):
return self.process(vc.column) + "(+)"
class _OuterJoinColumn(ClauseElement):
__visit_name__ = 'outer_join_column'
def __init__(self, column):
self.column = column
class SQLBase7Dialect(DefaultDialect):
2010-04-19 10:29:37 -05:00
name = 'sqlbase7'
statement_compiler = SQLBase7Compiler
max_identifier_length = 18
_type_map = {
2010-04-19 10:29:37 -05:00
'CHAR' : types.CHAR,
'DATE' : types.DATE,
'DECIMAL' : types.DECIMAL,
'FLOAT' : types.FLOAT,
'SMALLINT' : types.SMALLINT,
'TIME' : types.TIME,
'TIMESTMP' : types.TIMESTAMP,
'VARCHAR' : types.VARCHAR,
}
def create_connect_args(self, url):
connection_string = ';'.join((
"DRIVER={Centura SQLBase 3.5 32-bit Driver -NT & Win95}",
"SERVER=%s" % url.host,
"DATABASE=%s" % url.database,
"UID=%s" % url.username,
"PWD=%s" % url.password,
))
return [connection_string], {}
def get_default_schema_name(self, connection):
return 'SYSADM'
2010-04-27 16:26:39 -05:00
def do_execute(self, cursor, statement, parameters, context=None):
# For some (perhaps good?) reason, the SQLBase ODBC driver doesn't like
# parameters if they're of Unicode or Long type. I'd hoped at first that
# the "supports_unicode_binds" attribute would take care of the Unicode
# problem but it didn't seem to. And now the Long parameters seem to
# throw the same error, so...
parameters = list(parameters)
for i, parameter in enumerate(parameters):
if isinstance(parameter, unicode):
parameters[i] = str(parameter)
elif isinstance(parameter, long):
parameters[i] = int(parameter)
super(SQLBase7Dialect, self).do_execute(cursor, statement, tuple(parameters), context)