rattail-corepos/rattail_corepos/datasync/corepos.py

304 lines
11 KiB
Python
Raw Normal View History

2018-06-01 15:14:49 -05:00
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2021 Lance Edgar
2018-06-01 15:14:49 -05:00
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
DataSync for CORE POS
"""
import sqlalchemy as sa
from corepos.db.office_op import Session as CoreSession, model as corepos
2018-06-01 15:14:49 -05:00
from rattail.db import model
from rattail.datasync import DataSyncWatcher, DataSyncImportConsumer
2018-06-01 15:14:49 -05:00
class CoreOfficeOpWatcher(DataSyncWatcher):
"""
DataSync watcher for the CORE ``office_op`` database.
"""
prunes_changes = True
def __init__(self, *args, **kwargs):
super(CoreOfficeOpWatcher, self).__init__(*args, **kwargs)
self.changes_table_name = kwargs.get('changes_table_name',
'datasync_changes')
self.corepos_metadata = sa.MetaData()
self.corepos_changes = sa.Table(
self.changes_table_name, self.corepos_metadata,
sa.Column('id', sa.Integer(), nullable=False, primary_key=True),
sa.Column('object_type', sa.String(length=255), nullable=False),
sa.Column('object_key', sa.String(length=255), nullable=False),
sa.Column('deleted', sa.Boolean(), nullable=False, default=False))
def get_changes(self, lastrun):
session = CoreSession()
result = session.execute(sa.sql.select([self.corepos_changes]))
changes = result.fetchall()
session.close()
if changes:
return [
(c.id,
model.DataSyncChange(
payload_type=c.object_type,
payload_key=c.object_key,
deletion=c.deleted))
for c in changes]
def prune_changes(self, keys):
deleted = 0
session = CoreSession()
for key in keys:
result = session.execute(self.corepos_changes.select()\
.where(self.corepos_changes.c.id == key))
if result.fetchall():
session.execute(self.corepos_changes.delete()\
.where(self.corepos_changes.c.id == key))
deleted += 1
session.commit()
session.close()
return deleted
2018-06-01 15:14:49 -05:00
class COREPOSProductWatcher(DataSyncWatcher):
"""
DataSync watcher for the CORE POS database.
"""
def get_changes(self, lastrun):
if not lastrun:
return
changes = []
session = CoreSession()
lastrun = self.localize_lastrun(session, lastrun)
# Department
departments = session.query(corepos.Department)\
.filter(corepos.Department.modified >= lastrun)\
.all()
if departments:
changes.extend([
(None,
model.DataSyncChange(
payload_type='Department',
payload_key=str(dept.number)))
2018-06-01 15:14:49 -05:00
for dept in departments])
# TODO: subdepartment table doesn't have a modified flag?
# # Subdepartment
# subdepartments = session.query(corepos.Subdepartment)\
# .filter(corepos.Subdepartment.modified >= lastrun)\
# .all()
# if subdepartments:
# changes.extend([
# (None,
# model.DataSyncChange(
# payload_type='Subdepartment',
# payload_key=six.text_type(subdept.subdept_no)))
# for subdept in subdepartments])
# TODO: vendor table doesn't have a modified flag?
# # Vendor
# vendors = session.query(corepos.Vendor)\
# .filter(corepos.Vendor.modified >= lastrun)\
# .all()
# if vendors:
# changes.extend([
# (None,
# model.DataSyncChange(
# payload_type='Vendor',
# payload_key=six.text_type(vendor.vendorID)))
# for vendor in vendors])
# Product
products = session.query(corepos.Product)\
.filter(corepos.Product.modified >= lastrun)\
.all()
if products:
changes.extend([
(None,
model.DataSyncChange(
payload_type='Product',
payload_key=product.upc))
for product in products
if product.upc])
session.close()
return changes
class FromRattailToCore(DataSyncImportConsumer):
"""
Rattail -> CORE POS datasync consumer
"""
handler_spec = 'rattail_corepos.corepos.importing.rattail:FromRattailToCore'
def process_changes(self, session, changes):
"""
Process all the given changes, coming from Rattail.
"""
# TODO: this probably doesn't accomplish anything here?
if self.runas_username:
session.set_continuum_user(self.runas_username)
# update all importers with current session
for importer in self.importers.values():
importer.host_session = session
# also establish the API client for each!
importer.establish_api()
# sync all Customer changes
types = [
'Customer',
'Person',
'CustomerPerson',
'CustomerMailingAddress',
'PersonPhoneNumber',
'PersonEmailAddress',
'Member',
]
for change in [c for c in changes if c.payload_type in types]:
if change.payload_type == 'Customer' and change.deletion:
# # just do default logic for this one
# self.invoke_importer(session, change)
# TODO: we have no way to delete a CORE customer via API, right?
pass
else: # we consider this an "add/update"
customers = self.get_customers(session, change)
for customer in customers:
self.process_change(session, self.importers['Member'],
host_object=customer)
# sync all Vendor changes
types = [
'Vendor',
'VendorPhoneNumber',
'VendorEmailAddress',
]
for change in [c for c in changes if c.payload_type in types]:
if change.payload_type == 'Vendor' and change.deletion:
# # just do default logic for this one
# self.invoke_importer(session, change)
# TODO: we have no way to delete a CORE vendor via API, right?
pass
else: # we consider this an "add/update"
vendor = self.get_vendor(session, change)
if vendor:
self.process_change(session, self.importers['Vendor'],
host_object=vendor)
# sync all Product changes
types = [
'Product',
'ProductPrice',
]
for change in [c for c in changes if c.payload_type in types]:
if change.payload_type == 'Product' and change.deletion:
# # just do default logic for this one
# self.invoke_importer(session, change)
# TODO: we have no way to delete a CORE product via API, right?
pass
else: # we consider this an "add/update"
product = self.get_product(session, change)
if product:
self.process_change(session, self.importers['Product'],
host_object=product)
# process all remaining supported models with typical logic
types = [
'Department',
'Subdepartment',
]
for change in [c for c in changes if c.payload_type in types]:
self.invoke_importer(session, change)
def get_host_object(self, session, change):
return session.query(getattr(model, change.payload_type))\
.get(change.payload_key)
def get_customers(self, session, change):
if change.payload_type == 'Customer':
customer = session.query(model.Customer).get(change.payload_key)
if customer:
return [customer]
if change.payload_type == 'CustomerPerson':
cp = session.query(model.CustomerPerson).get(change.payload_key)
if cp:
return [cp.customer]
if change.payload_type == 'Person':
person = session.query(model.Person).get(change.payload_key)
if person:
return person.customers
if change.payload_type == 'CustomerMailingAddress':
address = session.query(model.CustomerMailingAddress).get(change.payload_key)
if address:
return [address.customer]
if change.payload_type == 'PersonPhoneNumber':
phone = session.query(model.PersonPhoneNumber).get(change.payload_key)
if phone:
return phone.person.customers
if change.payload_type == 'PersonEmailAddress':
email = session.query(model.PersonEmailAddress).get(change.payload_key)
if email:
return email.person.customers
if change.payload_type == 'Member':
member = session.query(model.Member).get(change.payload_key)
if member:
return [member.customer]
return []
def get_vendor(self, session, change):
if change.payload_type == 'Vendor':
return session.query(model.Vendor).get(change.payload_key)
if change.payload_type == 'VendorPhoneNumber':
phone = session.query(model.VendorPhoneNumber).get(change.payload_key)
if phone:
return phone.vendor
if change.payload_type == 'VendorEmailAddress':
email = session.query(model.VendorEmailAddress).get(change.payload_key)
if email:
return email.vendor
def get_product(self, session, change):
if change.payload_type == 'Product':
return session.query(model.Product).get(change.payload_key)
if change.payload_type == 'ProductPrice':
price = session.query(model.ProductPrice).get(change.payload_key)
if price:
return price.product