sqlbase7-sa/sqlbase7_sa/base.py

147 lines
5.1 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
################################################################################
#
# SQLBase7_SA -- SQLAlchemy driver/dialect for Centura SQLBase v7
# Copyright © 2010 Lance Edgar
#
# This file is part of SQLBase7_SA.
#
# SQLBase7_SA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SQLBase7_SA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SQLBase7_SA. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy import types, and_
from sqlalchemy.sql.compiler import SQLCompiler
from sqlalchemy.sql.expression import Join
class SQLBase7Compiler(SQLCompiler):
    """Statement compiler for Centura SQLBase v7.

    Joins are not rendered with ``JOIN ... ON`` syntax.  Instead the two
    sides of every join are emitted as a comma-separated FROM list and the
    join's ON condition is added to the WHERE clause of the enclosing SELECT.

    NOTE(review): no distinction is made between inner and outer joins here;
    every join's ON clause is simply AND-ed into the WHERE clause.
    """

    # Most of the code below was copied from the Oracle dialect.  Thanks to
    # Michael Bayer for pointing that out.  Oh, and for writing SQLAlchemy;
    # that was pretty cool.

    def visit_join(self, join, **kwargs):
        """Render a join as ``<left>, <right>`` (no ON clause)."""
        kwargs['asfrom'] = True
        left = self.process(join.left, **kwargs)
        right = self.process(join.right, **kwargs)
        return left + ", " + right

    def visit_select(self, select, **kwargs):
        """Compile a SELECT, moving join ON conditions into its WHERE clause."""
        froms = select._get_display_froms()
        whereclause = self._get_join_whereclause(froms)
        if whereclause is not None:
            select = select.where(whereclause)
        kwargs['iswrapper'] = getattr(select, '_is_wrapper', False)
        return SQLCompiler.visit_select(self, select, **kwargs)

    def _get_join_whereclause(self, froms):
        """Collect the ON clauses of every join reachable from ``froms``.

        Joins nested on either side of another join are visited recursively.

        Returns the conjunction (``and_``) of all collected ON clauses, or
        ``None`` when ``froms`` contains no joins, so that the caller does
        not attach an empty conjunction to the statement.
        """
        clauses = []

        def collect(join):
            clauses.append(join.onclause)
            for side in (join.left, join.right):
                if isinstance(side, Join):
                    collect(side)

        for from_ in froms:
            if isinstance(from_, Join):
                collect(from_)

        # Without this guard ``and_()`` would be called with no arguments and
        # the caller's ``is not None`` check could never be false.
        if not clauses:
            return None
        return and_(*clauses)

    def visit_ilike_op(self, binary, **kw):
        """Render ILIKE as a LIKE comparison of ``@lower()``-ed operands.

        An ``ESCAPE '<char>'`` suffix is appended when the expression carries
        a non-empty ``escape`` modifier.
        """
        escape = binary.modifiers.get("escape", None)
        text = '@lower(%s) LIKE @lower(%s)' % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw))
        if escape:
            text += ' ESCAPE \'%s\'' % escape
        return text
class SQLBase7Dialect(DefaultDialect):
    """SQLAlchemy dialect for Centura SQLBase v7.

    Reflection is implemented by querying the ``SYSTABLES``, ``SYSCOLUMNS``
    and ``SYSPKCONSTRAINTS`` catalog tables through a raw DBAPI cursor.
    Foreign keys and indexes are not reflected.
    """

    name = 'sqlbase7'
    max_identifier_length = 18
    statement_compiler = SQLBase7Compiler

    # Maps the COLTYPE value read from SYSCOLUMNS to a SQLAlchemy type class.
    type_map = {
        'CHAR': types.CHAR,
        'DATE': types.DATE,
        'DECIMAL': types.DECIMAL,
        'FLOAT': types.FLOAT,
        'SMALLINT': types.SMALLINT,
        'TIME': types.TIME,
        'TIMESTMP': types.TIMESTAMP,
        'VARCHAR': types.VARCHAR,
    }

    def _check_unicode_returns(self, connection):
        """Always report ``False``; ``connection`` is not used."""
        return False

    @staticmethod
    def _schema_prefix(schema):
        """Return ``'<schema>.'`` for qualifying a catalog table, or ``''``."""
        if schema is None:
            return ''
        return '%s.' % schema

    @staticmethod
    def _quote_literal(value):
        """Return ``value`` as a single-quoted SQL string literal.

        Embedded apostrophes are doubled (standard SQL escaping) so that a
        name containing ``'`` cannot terminate the literal early.
        """
        return "'%s'" % ('%s' % value).replace("'", "''")

    def get_table_names(self, connection, schema=None, **kw):
        """Return the NAME of every SYSTABLES row whose REMARKS is not NULL.

        NOTE(review): the reason for filtering on REMARKS is not evident
        from this file -- confirm it is the intended way to pick tables.
        """
        sql = "SELECT NAME FROM %sSYSTABLES WHERE REMARKS IS NOT NULL" % (
            self._schema_prefix(schema),)
        cursor = connection.connection.cursor()
        try:
            return [row.NAME for row in cursor.execute(sql)]
        finally:
            # Release the cursor even if the query or iteration fails.
            cursor.close()

    def get_columns(self, connection, table_name, schema=None, **kw):
        """Return column description dicts for ``table_name``.

        Each dict has the keys ``name``, ``type``, ``nullable``, ``default``
        (always ``None``) and ``autoincrement`` (always ``False``).  A column
        whose COLTYPE is missing from ``type_map`` is reported with
        ``types.NullType`` and a warning, instead of aborting reflection of
        the whole table with a ``KeyError``.
        """
        import warnings

        sql = "SELECT NAME,COLTYPE,NULLS FROM %sSYSCOLUMNS WHERE TBNAME = %s" % (
            self._schema_prefix(schema), self._quote_literal(table_name))
        columns = []
        cursor = connection.connection.cursor()
        try:
            for row in cursor.execute(sql):
                coltype = self.type_map.get(row.COLTYPE)
                if coltype is None:
                    warnings.warn(
                        "Did not recognize type '%s' of column '%s'"
                        % (row.COLTYPE, row.NAME))
                    coltype = types.NullType
                columns.append({
                    'name': row.NAME,
                    'type': coltype,
                    'nullable': row.NULLS == 'Y',
                    'default': None,
                    'autoincrement': False,
                })
        finally:
            # Release the cursor even if the query or iteration fails.
            cursor.close()
        return columns

    def get_primary_keys(self, connection, table_name, schema=None, **kw):
        """Return the primary key column names of ``table_name``.

        Names are read from SYSPKCONSTRAINTS and ordered by PKCOLSEQNUM.
        """
        sql = ("SELECT COLNAME FROM %sSYSPKCONSTRAINTS WHERE NAME = %s "
               "ORDER BY PKCOLSEQNUM") % (
            self._schema_prefix(schema), self._quote_literal(table_name))
        cursor = connection.connection.cursor()
        try:
            return [row.COLNAME for row in cursor.execute(sql)]
        finally:
            # Release the cursor even if the query or iteration fails.
            cursor.close()

    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """Foreign key reflection is not implemented; always returns ``[]``."""
        return []

    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Index reflection is not implemented; always returns ``[]``."""
        return []