diff --git a/rattail_corepos/corepos/office/commands.py b/rattail_corepos/corepos/office/commands.py index 2d8ef76..51ff05f 100644 --- a/rattail_corepos/corepos/office/commands.py +++ b/rattail_corepos/corepos/office/commands.py @@ -183,6 +183,47 @@ def import_self( handler.run(kwargs, progress=progress) +@core_office_typer.command() +def install_triggers( + ctx: typer.Context, + status: Annotated[ + bool, + typer.Option('--status', + help="Show current status of DB, then exit.")] = False, + uninstall: Annotated[ + bool, + typer.Option('--uninstall', + help="Uninstall table and triggers, instead of install.")] = False, + table_name: Annotated[ + str, + typer.Option(help="Override name of \"changes\" table if needed.")] = 'datasync_changes', + dry_run: Annotated[ + bool, + typer.Option('--dry-run', + help="Do not (un)install anything, but show what would have been done.")] = False, +): + """ + Install MySQL DB triggers for use with Rattail DataSync + """ + from rattail_corepos.corepos.office.triggers import CoreTriggerHandler + + config = ctx.parent.rattail_config + app = config.get_app() + corepos = app.get_corepos_handler() + op_session = corepos.make_session_office_op() + triggers = CoreTriggerHandler(config) + + if status: + triggers.show_status(op_session, table_name) + elif uninstall: + triggers.uninstall_all(op_session, table_name, dry_run=dry_run) + else: + triggers.install_all(op_session, table_name, dry_run=dry_run) + + op_session.commit() + op_session.close() + + @core_office_typer.command() def patch_customer_gaps( ctx: typer.Context, diff --git a/rattail_corepos/corepos/office/triggers.py b/rattail_corepos/corepos/office/triggers.py new file mode 100644 index 0000000..4eb3cde --- /dev/null +++ b/rattail_corepos/corepos/office/triggers.py @@ -0,0 +1,339 @@ +# -*- coding: utf-8; -*- +################################################################################ +# +# Rattail -- Retail Software Framework +# Copyright © 2010-2024 Lance Edgar +# +# This file is part of Rattail. +# +# Rattail is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# Rattail. If not, see . +# +################################################################################ +""" +CORE Office - datasync triggers +""" + +import sqlalchemy as sa + +from rattail.app import GenericHandler +from rattail_corepos.datasync.corepos import make_changes_table + + +class CoreTriggerHandler(GenericHandler): + """ + Handler to install and show status of CORE DB triggers, for use + with Rattail DataSync. + """ + supported_triggers = [ + 'custdata', + 'meminfo', + 'departments', + 'subdepts', + 'vendors', + 'products', + 'vendorItems', + ] + + def show_status(self, op_session, table_name): + """ + Show trigger status for an ``office_op`` database. + """ + print() + print("database") + # nb. use repr() to hide password + print(f"url: {repr(op_session.bind.url)}") + exists = self.database_exists(op_session) + print(f"exists: {exists}") + if not exists: + return # nothing more to test + print() + + print("changes table") + print(f"name: {table_name}") + table = self.make_changes_table(table_name) + exists = self.changes_table_exists(op_session, table) + print(f"exists: {exists}") + if exists: + records = op_session.execute(table.select()) + print(f"records: {len(records.fetchall())}") + print() + + for trigger in self.supported_triggers: + print(f"triggers for {trigger}") + + create = f'record_{trigger}_create' + exists = self.trigger_exists(op_session, create) + print(f"{create:40s} exists: {exists}") + + update = f'record_{trigger}_update' + exists = self.trigger_exists(op_session, update) + print(f"{update:40s} exists: {exists}") + + delete = f'record_{trigger}_delete' + exists = self.trigger_exists(op_session, delete) + print(f"{delete:40s} exists: {exists}") + + print() + + def database_exists(self, op_session): + corepos = self.app.get_corepos_handler() + op_model = corepos.get_model_office_op() + try: + # just query a basic table, if things are normal then we're good + op_session.query(op_model.Department).count() + except sa.exc.ProgrammingError: + return False + return True + + def trigger_exists(self, op_session, trigger): + dbname = op_session.bind.url.database + sql = sa.text(f""" + SHOW TRIGGERS FROM `{dbname}` WHERE `Trigger` = :trigger + """) + result = op_session.execute(sql, {'trigger': trigger}) + if result.fetchone(): + return True + return False + + def changes_table_exists(self, op_session, table): + if isinstance(table, str): + table = self.make_changes_table(table) + try: + op_session.execute(table.select()) + except sa.exc.ProgrammingError: + return False + return True + + def make_changes_table(self, table_name): + metadata = sa.MetaData() + table = make_changes_table(table_name, metadata) + return table + + def install_all(self, op_session, table_name, dry_run=False): + self.install_changes_table(op_session, table_name, dry_run=dry_run) + self.install_triggers(op_session, table_name, dry_run=dry_run) + + def install_changes_table(self, op_session, table_name, dry_run=False): + print() + print("installing changes table...") + print(f"{table_name}: ", end='') + + table = self.make_changes_table(table_name) + if self.changes_table_exists(op_session, table): + print("already exists") + print() + return + + if not dry_run: + table.create(op_session.bind) + print("done") + print() + + def install_triggers(self, op_session, table_name, dry_run=False): + print("installing triggers...") + + for trigger in self.supported_triggers: + if not dry_run: + self.drop_triggers(op_session, trigger) + + meth = getattr(self, f'create_triggers_{trigger}') + meth(op_session, table_name) + + print("done") + print() + + def uninstall_all(self, op_session, table_name, dry_run=False): + self.uninstall_changes_table(op_session, table_name, dry_run=dry_run) + self.uninstall_triggers(op_session, dry_run=dry_run) + + def uninstall_changes_table(self, op_session, table_name, dry_run=False): + print() + print("uninstalling changes table...") + + table = self.make_changes_table(table_name) + if not self.changes_table_exists(op_session, table): + print("table does not exist") + print() + return + + if not dry_run: + # TODO: why does this drop() method just hang forever? + #table.drop(op_session.bind) + op_session.execute(sa.text(f"DROP TABLE {table_name}")) + print("done") + print() + + def uninstall_triggers(self, op_session, dry_run=False): + print("uninstalling triggers...") + + for trigger in self.supported_triggers: + if not dry_run: + self.drop_triggers(op_session, trigger) + + print("done") + print() + + def create_triggers_custdata(self, op_session, changes_table): + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_custdata_create + AFTER INSERT ON custdata + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(NEW.CardNo, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_custdata_update + AFTER UPDATE ON custdata + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(NEW.CardNo, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_custdata_delete + AFTER DELETE ON custdata + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(OLD.CardNo, CHAR), 1); + """)) + + def create_triggers_meminfo(self, op_session, changes_table): + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_meminfo_create + AFTER INSERT ON meminfo + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(NEW.card_no, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_meminfo_update + AFTER UPDATE ON meminfo + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(NEW.card_no, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_meminfo_delete + AFTER DELETE ON meminfo + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Member', CONVERT(OLD.card_no, CHAR)); + """)) + + def create_triggers_departments(self, op_session, changes_table): + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_departments_create + AFTER INSERT ON departments + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Department', CONVERT(NEW.dept_no, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_departments_update + AFTER UPDATE ON departments + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Department', CONVERT(NEW.dept_no, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_departments_delete + AFTER DELETE ON departments + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Department', CONVERT(OLD.dept_no, CHAR), 1); + """)) + + def create_triggers_subdepts(self, op_session, changes_table): + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_subdepts_create + AFTER INSERT ON subdepts + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Subdepartment', CONVERT(NEW.subdept_no, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_subdepts_update + AFTER UPDATE ON subdepts + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Subdepartment', CONVERT(NEW.subdept_no, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_subdepts_delete + AFTER DELETE ON subdepts + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Subdepartment', CONVERT(OLD.subdept_no, CHAR), 1); + """)) + + def create_triggers_vendors(self, op_session, changes_table): + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_vendors_create + AFTER INSERT ON vendors + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Vendor', CONVERT(NEW.vendorID, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_vendors_update + AFTER UPDATE ON vendors + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Vendor', CONVERT(NEW.vendorID, CHAR), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_vendors_delete + AFTER DELETE ON vendors + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Vendor', CONVERT(OLD.vendorID, CHAR), 1); + """)) + + def create_triggers_products(self, op_session, changes_table): + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_products_create + AFTER INSERT ON products + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Product', NEW.upc, 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_products_update + AFTER UPDATE ON products + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Product', NEW.upc, 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_products_delete + AFTER DELETE ON products + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('Product', OLD.upc, 1); + """)) + + def create_triggers_vendorItems(self, op_session, changes_table): + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_vendorItems_create + AFTER INSERT ON vendorItems + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('VendorItem', CONCAT_WS('|', NEW.sku, CONVERT(NEW.vendorID, CHAR)), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_vendorItems_update + AFTER UPDATE ON vendorItems + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('VendorItem', CONCAT_WS('|', NEW.sku, CONVERT(NEW.vendorID, CHAR)), 0); + """)) + + op_session.execute(sa.text(f""" + CREATE TRIGGER record_vendorItems_delete + AFTER DELETE ON vendorItems + FOR EACH ROW INSERT INTO {changes_table} (object_type, object_key, deleted) VALUES ('VendorItem', CONCAT_WS('|', OLD.sku, CONVERT(OLD.vendorID, CHAR)), 1); + """)) + + def drop_triggers(self, op_session, trigger): + + op_session.execute(sa.text(f""" + DROP TRIGGER IF EXISTS record_{trigger}_create; + """)) + + op_session.execute(sa.text(f""" + DROP TRIGGER IF EXISTS record_{trigger}_update; + """)) + + op_session.execute(sa.text(f""" + DROP TRIGGER IF EXISTS record_{trigger}_delete; + """)) diff --git a/rattail_corepos/datasync/corepos.py b/rattail_corepos/datasync/corepos.py index 1668181..59dbd18 100644 --- a/rattail_corepos/datasync/corepos.py +++ b/rattail_corepos/datasync/corepos.py @@ -2,7 +2,7 @@ ################################################################################ # # Rattail -- Retail Software Framework -# Copyright © 2010-2023 Lance Edgar +# Copyright © 2010-2024 Lance Edgar # # This file is part of Rattail. # @@ -26,12 +26,19 @@ DataSync for CORE POS import sqlalchemy as sa -from corepos.db.office_op import Session as CoreSession, model as corepos - -from rattail.db import model from rattail.datasync import DataSyncWatcher, DataSyncImportConsumer +def make_changes_table(table_name, metadata): + return sa.Table( + table_name, metadata, + sa.Column('id', sa.Integer(), nullable=False, primary_key=True), + sa.Column('object_type', sa.String(length=255), nullable=False), + sa.Column('object_key', sa.String(length=255), nullable=False), + sa.Column('deleted', sa.Boolean(), nullable=False, default=False), + ) + + class CoreOfficeOpWatcher(DataSyncWatcher): """ DataSync watcher for the CORE ``office_op`` database. @@ -39,21 +46,19 @@ class CoreOfficeOpWatcher(DataSyncWatcher): prunes_changes = True def __init__(self, *args, **kwargs): - super(CoreOfficeOpWatcher, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.changes_table_name = kwargs.get('changes_table_name', 'datasync_changes') self.corepos_metadata = sa.MetaData() - self.corepos_changes = sa.Table( - self.changes_table_name, self.corepos_metadata, - sa.Column('id', sa.Integer(), nullable=False, primary_key=True), - sa.Column('object_type', sa.String(length=255), nullable=False), - sa.Column('object_key', sa.String(length=255), nullable=False), - sa.Column('deleted', sa.Boolean(), nullable=False, default=False)) + self.corepos_changes = make_changes_table(self.changes_table_name, + self.corepos_metadata) def get_changes(self, lastrun): - session = CoreSession() + model = self.model + corepos = self.app.get_corepos_handler() + session = corepos.make_session_office_op() result = session.execute(self.corepos_changes.select()) changes = result.fetchall() session.close() @@ -67,8 +72,9 @@ class CoreOfficeOpWatcher(DataSyncWatcher): for c in changes] def prune_changes(self, keys): + corepos = self.app.get_corepos_handler() + session = corepos.make_session_office_op() deleted = 0 - session = CoreSession() for key in keys: result = session.execute(self.corepos_changes.select()\ .where(self.corepos_changes.c.id == key)) @@ -90,13 +96,17 @@ class COREPOSProductWatcher(DataSyncWatcher): if not lastrun: return + model = self.model + corepos = self.app.get_corepos_handler() + op_model = corepos.get_model_office_op() + changes = [] - session = CoreSession() + session = corepos.make_session_office_op() lastrun = self.localize_lastrun(session, lastrun) # Department - departments = session.query(corepos.Department)\ - .filter(corepos.Department.modified >= lastrun)\ + departments = session.query(op_model.Department)\ + .filter(op_model.Department.modified >= lastrun)\ .all() if departments: changes.extend([ @@ -133,8 +143,8 @@ class COREPOSProductWatcher(DataSyncWatcher): # for vendor in vendors]) # Product - products = session.query(corepos.Product)\ - .filter(corepos.Product.modified >= lastrun)\ + products = session.query(op_model.Product)\ + .filter(op_model.Product.modified >= lastrun)\ .all() if products: changes.extend([ @@ -236,9 +246,11 @@ class FromRattailToCore(DataSyncImportConsumer): self.invoke_importer(session, change) def get_host_object(self, session, change): + model = self.model return session.get(getattr(model, change.payload_type), change.payload_key) def get_customers(self, session, change): + model = self.model clientele = self.app.get_clientele_handler() if change.payload_type == 'Customer': @@ -284,6 +296,7 @@ class FromRattailToCore(DataSyncImportConsumer): return [] def get_vendor(self, session, change): + model = self.model if change.payload_type == 'Vendor': return session.get(model.Vendor, change.payload_key) @@ -299,6 +312,7 @@ class FromRattailToCore(DataSyncImportConsumer): return email.vendor def get_product(self, session, change): + model = self.model if change.payload_type == 'Product': return session.get(model.Product, change.payload_key) diff --git a/rattail_corepos/importing/corepos/api.py b/rattail_corepos/importing/corepos/api.py index 27f3221..4c787ce 100644 --- a/rattail_corepos/importing/corepos/api.py +++ b/rattail_corepos/importing/corepos/api.py @@ -733,15 +733,7 @@ class ProductCostImporter(FromCOREPOSAPI, corepos_importing.model.ProductCostImp return # product has no default vendor items = vendor_items[upc] - for item in items: - if item['vendorID'] == vendor_id: - # found the default vendor item - j = items.index(item) - if j != 0: - # it was not first; make it so - items.pop(j) - items.insert(0, item) - break + self.sort_these_vendor_items(items, vendor_id) self.progress_loop(organize, list(vendor_items), message="Sorting items by default vendor") @@ -875,7 +867,28 @@ class ProductCostImporter(FromCOREPOSAPI, corepos_importing.model.ProductCostImp if hasattr(self, 'api_vendor_items'): return self.api_vendor_items.get(product['upc'], []) - raise NotImplementedError("must add real-time datasync support") + # nb. remaining logic is for real-time datasync. here we + # do not have a cache of vendor items so must fetch what + # we need from API. unfortunately we must (?) fetch *all* + # vendor items and then filter locally + items = [item + for item in self.api.get_vendor_items() + if item['upc'] == product['upc']] + + vendor_id = product['default_vendor_id'] + self.sort_these_vendor_items(items, vendor_id) + return items + + def sort_these_vendor_items(self, items, default_vendor_id): + for item in items: + if item['vendorID'] == default_vendor_id: + # found the default vendor item + i = items.index(item) + if i != 0: + # it was not first; make it so + items.pop(i) + items.insert(0, item) + break class MembershipTypeImporter(FromCOREPOSAPI, importing.model.MembershipTypeImporter):