fix: add command to install mysql triggers for CORE office_op DB

for use with datasync.  this also adds datasync support for the
ProductCost importer
Lance Edgar 2024-07-03 18:25:18 -05:00
parent 2f22be6e7e
commit dca2c1bfe2
4 changed files with 435 additions and 28 deletions


@@ -183,6 +183,47 @@ def import_self(
    handler.run(kwargs, progress=progress)


@core_office_typer.command()
def install_triggers(
        ctx: typer.Context,
        status: Annotated[
            bool,
            typer.Option('--status',
                         help="Show current status of DB, then exit.")] = False,
        uninstall: Annotated[
            bool,
            typer.Option('--uninstall',
                         help="Uninstall table and triggers, instead of install.")] = False,
        table_name: Annotated[
            str,
            typer.Option(help="Override name of \"changes\" table if needed.")] = 'datasync_changes',
        dry_run: Annotated[
            bool,
            typer.Option('--dry-run',
                         help="Do not (un)install anything, but show what would have been done.")] = False,
):
    """
    Install MySQL DB triggers for use with Rattail DataSync
    """
    from rattail_corepos.corepos.office.triggers import CoreTriggerHandler

    config = ctx.parent.rattail_config
    app = config.get_app()
    corepos = app.get_corepos_handler()
    op_session = corepos.make_session_office_op()
    triggers = CoreTriggerHandler(config)

    if status:
        triggers.show_status(op_session, table_name)
    elif uninstall:
        triggers.uninstall_all(op_session, table_name, dry_run=dry_run)
    else:
        triggers.install_all(op_session, table_name, dry_run=dry_run)

    op_session.commit()
    op_session.close()
@core_office_typer.command()
def patch_customer_gaps(
        ctx: typer.Context,


@@ -0,0 +1,339 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2024 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
CORE Office - datasync triggers
"""
import sqlalchemy as sa

from rattail.app import GenericHandler
from rattail_corepos.datasync.corepos import make_changes_table


class CoreTriggerHandler(GenericHandler):
    """
    Handler to install and show status of CORE DB triggers, for use
    with Rattail DataSync.
    """

    supported_triggers = [
        'custdata',
        'meminfo',
        'departments',
        'subdepts',
        'vendors',
        'products',
        'vendorItems',
    ]
    def show_status(self, op_session, table_name):
        """
        Show trigger status for an ``office_op`` database.
        """
        print()
        print("database")
        # nb. use repr() to hide password
        print(f"url: {repr(op_session.bind.url)}")
        exists = self.database_exists(op_session)
        print(f"exists: {exists}")
        if not exists:
            return  # nothing more to test

        print()
        print("changes table")
        print(f"name: {table_name}")
        table = self.make_changes_table(table_name)
        exists = self.changes_table_exists(op_session, table)
        print(f"exists: {exists}")
        if exists:
            records = op_session.execute(table.select())
            print(f"records: {len(records.fetchall())}")

        print()
        for trigger in self.supported_triggers:
            print(f"triggers for {trigger}")

            create = f'record_{trigger}_create'
            exists = self.trigger_exists(op_session, create)
            print(f"{create:40s} exists: {exists}")

            update = f'record_{trigger}_update'
            exists = self.trigger_exists(op_session, update)
            print(f"{update:40s} exists: {exists}")

            delete = f'record_{trigger}_delete'
            exists = self.trigger_exists(op_session, delete)
            print(f"{delete:40s} exists: {exists}")

            print()
    def database_exists(self, op_session):
        corepos = self.app.get_corepos_handler()
        op_model = corepos.get_model_office_op()

        try:
            # just query a basic table, if things are normal then we're good
            op_session.query(op_model.Department).count()
        except sa.exc.ProgrammingError:
            return False

        return True

    def trigger_exists(self, op_session, trigger):
        dbname = op_session.bind.url.database
        sql = sa.text(f"""
        SHOW TRIGGERS FROM `{dbname}` WHERE `Trigger` = :trigger
        """)
        result = op_session.execute(sql, {'trigger': trigger})
        if result.fetchone():
            return True
        return False

    def changes_table_exists(self, op_session, table):
        if isinstance(table, str):
            table = self.make_changes_table(table)

        try:
            op_session.execute(table.select())
        except sa.exc.ProgrammingError:
            return False

        return True

    def make_changes_table(self, table_name):
        metadata = sa.MetaData()
        table = make_changes_table(table_name, metadata)
        return table
    def install_all(self, op_session, table_name, dry_run=False):
        self.install_changes_table(op_session, table_name, dry_run=dry_run)
        self.install_triggers(op_session, table_name, dry_run=dry_run)

    def install_changes_table(self, op_session, table_name, dry_run=False):
        print()
        print("installing changes table...")
        print(f"{table_name}: ", end='')

        table = self.make_changes_table(table_name)
        if self.changes_table_exists(op_session, table):
            print("already exists")
            print()
            return

        if not dry_run:
            table.create(op_session.bind)
        print("done")
        print()

    def install_triggers(self, op_session, table_name, dry_run=False):
        print("installing triggers...")
        for trigger in self.supported_triggers:
            if not dry_run:
                self.drop_triggers(op_session, trigger)
                meth = getattr(self, f'create_triggers_{trigger}')
                meth(op_session, table_name)
        print("done")
        print()

    def uninstall_all(self, op_session, table_name, dry_run=False):
        self.uninstall_changes_table(op_session, table_name, dry_run=dry_run)
        self.uninstall_triggers(op_session, dry_run=dry_run)

    def uninstall_changes_table(self, op_session, table_name, dry_run=False):
        print()
        print("uninstalling changes table...")

        table = self.make_changes_table(table_name)
        if not self.changes_table_exists(op_session, table):
            print("table does not exist")
            print()
            return

        if not dry_run:
            # TODO: why does this drop() method just hang forever?
            #table.drop(op_session.bind)
            op_session.execute(sa.text(f"DROP TABLE {table_name}"))
        print("done")
        print()

    def uninstall_triggers(self, op_session, dry_run=False):
        print("uninstalling triggers...")
        for trigger in self.supported_triggers:
            if not dry_run:
                self.drop_triggers(op_session, trigger)
        print("done")
        print()
    def create_triggers_custdata(self, op_session, changes_table):
        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_custdata_create
        AFTER INSERT ON custdata
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(NEW.CardNo, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_custdata_update
        AFTER UPDATE ON custdata
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(NEW.CardNo, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_custdata_delete
        AFTER DELETE ON custdata
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(OLD.CardNo, CHAR), 1);
        """))
    def create_triggers_meminfo(self, op_session, changes_table):
        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_meminfo_create
        AFTER INSERT ON meminfo
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(NEW.card_no, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_meminfo_update
        AFTER UPDATE ON meminfo
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(NEW.card_no, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_meminfo_delete
        AFTER DELETE ON meminfo
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(OLD.card_no, CHAR), 1);
        """))
    def create_triggers_departments(self, op_session, changes_table):
        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_departments_create
        AFTER INSERT ON departments
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Department', CONVERT(NEW.dept_no, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_departments_update
        AFTER UPDATE ON departments
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Department', CONVERT(NEW.dept_no, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_departments_delete
        AFTER DELETE ON departments
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Department', CONVERT(OLD.dept_no, CHAR), 1);
        """))

    def create_triggers_subdepts(self, op_session, changes_table):
        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_subdepts_create
        AFTER INSERT ON subdepts
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Subdepartment', CONVERT(NEW.subdept_no, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_subdepts_update
        AFTER UPDATE ON subdepts
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Subdepartment', CONVERT(NEW.subdept_no, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_subdepts_delete
        AFTER DELETE ON subdepts
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Subdepartment', CONVERT(OLD.subdept_no, CHAR), 1);
        """))

    def create_triggers_vendors(self, op_session, changes_table):
        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_vendors_create
        AFTER INSERT ON vendors
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Vendor', CONVERT(NEW.vendorID, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_vendors_update
        AFTER UPDATE ON vendors
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Vendor', CONVERT(NEW.vendorID, CHAR), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_vendors_delete
        AFTER DELETE ON vendors
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Vendor', CONVERT(OLD.vendorID, CHAR), 1);
        """))

    def create_triggers_products(self, op_session, changes_table):
        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_products_create
        AFTER INSERT ON products
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Product', NEW.upc, 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_products_update
        AFTER UPDATE ON products
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Product', NEW.upc, 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_products_delete
        AFTER DELETE ON products
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Product', OLD.upc, 1);
        """))

    def create_triggers_vendorItems(self, op_session, changes_table):
        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_vendorItems_create
        AFTER INSERT ON vendorItems
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('VendorItem', CONCAT_WS('|', NEW.sku, CONVERT(NEW.vendorID, CHAR)), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_vendorItems_update
        AFTER UPDATE ON vendorItems
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('VendorItem', CONCAT_WS('|', NEW.sku, CONVERT(NEW.vendorID, CHAR)), 0);
        """))

        op_session.execute(sa.text(f"""
        CREATE TRIGGER record_vendorItems_delete
        AFTER DELETE ON vendorItems
        FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('VendorItem', CONCAT_WS('|', OLD.sku, CONVERT(OLD.vendorID, CHAR)), 1);
        """))

    def drop_triggers(self, op_session, trigger):
        op_session.execute(sa.text(f"""
        DROP TRIGGER IF EXISTS record_{trigger}_create;
        """))

        op_session.execute(sa.text(f"""
        DROP TRIGGER IF EXISTS record_{trigger}_update;
        """))

        op_session.execute(sa.text(f"""
        DROP TRIGGER IF EXISTS record_{trigger}_delete;
        """))


@@ -2,7 +2,7 @@
################################################################################
#
# Rattail -- Retail Software Framework
-# Copyright © 2010-2023 Lance Edgar
+# Copyright © 2010-2024 Lance Edgar
#
# This file is part of Rattail.
#
@@ -26,12 +26,19 @@ DataSync for CORE POS
import sqlalchemy as sa

-from corepos.db.office_op import Session as CoreSession, model as corepos
-from rattail.db import model
from rattail.datasync import DataSyncWatcher, DataSyncImportConsumer


+def make_changes_table(table_name, metadata):
+    return sa.Table(
+        table_name, metadata,
+        sa.Column('id', sa.Integer(), nullable=False, primary_key=True),
+        sa.Column('object_type', sa.String(length=255), nullable=False),
+        sa.Column('object_key', sa.String(length=255), nullable=False),
+        sa.Column('deleted', sa.Boolean(), nullable=False, default=False),
+    )
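
As a sketch of the data structure this defines, the MySQL DDL can be previewed with SQLAlchemy's CreateTable construct; the output shown in the comment is approximate and depends on the SQLAlchemy version in use:

# illustrative only; prints roughly the CREATE TABLE statement SQLAlchemy
# would emit for the changes table under the MySQL dialect
import sqlalchemy as sa
from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects import mysql

from rattail_corepos.datasync.corepos import make_changes_table

table = make_changes_table('datasync_changes', sa.MetaData())
print(CreateTable(table).compile(dialect=mysql.dialect()))
# roughly:
#   CREATE TABLE datasync_changes (
#       id INTEGER NOT NULL AUTO_INCREMENT,
#       object_type VARCHAR(255) NOT NULL,
#       object_key VARCHAR(255) NOT NULL,
#       deleted BOOL NOT NULL,
#       PRIMARY KEY (id)
#   )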
class CoreOfficeOpWatcher(DataSyncWatcher):
    """
    DataSync watcher for the CORE ``office_op`` database.
@@ -39,21 +46,19 @@ class CoreOfficeOpWatcher(DataSyncWatcher):
    prunes_changes = True

    def __init__(self, *args, **kwargs):
-        super(CoreOfficeOpWatcher, self).__init__(*args, **kwargs)
+        super().__init__(*args, **kwargs)
        self.changes_table_name = kwargs.get('changes_table_name',
                                             'datasync_changes')
        self.corepos_metadata = sa.MetaData()
-        self.corepos_changes = sa.Table(
-            self.changes_table_name, self.corepos_metadata,
-            sa.Column('id', sa.Integer(), nullable=False, primary_key=True),
-            sa.Column('object_type', sa.String(length=255), nullable=False),
-            sa.Column('object_key', sa.String(length=255), nullable=False),
-            sa.Column('deleted', sa.Boolean(), nullable=False, default=False))
+        self.corepos_changes = make_changes_table(self.changes_table_name,
+                                                  self.corepos_metadata)

    def get_changes(self, lastrun):
-        session = CoreSession()
+        model = self.model
+        corepos = self.app.get_corepos_handler()
+        session = corepos.make_session_office_op()
        result = session.execute(self.corepos_changes.select())
        changes = result.fetchall()
        session.close()
@@ -67,8 +72,9 @@
                for c in changes]

    def prune_changes(self, keys):
+        corepos = self.app.get_corepos_handler()
+        session = corepos.make_session_office_op()
        deleted = 0
-        session = CoreSession()
        for key in keys:
            result = session.execute(self.corepos_changes.select()\
                                     .where(self.corepos_changes.c.id == key))
@@ -90,13 +96,17 @@ class COREPOSProductWatcher(DataSyncWatcher):
        if not lastrun:
            return

+        model = self.model
+        corepos = self.app.get_corepos_handler()
+        op_model = corepos.get_model_office_op()

        changes = []
-        session = CoreSession()
+        session = corepos.make_session_office_op()
        lastrun = self.localize_lastrun(session, lastrun)

        # Department
-        departments = session.query(corepos.Department)\
-            .filter(corepos.Department.modified >= lastrun)\
+        departments = session.query(op_model.Department)\
+            .filter(op_model.Department.modified >= lastrun)\
            .all()
        if departments:
            changes.extend([
@@ -133,8 +143,8 @@ class COREPOSProductWatcher(DataSyncWatcher):
        # for vendor in vendors])

        # Product
-        products = session.query(corepos.Product)\
-            .filter(corepos.Product.modified >= lastrun)\
+        products = session.query(op_model.Product)\
+            .filter(op_model.Product.modified >= lastrun)\
            .all()
        if products:
            changes.extend([
@@ -236,9 +246,11 @@ class FromRattailToCore(DataSyncImportConsumer):
                self.invoke_importer(session, change)

    def get_host_object(self, session, change):
+        model = self.model
        return session.get(getattr(model, change.payload_type), change.payload_key)

    def get_customers(self, session, change):
+        model = self.model
        clientele = self.app.get_clientele_handler()

        if change.payload_type == 'Customer':
@@ -284,6 +296,7 @@ class FromRattailToCore(DataSyncImportConsumer):
            return []

    def get_vendor(self, session, change):
+        model = self.model
        if change.payload_type == 'Vendor':
            return session.get(model.Vendor, change.payload_key)
@@ -299,6 +312,7 @@ class FromRattailToCore(DataSyncImportConsumer):
            return email.vendor

    def get_product(self, session, change):
+        model = self.model
        if change.payload_type == 'Product':
            return session.get(model.Product, change.payload_key)
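
The trigger side and the watcher side meet at the changes table: the new triggers insert one row per write, and get_changes() / prune_changes() above read and then delete those rows. A minimal sketch for peeking at pending rows, assuming a loaded Rattail config object is available as `config`:

# illustration only; mirrors what CoreOfficeOpWatcher.get_changes() reads
import sqlalchemy as sa
from rattail_corepos.datasync.corepos import make_changes_table

app = config.get_app()
corepos = app.get_corepos_handler()
session = corepos.make_session_office_op()

changes = make_changes_table('datasync_changes', sa.MetaData())
for row in session.execute(changes.select()):
    print(row.object_type, row.object_key, row.deleted)

session.close()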


@@ -733,15 +733,7 @@ class ProductCostImporter(FromCOREPOSAPI, corepos_importing.model.ProductCostImp
                return # product has no default vendor

            items = vendor_items[upc]
-            for item in items:
-                if item['vendorID'] == vendor_id:
-                    # found the default vendor item
-                    j = items.index(item)
-                    if j != 0:
-                        # it was not first; make it so
-                        items.pop(j)
-                        items.insert(0, item)
-                    break
+            self.sort_these_vendor_items(items, vendor_id)

        self.progress_loop(organize, list(vendor_items),
                           message="Sorting items by default vendor")
@@ -875,7 +867,28 @@ class ProductCostImporter(FromCOREPOSAPI, corepos_importing.model.ProductCostImp
        if hasattr(self, 'api_vendor_items'):
            return self.api_vendor_items.get(product['upc'], [])

-        raise NotImplementedError("must add real-time datasync support")
+        # nb. remaining logic is for real-time datasync. here we
+        # do not have a cache of vendor items so must fetch what
+        # we need from API. unfortunately we must (?) fetch *all*
+        # vendor items and then filter locally
+        items = [item
+                 for item in self.api.get_vendor_items()
+                 if item['upc'] == product['upc']]
+
+        vendor_id = product['default_vendor_id']
+        self.sort_these_vendor_items(items, vendor_id)
+        return items
+
+    def sort_these_vendor_items(self, items, default_vendor_id):
+        for item in items:
+            if item['vendorID'] == default_vendor_id:
+                # found the default vendor item
+                i = items.index(item)
+                if i != 0:
+                    # it was not first; make it so
+                    items.pop(i)
+                    items.insert(0, item)
+                break
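
The re-ordering is simple enough to demonstrate standalone; the vendor items and IDs below are made up for illustration only:

# standalone sketch of the same re-ordering, with made-up data
def sort_vendor_items(items, default_vendor_id):
    for item in items:
        if item['vendorID'] == default_vendor_id:
            i = items.index(item)
            if i != 0:
                # move the default vendor's item to the front
                items.pop(i)
                items.insert(0, item)
            break

items = [
    {'sku': 'A1', 'vendorID': 2},
    {'sku': 'B2', 'vendorID': 7},   # 7 = the default vendor here
    {'sku': 'C3', 'vendorID': 9},
]
sort_vendor_items(items, 7)
# items now start with the vendorID 7 entry; the others keep their relative order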
class MembershipTypeImporter(FromCOREPOSAPI, importing.model.MembershipTypeImporter):