Move logic for CORE importing to "more precise" module path

should distinguish "office vs. lane"
This commit is contained in:
Lance Edgar 2023-05-08 14:45:55 -05:00
parent 519ed0a594
commit ef823260ab
24 changed files with 2461 additions and 2065 deletions

View file

@ -2,7 +2,7 @@
################################################################################ ################################################################################
# #
# Rattail -- Retail Software Framework # Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar # Copyright © 2010-2023 Lance Edgar
# #
# This file is part of Rattail. # This file is part of Rattail.
# #
@ -24,4 +24,10 @@
Importing data into CORE-POS Importing data into CORE-POS
""" """
from . import model import warnings
warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
from rattail_corepos.corepos.office.importing import *

View file

@ -2,7 +2,7 @@
################################################################################ ################################################################################
# #
# Rattail -- Retail Software Framework # Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar # Copyright © 2010-2023 Lance Edgar
# #
# This file is part of Rattail. # This file is part of Rattail.
# #
@ -24,4 +24,10 @@
Importing data into CORE-POS (direct DB) Importing data into CORE-POS (direct DB)
""" """
from . import model import warnings
warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
from rattail_corepos.corepos.office.importing.db import *

View file

@ -24,137 +24,10 @@
CORE-POS -> CORE-POS data import CORE-POS -> CORE-POS data import
""" """
from collections import OrderedDict import warnings
from corepos.db.office_op import Session as CoreSession warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
from rattail.importing.handlers import FromSQLAlchemyHandler, ToSQLAlchemyHandler from rattail_corepos.corepos.office.importing.db.corepos import *
from rattail.importing.sqlalchemy import FromSQLAlchemySameToSame
from rattail_corepos.corepos.importing import db as corepos_importing
class FromCoreHandler(FromSQLAlchemyHandler):
"""
Base class for import handlers which use a CORE database as the host / source.
"""
host_title = "CORE"
host_key = 'corepos_db_office_op'
def make_host_session(self):
return CoreSession()
class ToCoreHandler(ToSQLAlchemyHandler):
"""
Base class for import handlers which target a CORE database on the local side.
"""
local_title = "CORE"
local_key = 'corepos_db_office_op'
def make_session(self):
return CoreSession()
class FromCoreToCoreBase(object):
"""
Common base class for Core -> Core data import/export handlers.
"""
def get_importers(self):
importers = OrderedDict()
importers['Department'] = DepartmentImporter
importers['Subdepartment'] = SubdepartmentImporter
importers['Vendor'] = VendorImporter
importers['VendorContact'] = VendorContactImporter
importers['Product'] = ProductImporter
importers['ProductFlag'] = ProductFlagImporter
importers['VendorItem'] = VendorItemImporter
importers['Employee'] = EmployeeImporter
importers['CustData'] = CustDataImporter
importers['MemberType'] = MemberTypeImporter
importers['MemberInfo'] = MemberInfoImporter
importers['HouseCoupon'] = HouseCouponImporter
return importers
class FromCoreToCoreImport(FromCoreToCoreBase, FromCoreHandler, ToCoreHandler):
"""
Handler for CORE (other) -> CORE (local) data import.
.. attribute:: direction
Value is ``'import'`` - see also
:attr:`rattail.importing.handlers.ImportHandler.direction`.
"""
dbkey = 'host'
local_title = "CORE (default)"
@property
def host_title(self):
return "CORE ({})".format(self.dbkey)
def make_host_session(self):
return CoreSession(bind=self.config.corepos_engines[self.dbkey])
class FromCoreToCoreExport(FromCoreToCoreBase, FromCoreHandler, ToCoreHandler):
"""
Handler for CORE (local) -> CORE (other) data export.
.. attribute:: direction
Value is ``'export'`` - see also
:attr:`rattail.importing.handlers.ImportHandler.direction`.
"""
direction = 'export'
host_title = "CORE (default)"
@property
def local_title(self):
return "CORE ({})".format(self.dbkey)
def make_session(self):
return CoreSession(bind=self.config.corepos_engines[self.dbkey])
class FromCore(FromSQLAlchemySameToSame):
"""
Base class for CORE -> CORE data importers.
"""
class DepartmentImporter(FromCore, corepos_importing.model.DepartmentImporter):
pass
class SubdepartmentImporter(FromCore, corepos_importing.model.SubdepartmentImporter):
pass
class VendorImporter(FromCore, corepos_importing.model.VendorImporter):
pass
class VendorContactImporter(FromCore, corepos_importing.model.VendorContactImporter):
pass
class ProductImporter(FromCore, corepos_importing.model.ProductImporter):
pass
class ProductFlagImporter(FromCore, corepos_importing.model.ProductFlagImporter):
pass
class VendorItemImporter(FromCore, corepos_importing.model.VendorItemImporter):
pass
class EmployeeImporter(FromCore, corepos_importing.model.EmployeeImporter):
pass
class CustDataImporter(FromCore, corepos_importing.model.CustDataImporter):
pass
class MemberTypeImporter(FromCore, corepos_importing.model.MemberTypeImporter):
pass
class MemberInfoImporter(FromCore, corepos_importing.model.MemberInfoImporter):
pass
class HouseCouponImporter(FromCore, corepos_importing.model.HouseCouponImporter):
pass

View file

@ -2,7 +2,7 @@
################################################################################ ################################################################################
# #
# Rattail -- Retail Software Framework # Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar # Copyright © 2010-2023 Lance Edgar
# #
# This file is part of Rattail. # This file is part of Rattail.
# #
@ -24,27 +24,10 @@
CSV -> CORE data import CSV -> CORE data import
""" """
from corepos.db.office_op import model as corepos, Session as CoreSession import warnings
from rattail.importing.handlers import FromFileHandler warnings.warn("rattail_corepos.corepos.importing is deprecated; "
from rattail.importing.csv import FromCSVToSQLAlchemyMixin "please use rattail_corepos.corepos.office.importing instead",
from rattail_corepos.corepos.importing.db.model import ToCore DeprecationWarning, stacklevel=2)
from rattail_corepos.corepos.importing.db.corepos import ToCoreHandler
from rattail_corepos.corepos.office.importing.db.csv import *
class FromCSVToCore(FromCSVToSQLAlchemyMixin, FromFileHandler, ToCoreHandler):
"""
Handler for CSV -> CORE data import
"""
host_title = "CSV"
ToParent = ToCore
@property
def local_title(self):
return "CORE ({})".format(self.dbkey)
def get_model(self):
return corepos
def make_session(self):
return CoreSession(bind=self.config.corepos_engines[self.dbkey])

View file

@ -2,7 +2,7 @@
################################################################################ ################################################################################
# #
# Rattail -- Retail Software Framework # Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar # Copyright © 2010-2023 Lance Edgar
# #
# This file is part of Rattail. # This file is part of Rattail.
# #
@ -23,3 +23,11 @@
""" """
Exporting data from CORE-POS (direct DB) Exporting data from CORE-POS (direct DB)
""" """
import warnings
warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
from rattail_corepos.corepos.office.importing.db.exporters import *

View file

@ -24,658 +24,10 @@
CORE-POS -> Catapult Inventory Workbook CORE-POS -> Catapult Inventory Workbook
""" """
import re import warnings
import datetime
import decimal
import logging
from collections import OrderedDict
from sqlalchemy.exc import ProgrammingError warnings.warn("rattail_corepos.corepos.importing is deprecated; "
from sqlalchemy import orm "please use rattail_corepos.corepos.office.importing instead",
from sqlalchemy.orm.exc import NoResultFound DeprecationWarning, stacklevel=2)
from corepos import enum as corepos_enum from rattail_corepos.corepos.office.importing.db.exporters.catapult_inventory import *
from corepos.db.office_op import model as corepos
from corepos.db.util import table_exists
from rattail.gpc import GPC
from rattail.core import get_uuid
from rattail.importing.handlers import ToFileHandler
from rattail_corepos.corepos.importing.db.corepos import FromCoreHandler, FromCore
from rattail_onager.catapult.importing import inventory as catapult_importing
log = logging.getLogger(__name__)
class FromCoreToCatapult(FromCoreHandler, ToFileHandler):
"""
Handler for CORE -> Catapult (Inventory Workbook)
"""
host_title = "CORE-POS"
local_title = "Catapult (Inventory Workbook)"
direction = 'export'
def get_importers(self):
importers = OrderedDict()
importers['InventoryItem'] = InventoryItemImporter
return importers
class InventoryItemImporter(FromCore, catapult_importing.model.InventoryItemImporter):
"""
Inventory Item data importer.
"""
host_model_class = corepos.Product
# note that we use a "dummy" uuid key here, so logic will consider each row
# to be unique, even when duplicate item_id's are present
key = 'uuid'
supported_fields = [
'uuid',
'item_id',
'dept_id',
'dept_name',
'receipt_alias',
'brand',
'item_name',
'size',
# 'sugg_retail',
'last_cost',
'price_divider',
'base_price',
# 'disc_mult',
'ideal_margin',
'bottle_deposit',
# 'pos_menu_group',
'scale_label',
'sold_by_ea_or_lb',
'quantity_required',
'weight_profile',
'tax_1',
'tax_2',
'spec_tend_1',
'spec_tend_2',
'age_required',
'location',
# 'family_line',
'alt_id',
'alt_receipt_alias',
'alt_pkg_qty',
'alt_pkg_price',
'auto_discount',
'supplier_unit_id',
'supplier_id',
'unit',
'num_pkgs',
# 'cs_pk_multiplier',
# 'dsd',
'pf1',
# 'pf2',
# 'pf3',
# 'pf4',
# 'pf5',
# 'pf6',
# 'pf7',
# 'pf8',
'memo',
'scale_shelf_life',
'scale_shelf_life_type',
'scale_ingredient_text',
]
# we want to add a "duplicate" column at the end
include_duplicate_column = True
# we want to add an "alternate for" column at the end
include_alt_for_column = True
type2_upc_pattern = re.compile(r'^2(\d{5})00000\d')
def setup(self):
super(InventoryItemImporter, self).setup()
# this is used for sorting, when a value has no date
self.old_datetime = datetime.datetime(1900, 1, 1)
self.exclude_invalid_upc = self.config.getbool(
'corepos', 'exporting.catapult_inventory.exclude_invalid_upc',
default=False)
self.warn_invalid_upc = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_invalid_upc',
default=True)
self.ignored_upcs = self.config.getlist(
'corepos', 'exporting.catapult_inventory.ignored_upcs')
self.exclude_missing_department = self.config.getbool(
'corepos', 'exporting.catapult_inventory.exclude_missing_department',
default=False)
self.warn_missing_department = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_missing_department',
default=True)
self.warn_empty_subdepartment = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_empty_subdepartment',
default=True)
self.warn_truncated_receipt_alias = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_truncated_receipt_alias',
default=True)
self.warn_size_null_byte = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_size_null_byte',
default=True)
self.warn_unknown_deposit = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_unknown_deposit',
default=True)
self.warn_scale_label_non_plu = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_scale_label_non_plu',
default=True)
self.warn_scale_label_short_plu = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_scale_label_short_plu',
default=True)
self.warn_weight_profile_non_plu = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_weight_profile_non_plu',
default=True)
self.warn_multiple_vendor_items = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_multiple_vendor_items',
default=True)
self.warn_no_valid_vendor_items = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_no_valid_vendor_items',
default=True)
self.warn_truncated_memo = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_truncated_memo',
default=True)
self.warn_scale_ingredients_newline = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_scale_ingredients_newline',
default=True)
self.floor_sections_exist = table_exists(self.host_session,
corepos.FloorSection)
self.tax_components_exist = table_exists(self.host_session,
corepos.TaxRateComponent)
self.tax_rate_ids_1 = self.config.getlist(
'corepos', 'exporting.catapult_inventory.tax_rate_ids_1', default=[])
self.tax_rate_ids_1 = [int(id) for id in self.tax_rate_ids_1]
self.tax_rate_ids_2 = self.config.getlist(
'corepos', 'exporting.catapult_inventory.tax_rate_ids_2', default=[])
self.tax_rate_ids_2 = [int(id) for id in self.tax_rate_ids_2]
# TODO: should add component id levels too?
# tax_component_ids_1 = (1,)
# tax_component_ids_2 = (2,)
self.cache_bottle_deposits()
self.cache_like_codes()
def cache_bottle_deposits(self):
self.deposits = {}
deposits = self.host_session.query(corepos.Product.deposit.distinct())\
.all()
def cache(deposit, i):
assert isinstance(deposit, tuple)
assert len(deposit) == 1
deposit = deposit[0]
if deposit:
deposit = int(deposit)
upc = "{:013d}".format(deposit)
try:
product = self.host_session.query(corepos.Product)\
.filter(corepos.Product.upc == upc)\
.one()
except NoResultFound:
pass # we will log warnings per-item later
else:
self.deposits[deposit] = product
self.progress_loop(cache, deposits,
message="Caching product deposits data")
def cache_like_codes(self):
self.like_codes = {}
mappings = self.host_session.query(corepos.ProductLikeCode)\
.order_by(corepos.ProductLikeCode.like_code_id,
corepos.ProductLikeCode.upc)\
.all()
def cache(mapping, i):
self.like_codes.setdefault(mapping.like_code_id, []).append(mapping)
self.progress_loop(cache, mappings,
message="Caching like codes data")
def query(self):
query = self.host_session.query(corepos.Product)\
.order_by(corepos.Product.upc)\
.options(orm.joinedload(corepos.Product.department))\
.options(orm.joinedload(corepos.Product.subdepartment))\
.options(orm.joinedload(corepos.Product.vendor_items)\
.joinedload(corepos.VendorItem.vendor))\
.options(orm.joinedload(corepos.Product.default_vendor))\
.options(orm.joinedload(corepos.Product.scale_item))\
.options(orm.joinedload(corepos.Product.user_info))\
.options(orm.joinedload(corepos.Product.tax_rate))\
.options(orm.joinedload(corepos.Product._like_code))
if self.floor_sections_exist:
query = query.options(orm.joinedload(corepos.Product.physical_location)\
.joinedload(corepos.ProductPhysicalLocation.floor_section))
return query
def normalize_host_data(self, host_objects=None):
normalized = super(InventoryItemImporter, self).normalize_host_data(host_objects=host_objects)
# re-sort the results by item_id, since e.g. original UPC from CORE may
# have been replaced with a PLU. also put non-numeric first, to bring
# them to user's attention
numeric = []
non_numeric = []
for row in normalized:
if row['item_id'] and row['item_id'].isdigit():
numeric.append(row)
else:
non_numeric.append(row)
numeric.sort(key=lambda row: int(row['item_id']))
non_numeric.sort(key=lambda row: row['item_id'])
normalized = non_numeric + numeric
# now we must check for duplicate item ids, and mark rows accordingly.
# but we *do* want to include/preserve all rows, hence we mark them
# instead of pruning some out. first step is to group all by item_id
items = {}
def collect(row, i):
items.setdefault(row['item_id'], []).append(row)
self.progress_loop(collect, normalized,
message="Grouping rows by Item ID")
# now we go through our groupings and for any item_id with more than 1
# row, we'll mark each row as having a duplicate item_id. note that
# this modifies such a row "in-place" for our overall return value
def inspect(rows, i):
if len(rows) > 1:
for row in rows:
row['__duplicate__'] = True
self.progress_loop(inspect, list(items.values()),
message="Marking any duplicate Item IDs")
# finally, we must inspect the like codes and figure out which
# product(s) should potentially be considered "alternate for" another.
# first step here will be to create mapping of item_id values for each
# CORE product in our result set
item_ids = {}
def mapp(row, i):
product = row['__product__']
item_ids[product.upc] = row['item_id']
self.progress_loop(mapp, normalized,
message="Mapping item_id for CORE products")
# next step here is to check each product and mark "alt for" as needed
def inspect(row, i):
product = row['__product__']
if product.like_code:
others = self.like_codes.get(product.like_code.id)
if others:
first = others[0]
if first.upc != product.upc:
row['__alternate_for__'] = item_ids[first.upc]
self.progress_loop(inspect, normalized,
message="Marking any \"alternate for\" items")
return normalized
def normalize_host_object(self, product):
item_id = product.upc
if self.ignored_upcs and item_id in self.ignored_upcs:
log.debug("ignoring UPC %s for product: %s", product.upc, product)
return
if not item_id:
logger = log.warning if self.warn_invalid_upc else log.debug
logger("product id %s has no upc: %s", product.id, product)
if self.exclude_invalid_upc:
return
if not item_id.isdigit():
logger = log.warning if self.warn_invalid_upc else log.debug
logger("product %s has non-numeric upc: %s",
product.upc, product)
if self.exclude_invalid_upc:
return
# convert item_id either to a PLU, or formatted UPC
is_plu = False
if item_id.isdigit(): # can only convert if it's numeric!
if len(str(int(item_id))) < 6:
is_plu = True
item_id = str(int(item_id))
else: # must add check digit, and re-format
upc = GPC(item_id, calc_check_digit='upc')
item_id = str(upc)
assert len(item_id) == 14
# drop leading zero(s)
if item_id[1] == '0': # UPC-A
item_id = item_id[2:]
assert len(item_id) == 12
else: # EAN13
item_id = item_id[1:]
assert len(item_id) == 13
# figure out the "scale label" data, which may also affect item_id
scale_item = product.scale_item
scale_label = None
if scale_item:
scale_label = 'Y'
if item_id.isdigit():
if len(item_id) < 5:
logger = log.warning if self.warn_scale_label_short_plu else log.debug
logger("product %s has scale label, but PLU is less than 5 digits (%s): %s",
product.upc, item_id, product)
elif len(item_id) > 5:
match = self.type2_upc_pattern.match(item_id)
if match:
# convert type-2 UPC to PLU
is_plu = True
item_id = str(int(match.group(1)))
log.debug("converted type-2 UPC %s to PLU %s for: %s",
product.upc, item_id, product)
else:
logger = log.warning if self.warn_scale_label_non_plu else log.debug
logger("product %s has scale label, but non-PLU item_id: %s",
product.upc, product)
department = product.department
if not department:
logger = log.warning if self.warn_missing_department else log.debug
logger("product %s has no department: %s", product.upc, product)
if self.exclude_missing_department:
return
# size may come from one of two fields, or combination thereof
pack_size = (product.size or '').strip()
uom = (product.unit_of_measure or '').strip()
numeric_pack = False
if pack_size:
try:
decimal.Decimal(pack_size)
except decimal.InvalidOperation:
pass
else:
numeric_pack = True
if numeric_pack:
size = "{} {}".format(pack_size, uom).strip()
else:
size = pack_size or uom or None
# TODO: this logic may actually be client-specific? i just happened to
# find some null chars in a client DB and needed to avoid them, b/c the
# openpyxl lib said IllegalCharacterError
if size is not None and '\x00' in size:
logger = log.warning if self.warn_size_null_byte else log.debug
logger("product %s has null byte in size field: %s",
product.upc, product)
size = size.replace('\x00', '')
price_divider = None
if (product.quantity and product.group_price and
product.price_method == corepos_enum.PRODUCT_PRICE_METHOD_ALWAYS):
diff = (product.quantity * product.normal_price) - product.group_price
if abs(round(diff, 2)) > .01:
log.warning("product %s has multi-price with $%0.2f diff: %s",
product.upc, diff, product)
price_divider = product.quantity
bottle_deposit = None
if product.deposit:
deposit = int(product.deposit)
if deposit in self.deposits:
bottle_deposit = self.deposits[deposit].normal_price
else:
logger = log.warning if self.warn_unknown_deposit else log.debug
logger("product %s has unknown deposit %s which will be ignored: %s",
product.upc, deposit, product)
sold_by_ea_or_lb = None
if is_plu:
sold_by_ea_or_lb = 'LB' if product.scale else 'EA'
weight_profile = None
if product.scale or scale_item:
if not is_plu:
logger = log.warning if self.warn_weight_profile_non_plu else log.debug
logger("product %s has weight profile, but non-PLU item_id %s: %s",
product.upc, item_id, product)
weight_profile = 'LBNT'
# calculate tax rates according to configured "mappings"
tax_1 = 0
tax_2 = 0
if product.tax_rate:
# TODO: need to finish the logic to handle tax components
if self.tax_components_exist and product.tax_rate.components:
# for component in product.tax_rate.components:
# if tax_component_ids_1 and component.id in tax_component_ids_1:
# tax_1 += component.rate
# if tax_component_ids_2 and component.id in tax_component_ids_2:
# tax_2 += component.rate
raise NotImplementedError
else: # no components
rate = product.tax_rate
if self.tax_rate_ids_1 and rate.id in self.tax_rate_ids_1:
tax_1 += rate.rate
if self.tax_rate_ids_2 and rate.id in self.tax_rate_ids_2:
tax_2 += rate.rate
if not (self.tax_rate_ids_1 or self.tax_rate_ids_2) and rate.rate:
log.warning("product %s has unknown tax rate %s (%s) which will "
"be considered as tax 1: %s",
product.upc, rate.rate, rate.description, product)
tax_1 += rate.rate
location = None
if self.floor_sections_exist and product.physical_location and product.physical_location.floor_section:
location = product.physical_location.floor_section.name
if len(location) > 30:
log.warning("product %s has location length %s; will truncate: %s",
product.upc, len(location), location)
location = location[:30]
# no alt item (or auto discount) by default
alt_id = None
alt_receipt_alias = None
alt_pkg_qty = None
alt_pkg_price = None
auto_discount = None
# make an alt item, when main item has pack pricing (e.g. Zevia sodas)
# note that in this case the main item_id and alt_id are the same
if (product.quantity and product.group_price and
product.price_method == corepos_enum.PRODUCT_PRICE_METHOD_FULL_SETS):
alt_id = item_id
suffix = "{}-PK".format(product.quantity)
alt_receipt_alias = "{} {}".format(product.description, suffix)
if len(alt_receipt_alias) > 32:
logger = log.warning if self.warn_truncated_receipt_alias else log.debug
logger("alt receipt alias for %s is %s chars; must truncate: %s",
alt_id, len(alt_receipt_alias), alt_receipt_alias)
overage = len(alt_receipt_alias) - 32
alt_receipt_alias = "{} {}".format(
product.description[:-overage], suffix)
assert len(alt_receipt_alias) == 32
alt_pkg_qty = product.quantity
alt_pkg_price = product.group_price
# we also must declare an "auto discount" to get pack price
auto_discount = "{} @ ${:0.2f}".format(alt_pkg_qty, alt_pkg_price)
# no supplier info by default
supplier_unit_id = None
supplier_id = None
supplier_unit = None
supplier_num_pkgs = None
# maybe add supplier info, for "default" `vendorItems` record. we'll
# have to get a little creative to figure out which is the default
vendor_items = []
# first we try to narrow down according to product's default vendor
if product.default_vendor:
vendor_items = [item for item in product.vendor_items
if item.vendor is product.default_vendor]
# but if that didn't work, just use any "valid" vendorItems
if not vendor_items:
# valid in this context means, not missing vendor
vendor_items = [item for item in product.vendor_items
if item.vendor]
if not vendor_items and product.vendor_items:
logger = log.warning if self.warn_no_valid_vendor_items else log.debug
logger("product %s has %s vendorItems but each is missing (valid) vendor: %s",
product.upc, len(product.vendor_items), product)
if vendor_items:
if len(vendor_items) > 1:
# try to narrow down a bit further, based on valid 'units' amount
valid_items = [item for item in vendor_items
if item.units]
if valid_items:
vendor_items = valid_items
# warn if we still have more than one "obvious" vendor item
if len(vendor_items) > 1:
logger = log.warning if self.warn_multiple_vendor_items else log.debug
logger("product %s has %s vendorItems to pick from: %s",
product.upc, len(vendor_items), product)
# sort the list so most recently modified is first
vendor_items.sort(key=lambda item: item.modified or self.old_datetime,
reverse=True)
# use the "first" vendor item available
item = vendor_items[0]
supplier_unit_id = item.sku
supplier_id = item.vendor.name
supplier_num_pkgs = item.units or 1
if supplier_num_pkgs == 1:
supplier_unit = 'LB' if product.scale else 'EA'
else:
supplier_unit = 'CS'
pf1 = None
if product.subdepartment:
if not product.subdepartment.number:
logger = log.warning if self.warn_empty_subdepartment else log.debug
logger("product %s has 'empty' subdepartment number: %s",
product.upc, product)
else:
pf1 = "{} {}".format(product.subdepartment.number,
product.subdepartment.name)
memo = None
if product.user_info and product.user_info.long_text is not None:
memo = str(product.user_info.long_text)
if memo and len(memo) > 254:
logger = log.warning if self.warn_truncated_memo else log.debug
logger("product %s has memo of length %s; will truncate: %s",
product.upc, len(memo), memo)
memo = memo[:254]
scale_ingredient_text = None
if scale_item:
scale_ingredient_text = scale_item.text
if "\n" in scale_ingredient_text:
logger = log.warning if self.warn_scale_ingredients_newline else log.debug
logger("must remove carriage returns for scale ingredients: %s",
scale_ingredient_text)
scale_ingredient_text = scale_ingredient_text.replace("\n", " ")
return {
'__product__': product,
'__original_item_id__': product.upc,
'uuid': get_uuid(),
'item_id': item_id,
'dept_id': department.number if department else None,
'dept_name': department.name if department else None,
'receipt_alias': product.description,
'brand': product.brand,
'item_name': product.description,
'size': size,
# TODO: does CORE have this?
# 'sugg_retail': None,
'last_cost': product.cost,
'price_divider': price_divider,
'base_price': product.normal_price,
'ideal_margin': department.margin * 100 if department and department.margin else None,
# TODO: does CORE have these?
# 'disc_mult': None,
'bottle_deposit': bottle_deposit,
# TODO: does CORE have this?
# 'pos_menu_group': None,
'scale_label': scale_label,
'sold_by_ea_or_lb': sold_by_ea_or_lb,
'quantity_required': 'Y' if product.quantity_enforced else None,
'weight_profile': weight_profile,
'tax_1': tax_1 or None, # TODO: logic above is unfinished
'tax_2': tax_2 or None, # TODO: logic above is unfinished
'spec_tend_1': 'EBT' if product.foodstamp else None,
'spec_tend_2': 'WIC' if product.wicable else None,
'age_required': product.id_enforced or None,
'location': location,
# TODO: does CORE have these?
# 'family_line': None,
'alt_id': alt_id,
'alt_receipt_alias': alt_receipt_alias,
'alt_pkg_qty': alt_pkg_qty,
'alt_pkg_price': alt_pkg_price,
'auto_discount': auto_discount,
'supplier_unit_id': supplier_unit_id,
'supplier_id': supplier_id,
'unit': supplier_unit,
'num_pkgs': supplier_num_pkgs,
# TODO: does CORE have these?
# 'cs_pk_multiplier': None,
# 'dsd': None,
'pf1': pf1,
# TODO: are these needed?
# 'pf2',
# 'pf3',
# 'pf4',
# 'pf5',
# 'pf6',
# 'pf7',
# 'pf8',
'memo': memo,
'scale_shelf_life': scale_item.shelf_life if scale_item else None,
'scale_shelf_life_type': 0 if scale_item else None,
'scale_ingredient_text': scale_ingredient_text,
}

View file

@ -24,164 +24,10 @@
CORE-POS -> Catapult Membership Workbook CORE-POS -> Catapult Membership Workbook
""" """
import decimal import warnings
import logging
from collections import OrderedDict
from sqlalchemy import orm warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
from corepos.db.office_op import model as corepos from rattail_corepos.corepos.office.importing.db.exporters.catapult_membership import *
from rattail.importing.handlers import ToFileHandler
from rattail.time import localtime
from rattail.excel import ExcelReader
from rattail_corepos.corepos.importing.db.corepos import FromCoreHandler, FromCore
from rattail_onager.catapult.importing import membership as catapult_importing
log = logging.getLogger(__name__)
class FromCoreToCatapult(FromCoreHandler, ToFileHandler):
"""
Handler for CORE -> Catapult (Membership Workbook)
"""
host_title = "CORE-POS"
local_title = "Catapult (Membership Workbook)"
direction = 'export'
def get_importers(self):
importers = OrderedDict()
importers['Member'] = MemberImporter
return importers
class MemberImporter(FromCore, catapult_importing.model.MemberImporter):
"""
Member data importer.
"""
host_model_class = corepos.CustData
supported_fields = [
'member_id',
'first_name',
'last_name',
'address_1',
'address_2',
'city',
'state',
'zip_code',
'phone_number',
'email',
'member_notes',
'member_join_date',
'family_affiliation',
'account_number',
'membership_profile_name',
]
def setup(self):
super(MemberImporter, self).setup()
self.warn_truncated_field = self.config.getbool(
'corepos', 'exporting.catapult_membership.warn_truncated_field',
default=True)
self.cache_membership_profiles()
def cache_membership_profiles(self):
self.membership_profiles = {}
sheet_name = self.config.get('catapult', 'membership_profiles_worksheet_name',
default="Membership Profile details")
reader = ExcelReader(self.workbook_template_path, sheet_name=sheet_name)
for profile in reader.read_rows(progress=self.progress):
if profile['Equity Paid in Full Amount']:
profile['Equity Paid in Full Amount'] = decimal.Decimal(profile['Equity Paid in Full Amount'])
self.membership_profiles[profile['Membership Profile Name']] = profile
# also figure out which profile is default
self.default_membership_profile = None
for profile in self.membership_profiles.values():
if profile['Please indicate Default Profile'] == 'X':
self.default_membership_profile = profile
break
if not self.default_membership_profile:
raise RuntimeError("cannot determine default membership profile")
def query(self):
return self.host_session.query(corepos.CustData)\
.order_by(corepos.CustData.card_number,
corepos.CustData.person_number,
corepos.CustData.id)\
.options(orm.joinedload(corepos.CustData.member_type))\
.options(orm.joinedload(corepos.CustData.member_info)\
.joinedload(corepos.MemberInfo.dates))\
.options(orm.joinedload(corepos.CustData.member_info)\
.joinedload(corepos.MemberInfo.notes))
def normalize_host_object(self, custdata):
if custdata.person_number == 1:
family_affiliation = False
elif custdata.person_number > 1:
family_affiliation = True
else:
log.warning("member #%s has unexpected person_number (%s): %s",
custdata.card_number, custdata.person_number, custdata)
family_affiliation = False
if custdata.member_type:
membership_profile_name = custdata.member_type.description
else:
log.warning("member #%s has no member type: %s",
custdata.card_number, custdata)
membership_profile_name = self.default_membership_profile['Membership Profile Name']
data = {
'member_id': str(custdata.id),
'first_name': custdata.first_name,
'last_name': custdata.last_name,
# these will be blank unless we have an associated `meminfo` record
'phone_number': None,
'email': None,
'address_1': None,
'address_2': None,
'city': None,
'state': None,
'zip_code': None,
'member_join_date': None,
'member_notes': None,
'family_affiliation': family_affiliation,
'account_number': str(custdata.card_number),
'membership_profile_name': membership_profile_name,
}
info = custdata.member_info
if info:
data['phone_number'] = info.phone
data['email'] = info.email
data['address_1'], data['address_2'] = info.split_street()
data['city'] = info.city
data['state'] = info.state
data['zip_code'] = info.zip
if info.dates:
if len(info.dates) > 1:
log.warning("member #%s has multiple (%s) `memDates` records: %s",
custdata.card_number, len(info.dates), custdata)
dates = info.dates[0]
if dates.start_date:
start_date = localtime(self.config, dates.start_date).date()
data['member_join_date'] = start_date.strftime('%Y-%m-%d')
if info.notes:
notes = []
for note in reversed(info.notes): # show most recent first
text = str(note.note or '').strip() or None
if text:
notes.append(text)
if notes:
data['member_notes'] = '\n'.join(notes)
return data

View file

@ -2,7 +2,7 @@
################################################################################ ################################################################################
# #
# Rattail -- Retail Software Framework # Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar # Copyright © 2010-2023 Lance Edgar
# #
# This file is part of Rattail. # This file is part of Rattail.
# #
@ -24,25 +24,10 @@
CORE-POS Data Export CORE-POS Data Export
""" """
from corepos.db.office_op import model as corepos import warnings
from rattail.importing.handlers import ToCSVHandler warnings.warn("rattail_corepos.corepos.importing is deprecated; "
from rattail.importing.exporters import FromSQLAlchemyToCSVMixin "please use rattail_corepos.corepos.office.importing instead",
from rattail_corepos.corepos.importing.db.corepos import FromCoreHandler, FromCore DeprecationWarning, stacklevel=2)
from rattail_corepos.corepos.office.importing.db.exporters.csv import *
class FromCoreToCSV(FromSQLAlchemyToCSVMixin, FromCoreHandler, ToCSVHandler):
"""
Handler for CORE -> CSV data export.
"""
direction = 'export'
local_title = "CSV"
FromParent = FromCore
ignored_model_names = ['Change'] # omit the datasync change model
@property
def host_title(self):
return self.config.node_title(default="CORE")
def get_model(self):
return corepos

View file

@ -2,7 +2,7 @@
################################################################################ ################################################################################
# #
# Rattail -- Retail Software Framework # Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar # Copyright © 2010-2023 Lance Edgar
# #
# This file is part of Rattail. # This file is part of Rattail.
# #
@ -22,148 +22,12 @@
################################################################################ ################################################################################
""" """
CORE-POS model importers (direct DB) CORE-POS model importers (direct DB)
.. warning::
All classes in this module are "direct DB" importers, which will write
directly to MySQL. They are meant to be used in dry-run mode only, and/or
for sample data import to a dev system etc. They are *NOT* meant for
production use, as they will completely bypass any CORE business rules logic
which may exist.
""" """
from sqlalchemy.orm.exc import NoResultFound import warnings
from rattail import importing warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
from corepos.db.office_op import model as corepos from rattail_corepos.corepos.office.importing.db.model import *
from corepos.db.office_trans import model as coretrans
class ToCore(importing.ToSQLAlchemy):
"""
Base class for all CORE "operational" model importers.
"""
def create_object(self, key, host_data):
# NOTE! some tables in CORE DB may be using the MyISAM storage engine,
# which means it is *not* transaction-safe and therefore we cannot rely
# on "rollback" if in dry-run mode! in other words we better not touch
# the record at all, for dry run
if self.dry_run:
return host_data
return super(ToCore, self).create_object(key, host_data)
def update_object(self, obj, host_data, **kwargs):
# NOTE! some tables in CORE DB may be using the MyISAM storage engine,
# which means it is *not* transaction-safe and therefore we cannot rely
# on "rollback" if in dry-run mode! in other words we better not touch
# the record at all, for dry run
if self.dry_run:
return obj
return super(ToCore, self).update_object(obj, host_data, **kwargs)
def delete_object(self, obj):
# NOTE! some tables in CORE DB may be using the MyISAM storage engine,
# which means it is *not* transaction-safe and therefore we cannot rely
# on "rollback" if in dry-run mode! in other words we better not touch
# the record at all, for dry run
if self.dry_run:
return True
return super(ToCore, self).delete_object(obj)
class ToCoreTrans(importing.ToSQLAlchemy):
"""
Base class for all CORE "transaction" model importers
"""
########################################
# CORE Operational
########################################
class DepartmentImporter(ToCore):
model_class = corepos.Department
key = 'number'
class SubdepartmentImporter(ToCore):
model_class = corepos.Subdepartment
key = 'number'
class VendorImporter(ToCore):
model_class = corepos.Vendor
key = 'id'
class VendorContactImporter(ToCore):
model_class = corepos.VendorContact
key = 'vendor_id'
class ProductImporter(ToCore):
model_class = corepos.Product
key = 'id'
class ProductFlagImporter(ToCore):
model_class = corepos.ProductFlag
key = 'bit_number'
class VendorItemImporter(ToCore):
model_class = corepos.VendorItem
key = ('sku', 'vendor_id')
class EmployeeImporter(ToCore):
model_class = corepos.Employee
key = 'number'
class CustDataImporter(ToCore):
model_class = corepos.CustData
key = 'id'
class MemberTypeImporter(ToCore):
model_class = corepos.MemberType
key = 'id'
class MemberInfoImporter(ToCore):
model_class = corepos.MemberInfo
key = 'card_number'
class MemberDateImporter(ToCore):
model_class = corepos.MemberDate
key = 'card_number'
class MemberContactImporter(ToCore):
model_class = corepos.MemberContact
key = 'card_number'
class HouseCouponImporter(ToCore):
model_class = corepos.HouseCoupon
key = 'coupon_id'
########################################
# CORE Transactions
########################################
class TransactionDetailImporter(ToCoreTrans):
"""
CORE-POS transaction data importer.
"""
model_class = coretrans.TransactionDetail

View file

@ -24,175 +24,10 @@
Square -> CORE-POS data importing Square -> CORE-POS data importing
""" """
import re import warnings
import datetime
import decimal
from collections import OrderedDict
import sqlalchemy as sa warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
from corepos.db.office_trans import Session as CoreTransSession, model as coretrans from rattail_corepos.corepos.office.importing.db.square import *
from rattail import importing
from rattail_corepos.corepos.importing import db as corepos_importing
class FromSquareToCoreTrans(importing.ToSQLAlchemyHandler):
"""
Square -> CORE-POS import handler.
"""
host_title = "Square"
local_title = "CORE-POS"
def make_session(self):
return CoreTransSession()
def get_importers(self):
importers = OrderedDict()
importers['TransactionDetail'] = TransactionDetailImporter
return importers
class FromSquare(importing.FromCSV):
"""
Base class for Square -> CORE-POS importers.
"""
class TransactionDetailImporter(FromSquare, corepos_importing.model.TransactionDetailImporter):
"""
Transaction detail importer.
"""
key = 'store_row_id'
supported_fields = [
'store_row_id',
'date_time',
'card_number',
'upc',
'description',
'quantity',
'unit_price',
'discount',
'tax',
'total',
]
batches_supported = True
def setup(self):
super(TransactionDetailImporter, self).setup()
# cache existing transactions by ID
self.transaction_details = self.cache_model(coretrans.TransactionDetail,
key=self.transaction_detail_key)
# keep track of new IDs
self.new_ids = {}
self.last_new_id = self.get_last_new_id()
def transaction_detail_key(self, detail, normal):
return (
detail.store_id,
detail.register_number,
detail.date_time,
detail.upc,
)
def get_last_new_id(self):
# TODO: pretty sure there is a better way to do this...
return self.session.query(sa.func.max(coretrans.TransactionDetail.store_row_id))\
.scalar() or 0
currency_pattern = re.compile(r'^\$(?P<amount>\d+\.\d\d)$')
currency_pattern_negative = re.compile(r'^\(\$(?P<amount>\d+\.\d\d)\)$')
def parse_currency(self, value):
value = (value or '').strip() or None
if value:
# first check for positive amount
match = self.currency_pattern.match(value)
if match:
return float(match.group('amount'))
# okay then, check for negative amount
match = self.currency_pattern_negative.match(value)
if match:
return 0 - float(match.group('amount'))
def normalize_host_object(self, csvrow):
# date_time
date = datetime.datetime.strptime(csvrow['Date'], '%m/%d/%Y').date()
time = datetime.datetime.strptime(csvrow['Time'], '%H:%M:%S').time()
date_time = datetime.datetime.combine(date, time)
# upc
upc = csvrow['SKU']
# store_row_id
key = (
0, # store_id
None, # register_number
date_time,
upc,
)
if key in self.transaction_details:
store_row_id = self.transaction_details[key].store_row_id
else:
store_row_id = self.last_new_id + 1
self.new_ids[store_row_id] = csvrow
self.last_new_id = store_row_id
# card_number
card_number = csvrow['Customer Reference ID'] or None
if card_number:
card_number = int(card_number)
# description
description = csvrow['Item']
# quantity
quantity = float(csvrow['Qty'])
# unit_price
unit_price = self.parse_currency(csvrow['Gross Sales'])
if unit_price is not None:
unit_price /= quantity
unit_price = decimal.Decimal('{:0.2f}'.format(unit_price))
elif csvrow['Gross Sales']:
log.warning("cannot parse 'unit_price' from: %s", csvrow['Gross Sales'])
# discount
discount = self.parse_currency(csvrow['Discounts'])
if discount is not None:
discount = decimal.Decimal('{:0.2f}'.format(discount))
elif csvrow['Discounts']:
log.warning("cannot parse 'discount' from: %s", csvrow['Discounts'])
# tax
tax = self.parse_currency(csvrow['Tax'])
if csvrow['Tax'] and tax is None:
log.warning("cannot parse 'tax' from: %s", csvrow['Tax'])
tax = bool(tax)
# total
total = self.parse_currency(csvrow['Net Sales'])
if total is not None:
total = decimal.Decimal('{:0.2f}'.format(total))
elif csvrow['Net Sales']:
log.warning("cannot parse 'total' from: %s", csvrow['Net Sales'])
return {
'_object_str': "({}) {}".format(upc, description),
'store_row_id': store_row_id,
'date_time': date_time,
'card_number': card_number,
'upc': upc,
'description': description,
'quantity': quantity,
'unit_price': unit_price,
'discount': discount,
'tax': tax,
'total': total,
}

View file

@ -2,7 +2,7 @@
################################################################################ ################################################################################
# #
# Rattail -- Retail Software Framework # Rattail -- Retail Software Framework
# Copyright © 2010-2021 Lance Edgar # Copyright © 2010-2023 Lance Edgar
# #
# This file is part of Rattail. # This file is part of Rattail.
# #
@ -24,448 +24,10 @@
CORE-POS model importers (webservices API) CORE-POS model importers (webservices API)
""" """
from rattail import importing import warnings
from rattail.util import data_diffs
from rattail_corepos.corepos.util import get_core_members
from rattail_corepos.corepos.api import make_corepos_api
warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
class ToCoreAPI(importing.Importer): from rattail_corepos.corepos.office.importing.model import *
"""
Base class for all CORE "operational" model importers, which use the API.
"""
# TODO: these importers are in a bit of an experimental state at the
# moment. we only allow create/update b/c it will use the API instead of
# direct DB
allow_delete = False
caches_local_data = True
def setup(self):
self.establish_api()
def establish_api(self):
self.api = make_corepos_api(self.config)
def ensure_fields(self, data):
"""
Ensure each of our supported fields are included in the data. This is
to handle cases where the API does not return all fields, e.g. when
some of them are empty.
"""
for field in self.fields:
if field not in data:
data[field] = None
def fix_empties(self, data, fields):
"""
Fix "empty" values for the given set of fields. This just uses an
empty string instead of ``None`` for each, to add some consistency
where the API might lack it.
Main example so far, is the Vendor API, which may not return some
fields at all (and so our value is ``None``) in some cases, but in
other cases it *will* return a value, default of which is the empty
string. So we want to "pretend" that we get an empty string back even
when we actually get ``None`` from it.
"""
for field in fields:
if data[field] is None:
data[field] = ''
class MemberImporter(ToCoreAPI):
"""
Member model importer for CORE-POS
"""
model_name = 'Member'
key = 'cardNo'
supported_fields = [
'cardNo'
'customerAccountID',
'customers',
# 'memberStatus',
# 'activeStatus',
# 'customerTypeID',
# 'chargeBalance',
# 'chargeLimit',
# 'idCardUPC',
'startDate',
'endDate',
'addressFirstLine',
'addressSecondLine',
'city',
'state',
'zip',
# 'contactAllowed',
# 'contactMethod',
# 'modified',
]
supported_customer_fields = [
'customerID',
# 'customerAccountID',
# 'cardNo',
'firstName',
'lastName',
# 'chargeAllowed',
# 'checksAllowed',
# 'discount',
'accountHolder',
# 'staff',
'phone',
'altPhone',
'email',
# 'memberPricingAllowed',
# 'memberCouponsAllowed',
# 'lowIncomeBenefits',
# 'modified',
]
empty_date_value = '0000-00-00 00:00:00'
def get_local_objects(self, host_data=None):
return get_core_members(self.config, self.api, progress=self.progress)
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'cardNo'
return self.api.get_member(key[0])
def normalize_local_object(self, member):
data = dict(member)
return data
def data_diffs(self, local_data, host_data):
diffs = super(MemberImporter, self).data_diffs(local_data, host_data)
# the 'customers' field requires a more granular approach, since the
# data coming from API may have different fields than our local data
if 'customers' in self.fields and 'customers' in diffs:
if not self.customer_data_differs(local_data, host_data):
diffs.remove('customers')
# also the start/end dates should be looked at more closely. if they
# contain the special '__omit__' value then we won't ever count as diff
if 'startDate' in self.fields and 'startDate' in diffs:
if host_data['startDate'] == '__omit__':
diffs.remove('startDate')
if 'endDate' in self.fields and 'endDate' in diffs:
if host_data['endDate'] == '__omit__':
diffs.remove('endDate')
return diffs
def customer_data_differs(self, local_data, host_data):
local_customers = local_data['customers']
host_customers = host_data['customers']
# if both are empty, we're good
if not local_customers and not host_customers:
return False
# obviously we differ if record count doesn't match
if len(local_customers) != len(host_customers):
return True
# okay then, let's traverse the "new" list
for host_customer in host_customers:
# we differ if can't locate corresponding "old" local record
local_customer = self.find_local_customer(local_customers, host_customer)
if not local_customer:
return True
# we differ if old and new records differ
if data_diffs(local_customer, host_customer,
fields=self.supported_customer_fields):
return True
# okay, now let's traverse the "old" list
for local_customer in local_customers:
# we differ if can't locate corresponding "new" host record
host_customer = self.find_host_customer(host_customers, local_customer)
if not host_customer:
return True
# guess we don't differ after all
return False
def find_local_customer(self, local_customers, host_customer):
assert 'customerID' in self.supported_customer_fields
if not host_customer['customerID']:
return # new customer
for local_customer in local_customers:
if local_customer['customerID'] == host_customer['customerID']:
return local_customer
def find_host_customer(self, host_customers, local_customer):
assert 'customerID' in self.supported_customer_fields
for host_customer in host_customers:
if host_customer['customerID'] == local_customer['customerID']:
return host_customer
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, member, data, local_data=None):
"""
Push an update for the member, via the CORE API.
"""
if self.dry_run:
return data
cardNo = data.pop('cardNo')
data = dict(data)
if data.get('startDate') == '__omit__':
data.pop('startDate')
if data.get('endDate') == '__omit__':
data.pop('endDate')
member = self.api.set_member(cardNo, **data)
return member
class DepartmentImporter(ToCoreAPI):
"""
Department model importer for CORE-POS
"""
model_name = 'Department'
key = 'dept_no'
supported_fields = [
'dept_no',
'dept_name',
# TODO: should enable some of these fields?
# 'dept_tax',
# 'dept_fs',
# 'dept_limit',
# 'dept_minimum',
# 'dept_discount',
# 'dept_see_id',
# 'modified',
# 'modifiedby',
# 'margin',
# 'salesCode',
# 'memberOnly',
]
def get_local_objects(self, host_data=None):
return self.api.get_departments()
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'dept_no'
return self.api.get_department(key[0])
def normalize_local_object(self, department):
data = dict(department)
return data
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, department, data, local_data=None):
"""
Push an update for the department, via the CORE API.
"""
if self.dry_run:
return data
dept_no = data.pop('dept_no')
department = self.api.set_department(dept_no, **data)
return department
class SubdepartmentImporter(ToCoreAPI):
"""
Subdepartment model importer for CORE-POS
"""
model_name = 'Subdepartment'
key = 'subdept_no'
supported_fields = [
'subdept_no',
'subdept_name',
'dept_ID',
]
def get_local_objects(self, host_data=None):
return self.api.get_subdepartments()
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'subdept_no'
return self.api.get_subdepartment(key[0])
def normalize_local_object(self, subdepartment):
data = dict(subdepartment)
self.ensure_fields(data)
return data
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, subdepartment, data, local_data=None):
"""
Push an update for the subdepartment, via the CORE API.
"""
if self.dry_run:
return data
subdept_no = data.pop('subdept_no')
subdepartment = self.api.set_subdepartment(subdept_no, **data)
return subdepartment
class VendorImporter(ToCoreAPI):
"""
Vendor model importer for CORE-POS
"""
model_name = 'Vendor'
key = 'vendorID'
supported_fields = [
'vendorID',
'vendorName',
'vendorAbbreviation',
'shippingMarkup',
'discountRate',
'phone',
'fax',
'email',
'website',
'address',
'city',
'state',
'zip',
'notes',
'localOriginID',
'inactive',
'orderMinimum',
'halfCases',
]
def get_local_objects(self, host_data=None):
return self.api.get_vendors()
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'vendorID'
return self.api.get_vendor(key[0])
def normalize_local_object(self, vendor):
data = dict(vendor)
# make sure all fields are present
self.ensure_fields(data)
# fix some "empty" values
self.fix_empties(data, ['phone', 'fax', 'email'])
# convert some values to native type
data['discountRate'] = float(data['discountRate'])
return data
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, vendor, data, local_data=None):
"""
Push an update for the vendor, via the CORE API.
"""
if self.dry_run:
return data
vendorID = data.pop('vendorID')
vendor = self.api.set_vendor(vendorID, **data)
return vendor
class ProductImporter(ToCoreAPI):
"""
Product model importer for CORE-POS
"""
model_name = 'Product'
key = 'upc'
supported_fields = [
'upc',
'brand',
'description',
'size',
'department',
'normal_price',
'foodstamp',
'scale',
# 'tax', # TODO!
# TODO: maybe enable some of these fields?
# 'formatted_name',
# 'pricemethod',
# 'groupprice',
# 'quantity',
# 'special_price',
# 'specialpricemethod',
# 'specialgroupprice',
# 'specialquantity',
# 'start_date',
# 'end_date',
# 'scaleprice',
# 'mixmatchcode',
# 'modified',
# 'tareweight',
# 'discount',
# 'discounttype',
# 'line_item_discountable',
# 'unitofmeasure',
# 'wicable',
# 'qttyEnforced',
# 'idEnforced',
# 'cost',
# 'inUse',
# 'numflag',
# 'subdept',
# 'deposit',
# 'local',
# 'store_id',
# 'default_vendor_id',
# 'current_origin_id',
]
def get_local_objects(self, host_data=None):
return self.api.get_products()
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'upc'
return self.api.get_product(key[0])
def normalize_local_object(self, product):
data = dict(product)
# make sure all fields are present
self.ensure_fields(data)
# fix some "empty" values
self.fix_empties(data, ['brand'])
return data
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, product, data, local_data=None):
"""
Push an update for the product, via the CORE API.
"""
if self.dry_run:
return data
upc = data.pop('upc')
product = self.api.set_product(upc, **data)
return product

View file

@ -24,321 +24,10 @@
Rattail -> CORE-POS data export Rattail -> CORE-POS data export
""" """
import logging import warnings
from collections import OrderedDict
from sqlalchemy import orm warnings.warn("rattail_corepos.corepos.importing is deprecated; "
"please use rattail_corepos.corepos.office.importing instead",
DeprecationWarning, stacklevel=2)
from rattail import importing from rattail_corepos.corepos.office.importing.rattail import *
from rattail.db import model
from rattail.util import pretty_quantity
from rattail_corepos.corepos import importing as corepos_importing
from rattail_corepos.corepos.util import get_max_existing_vendor_id
log = logging.getLogger(__name__)
class ToCOREAPIHandler(importing.ImportHandler):
"""
Base class for handlers targeting the CORE API.
"""
local_key = 'corepos_api'
generic_local_title = "CORE Office (API)"
@property
def local_title(self):
return "CORE-POS (API)"
class FromRattailToCore(importing.FromRattailHandler, ToCOREAPIHandler):
"""
Rattail -> CORE-POS export handler
"""
direction = 'export'
safe_for_web_app = True
def get_importers(self):
importers = OrderedDict()
importers['Member'] = MemberImporter
importers['Department'] = DepartmentImporter
importers['Subdepartment'] = SubdepartmentImporter
importers['Vendor'] = VendorImporter
importers['Product'] = ProductImporter
return importers
class FromRattail(importing.FromSQLAlchemy):
"""
Base class for Rattail -> CORE-POS exporters.
"""
class MemberImporter(FromRattail, corepos_importing.model.MemberImporter):
"""
Member data exporter
"""
host_model_class = model.Customer
key = 'cardNo'
supported_fields = [
'cardNo',
'customerAccountID',
'customers',
'addressFirstLine',
'addressSecondLine',
'city',
'state',
'zip',
'startDate',
'endDate',
]
supported_customer_fields = [
'customerID',
'firstName',
'lastName',
'accountHolder',
'phone',
'altPhone',
'email',
]
def query(self):
query = super(MemberImporter, self).query()
query = query.options(orm.joinedload(model.Customer.addresses))\
.options(orm.joinedload(model.Customer._people)\
.joinedload(model.CustomerPerson.person)\
.joinedload(model.Person.phones))\
.options(orm.joinedload(model.Customer._people)\
.joinedload(model.CustomerPerson.person)\
.joinedload(model.Person.emails))
return query
def normalize_host_object(self, customer):
address = customer.addresses[0] if customer.addresses else None
people = []
for i, person in enumerate(customer.people, 1):
phones = person.phones
phone1 = phones[0] if phones else None
phone2 = phones[1] if len(phones) > 1 else None
email = person.emails[0] if person.emails else None
people.append({
'customerID': str(person.corepos_customer_id),
'firstName': person.first_name,
'lastName': person.last_name,
'accountHolder': i == 1,
'phone': phone1.number if phone1 else '',
'altPhone': phone2.number if phone2 else '',
'email': email.address if email else '',
})
member = customer.only_member(require=False)
if member:
if member.joined:
start_date = member.joined.strftime('%Y-%m-%d 00:00:00')
else:
start_date = self.empty_date_value
if member.withdrew:
end_date = member.withdrew.strftime('%Y-%m-%d 00:00:00')
else:
end_date = self.empty_date_value
else:
start_date = '__omit__'
end_date = '__omit__'
return {
'cardNo': customer.number,
'customerAccountID': customer.id,
'addressFirstLine': address.street if address else '',
'addressSecondLine': address.street2 if address else '',
'city': address.city if address else '',
'state': address.state if address else '',
'zip': address.zipcode if address else '',
'startDate': start_date,
'endDate': end_date,
'customers': people,
}
class DepartmentImporter(FromRattail, corepos_importing.model.DepartmentImporter):
"""
Department data exporter
"""
host_model_class = model.Department
key = 'dept_no'
supported_fields = [
'dept_no',
'dept_name',
]
def normalize_host_object(self, department):
return {
'dept_no': str(department.number),
'dept_name': department.name,
}
class SubdepartmentImporter(FromRattail, corepos_importing.model.SubdepartmentImporter):
"""
Subdepartment data exporter
"""
host_model_class = model.Subdepartment
key = 'subdept_no'
supported_fields = [
'subdept_no',
'subdept_name',
'dept_ID',
]
def normalize_host_object(self, subdepartment):
department = subdepartment.department
return {
'subdept_no': str(subdepartment.number),
'subdept_name': subdepartment.name,
'dept_ID': str(department.number) if department else None,
}
class VendorImporter(FromRattail, corepos_importing.model.VendorImporter):
"""
Vendor data exporter
"""
host_model_class = model.Vendor
key = 'vendorID'
supported_fields = [
'vendorID',
'vendorName',
'vendorAbbreviation',
'discountRate',
'phone',
'fax',
'email',
]
def setup(self):
super(VendorImporter, self).setup()
# self.max_existing_vendor_id = self.get_max_existing_vendor_id()
self.max_existing_vendor_id = get_max_existing_vendor_id(self.config)
self.last_vendor_id = self.max_existing_vendor_id
def get_next_vendor_id(self):
if hasattr(self, 'last_vendor_id'):
self.last_vendor_id += 1
return self.last_vendor_id
last_vendor_id = get_max_existing_vendor_id(self.config)
return last_vendor_id + 1
def normalize_host_object(self, vendor):
vendor_id = vendor.corepos_id
if not vendor_id:
vendor_id = self.get_next_vendor_id()
data = {
'vendorID': str(vendor_id),
'vendorName': vendor.name,
'vendorAbbreviation': vendor.abbreviation or '',
'discountRate': float(vendor.special_discount or 0),
}
if 'phone' in self.fields:
phones = [phone for phone in vendor.phones
if phone.type == 'Voice']
data['phone'] = phones[0].number if phones else ''
if 'fax' in self.fields:
phones = [phone for phone in vendor.phones
if phone.type == 'Fax']
data['fax'] = phones[0].number if phones else ''
if 'email' in self.fields:
email = vendor.email
data['email'] = email.address if email else ''
# also embed original Rattail vendor object, if we'll be needing to
# update it later with a new CORE ID
if not vendor.corepos_id:
data['_rattail_vendor'] = vendor
return data
def create_object(self, key, data):
# grab vendor object we (maybe) stashed when normalizing
rattail_vendor = data.pop('_rattail_vendor', None)
# do normal create logic
vendor = super(VendorImporter, self).create_object(key, data)
if vendor:
# maybe set the CORE ID for vendor in Rattail
if rattail_vendor:
rattail_vendor.corepos_id = int(vendor['vendorID'])
return vendor
class ProductImporter(FromRattail, corepos_importing.model.ProductImporter):
"""
Product data exporter
"""
host_model_class = model.Product
key = 'upc'
supported_fields = [
'upc',
'brand',
'description',
'size',
'unitofmeasure',
'department',
'normal_price',
'foodstamp',
'scale',
]
def normalize_host_object(self, product):
upc = product.item_id
if not upc and product.upc:
upc = str(product.upc)[:-1]
if not upc:
log.warning("skipping product %s with unknown upc: %s",
product.uuid, product)
return
return {
'_product': product,
'upc': upc,
'brand': product.brand.name if product.brand else '',
'description': product.description or '',
'size': pretty_quantity(product.unit_size),
'unitofmeasure': product.uom_abbreviation,
'department': str(product.department.number) if product.department else None,
'normal_price': '{:0.2f}'.format(product.regular_price.price) if product.regular_price else None,
'foodstamp': '1' if product.food_stampable else '0',
'scale': '1' if product.weighed else '0',
}
def create_object(self, key, data):
# must be sure not to pass the original Product instance, or else the
# API call will try to serialize and submit it
product = data.pop('_product')
corepos_product = super(ProductImporter, self).create_object(key, data)
if corepos_product:
# update our Rattail Product with the CORE ID
if not self.dry_run:
product.corepos_id = int(corepos_product['id'])
return corepos_product
def update_object(self, corepos_product, data, local_data=None):
# must be sure not to pass the original Product instance, or else the
# API call will try to serialize and submit it
product = data.pop('_product', None)
corepos_product = super(ProductImporter, self).update_object(corepos_product, data, local_data)
return corepos_product

View file

@ -0,0 +1,27 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Importing data into CORE-POS
"""
from . import model

View file

@ -0,0 +1,27 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Importing data into CORE-POS (direct DB)
"""
from . import model

View file

@ -0,0 +1,160 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
CORE-POS -> CORE-POS data import
"""
from collections import OrderedDict
from corepos.db.office_op import Session as CoreSession
from rattail.importing.handlers import FromSQLAlchemyHandler, ToSQLAlchemyHandler
from rattail.importing.sqlalchemy import FromSQLAlchemySameToSame
from rattail_corepos.corepos.importing import db as corepos_importing
class FromCoreHandler(FromSQLAlchemyHandler):
"""
Base class for import handlers which use a CORE database as the host / source.
"""
host_title = "CORE"
host_key = 'corepos_db_office_op'
def make_host_session(self):
return CoreSession()
class ToCoreHandler(ToSQLAlchemyHandler):
"""
Base class for import handlers which target a CORE database on the local side.
"""
local_title = "CORE"
local_key = 'corepos_db_office_op'
def make_session(self):
return CoreSession()
class FromCoreToCoreBase(object):
"""
Common base class for Core -> Core data import/export handlers.
"""
def get_importers(self):
importers = OrderedDict()
importers['Department'] = DepartmentImporter
importers['Subdepartment'] = SubdepartmentImporter
importers['Vendor'] = VendorImporter
importers['VendorContact'] = VendorContactImporter
importers['Product'] = ProductImporter
importers['ProductFlag'] = ProductFlagImporter
importers['VendorItem'] = VendorItemImporter
importers['Employee'] = EmployeeImporter
importers['CustData'] = CustDataImporter
importers['MemberType'] = MemberTypeImporter
importers['MemberInfo'] = MemberInfoImporter
importers['HouseCoupon'] = HouseCouponImporter
return importers
class FromCoreToCoreImport(FromCoreToCoreBase, FromCoreHandler, ToCoreHandler):
"""
Handler for CORE (other) -> CORE (local) data import.
.. attribute:: direction
Value is ``'import'`` - see also
:attr:`rattail.importing.handlers.ImportHandler.direction`.
"""
dbkey = 'host'
local_title = "CORE (default)"
@property
def host_title(self):
return "CORE ({})".format(self.dbkey)
def make_host_session(self):
return CoreSession(bind=self.config.corepos_engines[self.dbkey])
class FromCoreToCoreExport(FromCoreToCoreBase, FromCoreHandler, ToCoreHandler):
"""
Handler for CORE (local) -> CORE (other) data export.
.. attribute:: direction
Value is ``'export'`` - see also
:attr:`rattail.importing.handlers.ImportHandler.direction`.
"""
direction = 'export'
host_title = "CORE (default)"
@property
def local_title(self):
return "CORE ({})".format(self.dbkey)
def make_session(self):
return CoreSession(bind=self.config.corepos_engines[self.dbkey])
class FromCore(FromSQLAlchemySameToSame):
"""
Base class for CORE -> CORE data importers.
"""
class DepartmentImporter(FromCore, corepos_importing.model.DepartmentImporter):
pass
class SubdepartmentImporter(FromCore, corepos_importing.model.SubdepartmentImporter):
pass
class VendorImporter(FromCore, corepos_importing.model.VendorImporter):
pass
class VendorContactImporter(FromCore, corepos_importing.model.VendorContactImporter):
pass
class ProductImporter(FromCore, corepos_importing.model.ProductImporter):
pass
class ProductFlagImporter(FromCore, corepos_importing.model.ProductFlagImporter):
pass
class VendorItemImporter(FromCore, corepos_importing.model.VendorItemImporter):
pass
class EmployeeImporter(FromCore, corepos_importing.model.EmployeeImporter):
pass
class CustDataImporter(FromCore, corepos_importing.model.CustDataImporter):
pass
class MemberTypeImporter(FromCore, corepos_importing.model.MemberTypeImporter):
pass
class MemberInfoImporter(FromCore, corepos_importing.model.MemberInfoImporter):
pass
class HouseCouponImporter(FromCore, corepos_importing.model.HouseCouponImporter):
pass

View file

@ -0,0 +1,50 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
CSV -> CORE data import
"""
from corepos.db.office_op import model as corepos, Session as CoreSession
from rattail.importing.handlers import FromFileHandler
from rattail.importing.csv import FromCSVToSQLAlchemyMixin
from rattail_corepos.corepos.importing.db.model import ToCore
from rattail_corepos.corepos.importing.db.corepos import ToCoreHandler
class FromCSVToCore(FromCSVToSQLAlchemyMixin, FromFileHandler, ToCoreHandler):
"""
Handler for CSV -> CORE data import
"""
host_title = "CSV"
ToParent = ToCore
@property
def local_title(self):
return "CORE ({})".format(self.dbkey)
def get_model(self):
return corepos
def make_session(self):
return CoreSession(bind=self.config.corepos_engines[self.dbkey])

View file

@ -0,0 +1,25 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Exporting data from CORE-POS (direct DB)
"""

View file

@ -0,0 +1,681 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
CORE-POS -> Catapult Inventory Workbook
"""
import re
import datetime
import decimal
import logging
from collections import OrderedDict
from sqlalchemy.exc import ProgrammingError
from sqlalchemy import orm
from sqlalchemy.orm.exc import NoResultFound
from corepos import enum as corepos_enum
from corepos.db.office_op import model as corepos
from corepos.db.util import table_exists
from rattail.gpc import GPC
from rattail.core import get_uuid
from rattail.importing.handlers import ToFileHandler
from rattail_corepos.corepos.importing.db.corepos import FromCoreHandler, FromCore
from rattail_onager.catapult.importing import inventory as catapult_importing
log = logging.getLogger(__name__)
class FromCoreToCatapult(FromCoreHandler, ToFileHandler):
"""
Handler for CORE -> Catapult (Inventory Workbook)
"""
host_title = "CORE-POS"
local_title = "Catapult (Inventory Workbook)"
direction = 'export'
def get_importers(self):
importers = OrderedDict()
importers['InventoryItem'] = InventoryItemImporter
return importers
class InventoryItemImporter(FromCore, catapult_importing.model.InventoryItemImporter):
"""
Inventory Item data importer.
"""
host_model_class = corepos.Product
# note that we use a "dummy" uuid key here, so logic will consider each row
# to be unique, even when duplicate item_id's are present
key = 'uuid'
supported_fields = [
'uuid',
'item_id',
'dept_id',
'dept_name',
'receipt_alias',
'brand',
'item_name',
'size',
# 'sugg_retail',
'last_cost',
'price_divider',
'base_price',
# 'disc_mult',
'ideal_margin',
'bottle_deposit',
# 'pos_menu_group',
'scale_label',
'sold_by_ea_or_lb',
'quantity_required',
'weight_profile',
'tax_1',
'tax_2',
'spec_tend_1',
'spec_tend_2',
'age_required',
'location',
# 'family_line',
'alt_id',
'alt_receipt_alias',
'alt_pkg_qty',
'alt_pkg_price',
'auto_discount',
'supplier_unit_id',
'supplier_id',
'unit',
'num_pkgs',
# 'cs_pk_multiplier',
# 'dsd',
'pf1',
# 'pf2',
# 'pf3',
# 'pf4',
# 'pf5',
# 'pf6',
# 'pf7',
# 'pf8',
'memo',
'scale_shelf_life',
'scale_shelf_life_type',
'scale_ingredient_text',
]
# we want to add a "duplicate" column at the end
include_duplicate_column = True
# we want to add an "alternate for" column at the end
include_alt_for_column = True
type2_upc_pattern = re.compile(r'^2(\d{5})00000\d')
def setup(self):
super(InventoryItemImporter, self).setup()
# this is used for sorting, when a value has no date
self.old_datetime = datetime.datetime(1900, 1, 1)
self.exclude_invalid_upc = self.config.getbool(
'corepos', 'exporting.catapult_inventory.exclude_invalid_upc',
default=False)
self.warn_invalid_upc = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_invalid_upc',
default=True)
self.ignored_upcs = self.config.getlist(
'corepos', 'exporting.catapult_inventory.ignored_upcs')
self.exclude_missing_department = self.config.getbool(
'corepos', 'exporting.catapult_inventory.exclude_missing_department',
default=False)
self.warn_missing_department = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_missing_department',
default=True)
self.warn_empty_subdepartment = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_empty_subdepartment',
default=True)
self.warn_truncated_receipt_alias = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_truncated_receipt_alias',
default=True)
self.warn_size_null_byte = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_size_null_byte',
default=True)
self.warn_unknown_deposit = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_unknown_deposit',
default=True)
self.warn_scale_label_non_plu = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_scale_label_non_plu',
default=True)
self.warn_scale_label_short_plu = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_scale_label_short_plu',
default=True)
self.warn_weight_profile_non_plu = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_weight_profile_non_plu',
default=True)
self.warn_multiple_vendor_items = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_multiple_vendor_items',
default=True)
self.warn_no_valid_vendor_items = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_no_valid_vendor_items',
default=True)
self.warn_truncated_memo = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_truncated_memo',
default=True)
self.warn_scale_ingredients_newline = self.config.getbool(
'corepos', 'exporting.catapult_inventory.warn_scale_ingredients_newline',
default=True)
self.floor_sections_exist = table_exists(self.host_session,
corepos.FloorSection)
self.tax_components_exist = table_exists(self.host_session,
corepos.TaxRateComponent)
self.tax_rate_ids_1 = self.config.getlist(
'corepos', 'exporting.catapult_inventory.tax_rate_ids_1', default=[])
self.tax_rate_ids_1 = [int(id) for id in self.tax_rate_ids_1]
self.tax_rate_ids_2 = self.config.getlist(
'corepos', 'exporting.catapult_inventory.tax_rate_ids_2', default=[])
self.tax_rate_ids_2 = [int(id) for id in self.tax_rate_ids_2]
# TODO: should add component id levels too?
# tax_component_ids_1 = (1,)
# tax_component_ids_2 = (2,)
self.cache_bottle_deposits()
self.cache_like_codes()
def cache_bottle_deposits(self):
self.deposits = {}
deposits = self.host_session.query(corepos.Product.deposit.distinct())\
.all()
def cache(deposit, i):
assert isinstance(deposit, tuple)
assert len(deposit) == 1
deposit = deposit[0]
if deposit:
deposit = int(deposit)
upc = "{:013d}".format(deposit)
try:
product = self.host_session.query(corepos.Product)\
.filter(corepos.Product.upc == upc)\
.one()
except NoResultFound:
pass # we will log warnings per-item later
else:
self.deposits[deposit] = product
self.progress_loop(cache, deposits,
message="Caching product deposits data")
def cache_like_codes(self):
self.like_codes = {}
mappings = self.host_session.query(corepos.ProductLikeCode)\
.order_by(corepos.ProductLikeCode.like_code_id,
corepos.ProductLikeCode.upc)\
.all()
def cache(mapping, i):
self.like_codes.setdefault(mapping.like_code_id, []).append(mapping)
self.progress_loop(cache, mappings,
message="Caching like codes data")
def query(self):
query = self.host_session.query(corepos.Product)\
.order_by(corepos.Product.upc)\
.options(orm.joinedload(corepos.Product.department))\
.options(orm.joinedload(corepos.Product.subdepartment))\
.options(orm.joinedload(corepos.Product.vendor_items)\
.joinedload(corepos.VendorItem.vendor))\
.options(orm.joinedload(corepos.Product.default_vendor))\
.options(orm.joinedload(corepos.Product.scale_item))\
.options(orm.joinedload(corepos.Product.user_info))\
.options(orm.joinedload(corepos.Product.tax_rate))\
.options(orm.joinedload(corepos.Product._like_code))
if self.floor_sections_exist:
query = query.options(orm.joinedload(corepos.Product.physical_location)\
.joinedload(corepos.ProductPhysicalLocation.floor_section))
return query
def normalize_host_data(self, host_objects=None):
normalized = super(InventoryItemImporter, self).normalize_host_data(host_objects=host_objects)
# re-sort the results by item_id, since e.g. original UPC from CORE may
# have been replaced with a PLU. also put non-numeric first, to bring
# them to user's attention
numeric = []
non_numeric = []
for row in normalized:
if row['item_id'] and row['item_id'].isdigit():
numeric.append(row)
else:
non_numeric.append(row)
numeric.sort(key=lambda row: int(row['item_id']))
non_numeric.sort(key=lambda row: row['item_id'])
normalized = non_numeric + numeric
# now we must check for duplicate item ids, and mark rows accordingly.
# but we *do* want to include/preserve all rows, hence we mark them
# instead of pruning some out. first step is to group all by item_id
items = {}
def collect(row, i):
items.setdefault(row['item_id'], []).append(row)
self.progress_loop(collect, normalized,
message="Grouping rows by Item ID")
# now we go through our groupings and for any item_id with more than 1
# row, we'll mark each row as having a duplicate item_id. note that
# this modifies such a row "in-place" for our overall return value
def inspect(rows, i):
if len(rows) > 1:
for row in rows:
row['__duplicate__'] = True
self.progress_loop(inspect, list(items.values()),
message="Marking any duplicate Item IDs")
# finally, we must inspect the like codes and figure out which
# product(s) should potentially be considered "alternate for" another.
# first step here will be to create mapping of item_id values for each
# CORE product in our result set
item_ids = {}
def mapp(row, i):
product = row['__product__']
item_ids[product.upc] = row['item_id']
self.progress_loop(mapp, normalized,
message="Mapping item_id for CORE products")
# next step here is to check each product and mark "alt for" as needed
def inspect(row, i):
product = row['__product__']
if product.like_code:
others = self.like_codes.get(product.like_code.id)
if others:
first = others[0]
if first.upc != product.upc:
row['__alternate_for__'] = item_ids[first.upc]
self.progress_loop(inspect, normalized,
message="Marking any \"alternate for\" items")
return normalized
def normalize_host_object(self, product):
item_id = product.upc
if self.ignored_upcs and item_id in self.ignored_upcs:
log.debug("ignoring UPC %s for product: %s", product.upc, product)
return
if not item_id:
logger = log.warning if self.warn_invalid_upc else log.debug
logger("product id %s has no upc: %s", product.id, product)
if self.exclude_invalid_upc:
return
if not item_id.isdigit():
logger = log.warning if self.warn_invalid_upc else log.debug
logger("product %s has non-numeric upc: %s",
product.upc, product)
if self.exclude_invalid_upc:
return
# convert item_id either to a PLU, or formatted UPC
is_plu = False
if item_id.isdigit(): # can only convert if it's numeric!
if len(str(int(item_id))) < 6:
is_plu = True
item_id = str(int(item_id))
else: # must add check digit, and re-format
upc = GPC(item_id, calc_check_digit='upc')
item_id = str(upc)
assert len(item_id) == 14
# drop leading zero(s)
if item_id[1] == '0': # UPC-A
item_id = item_id[2:]
assert len(item_id) == 12
else: # EAN13
item_id = item_id[1:]
assert len(item_id) == 13
# figure out the "scale label" data, which may also affect item_id
scale_item = product.scale_item
scale_label = None
if scale_item:
scale_label = 'Y'
if item_id.isdigit():
if len(item_id) < 5:
logger = log.warning if self.warn_scale_label_short_plu else log.debug
logger("product %s has scale label, but PLU is less than 5 digits (%s): %s",
product.upc, item_id, product)
elif len(item_id) > 5:
match = self.type2_upc_pattern.match(item_id)
if match:
# convert type-2 UPC to PLU
is_plu = True
item_id = str(int(match.group(1)))
log.debug("converted type-2 UPC %s to PLU %s for: %s",
product.upc, item_id, product)
else:
logger = log.warning if self.warn_scale_label_non_plu else log.debug
logger("product %s has scale label, but non-PLU item_id: %s",
product.upc, product)
department = product.department
if not department:
logger = log.warning if self.warn_missing_department else log.debug
logger("product %s has no department: %s", product.upc, product)
if self.exclude_missing_department:
return
# size may come from one of two fields, or combination thereof
pack_size = (product.size or '').strip()
uom = (product.unit_of_measure or '').strip()
numeric_pack = False
if pack_size:
try:
decimal.Decimal(pack_size)
except decimal.InvalidOperation:
pass
else:
numeric_pack = True
if numeric_pack:
size = "{} {}".format(pack_size, uom).strip()
else:
size = pack_size or uom or None
# TODO: this logic may actually be client-specific? i just happened to
# find some null chars in a client DB and needed to avoid them, b/c the
# openpyxl lib said IllegalCharacterError
if size is not None and '\x00' in size:
logger = log.warning if self.warn_size_null_byte else log.debug
logger("product %s has null byte in size field: %s",
product.upc, product)
size = size.replace('\x00', '')
price_divider = None
if (product.quantity and product.group_price and
product.price_method == corepos_enum.PRODUCT_PRICE_METHOD_ALWAYS):
diff = (product.quantity * product.normal_price) - product.group_price
if abs(round(diff, 2)) > .01:
log.warning("product %s has multi-price with $%0.2f diff: %s",
product.upc, diff, product)
price_divider = product.quantity
bottle_deposit = None
if product.deposit:
deposit = int(product.deposit)
if deposit in self.deposits:
bottle_deposit = self.deposits[deposit].normal_price
else:
logger = log.warning if self.warn_unknown_deposit else log.debug
logger("product %s has unknown deposit %s which will be ignored: %s",
product.upc, deposit, product)
sold_by_ea_or_lb = None
if is_plu:
sold_by_ea_or_lb = 'LB' if product.scale else 'EA'
weight_profile = None
if product.scale or scale_item:
if not is_plu:
logger = log.warning if self.warn_weight_profile_non_plu else log.debug
logger("product %s has weight profile, but non-PLU item_id %s: %s",
product.upc, item_id, product)
weight_profile = 'LBNT'
# calculate tax rates according to configured "mappings"
tax_1 = 0
tax_2 = 0
if product.tax_rate:
# TODO: need to finish the logic to handle tax components
if self.tax_components_exist and product.tax_rate.components:
# for component in product.tax_rate.components:
# if tax_component_ids_1 and component.id in tax_component_ids_1:
# tax_1 += component.rate
# if tax_component_ids_2 and component.id in tax_component_ids_2:
# tax_2 += component.rate
raise NotImplementedError
else: # no components
rate = product.tax_rate
if self.tax_rate_ids_1 and rate.id in self.tax_rate_ids_1:
tax_1 += rate.rate
if self.tax_rate_ids_2 and rate.id in self.tax_rate_ids_2:
tax_2 += rate.rate
if not (self.tax_rate_ids_1 or self.tax_rate_ids_2) and rate.rate:
log.warning("product %s has unknown tax rate %s (%s) which will "
"be considered as tax 1: %s",
product.upc, rate.rate, rate.description, product)
tax_1 += rate.rate
location = None
if self.floor_sections_exist and product.physical_location and product.physical_location.floor_section:
location = product.physical_location.floor_section.name
if len(location) > 30:
log.warning("product %s has location length %s; will truncate: %s",
product.upc, len(location), location)
location = location[:30]
# no alt item (or auto discount) by default
alt_id = None
alt_receipt_alias = None
alt_pkg_qty = None
alt_pkg_price = None
auto_discount = None
# make an alt item, when main item has pack pricing (e.g. Zevia sodas)
# note that in this case the main item_id and alt_id are the same
if (product.quantity and product.group_price and
product.price_method == corepos_enum.PRODUCT_PRICE_METHOD_FULL_SETS):
alt_id = item_id
suffix = "{}-PK".format(product.quantity)
alt_receipt_alias = "{} {}".format(product.description, suffix)
if len(alt_receipt_alias) > 32:
logger = log.warning if self.warn_truncated_receipt_alias else log.debug
logger("alt receipt alias for %s is %s chars; must truncate: %s",
alt_id, len(alt_receipt_alias), alt_receipt_alias)
overage = len(alt_receipt_alias) - 32
alt_receipt_alias = "{} {}".format(
product.description[:-overage], suffix)
assert len(alt_receipt_alias) == 32
alt_pkg_qty = product.quantity
alt_pkg_price = product.group_price
# we also must declare an "auto discount" to get pack price
auto_discount = "{} @ ${:0.2f}".format(alt_pkg_qty, alt_pkg_price)
# no supplier info by default
supplier_unit_id = None
supplier_id = None
supplier_unit = None
supplier_num_pkgs = None
# maybe add supplier info, for "default" `vendorItems` record. we'll
# have to get a little creative to figure out which is the default
vendor_items = []
# first we try to narrow down according to product's default vendor
if product.default_vendor:
vendor_items = [item for item in product.vendor_items
if item.vendor is product.default_vendor]
# but if that didn't work, just use any "valid" vendorItems
if not vendor_items:
# valid in this context means, not missing vendor
vendor_items = [item for item in product.vendor_items
if item.vendor]
if not vendor_items and product.vendor_items:
logger = log.warning if self.warn_no_valid_vendor_items else log.debug
logger("product %s has %s vendorItems but each is missing (valid) vendor: %s",
product.upc, len(product.vendor_items), product)
if vendor_items:
if len(vendor_items) > 1:
# try to narrow down a bit further, based on valid 'units' amount
valid_items = [item for item in vendor_items
if item.units]
if valid_items:
vendor_items = valid_items
# warn if we still have more than one "obvious" vendor item
if len(vendor_items) > 1:
logger = log.warning if self.warn_multiple_vendor_items else log.debug
logger("product %s has %s vendorItems to pick from: %s",
product.upc, len(vendor_items), product)
# sort the list so most recently modified is first
vendor_items.sort(key=lambda item: item.modified or self.old_datetime,
reverse=True)
# use the "first" vendor item available
item = vendor_items[0]
supplier_unit_id = item.sku
supplier_id = item.vendor.name
supplier_num_pkgs = item.units or 1
if supplier_num_pkgs == 1:
supplier_unit = 'LB' if product.scale else 'EA'
else:
supplier_unit = 'CS'
pf1 = None
if product.subdepartment:
if not product.subdepartment.number:
logger = log.warning if self.warn_empty_subdepartment else log.debug
logger("product %s has 'empty' subdepartment number: %s",
product.upc, product)
else:
pf1 = "{} {}".format(product.subdepartment.number,
product.subdepartment.name)
memo = None
if product.user_info and product.user_info.long_text is not None:
memo = str(product.user_info.long_text)
if memo and len(memo) > 254:
logger = log.warning if self.warn_truncated_memo else log.debug
logger("product %s has memo of length %s; will truncate: %s",
product.upc, len(memo), memo)
memo = memo[:254]
scale_ingredient_text = None
if scale_item:
scale_ingredient_text = scale_item.text
if "\n" in scale_ingredient_text:
logger = log.warning if self.warn_scale_ingredients_newline else log.debug
logger("must remove carriage returns for scale ingredients: %s",
scale_ingredient_text)
scale_ingredient_text = scale_ingredient_text.replace("\n", " ")
return {
'__product__': product,
'__original_item_id__': product.upc,
'uuid': get_uuid(),
'item_id': item_id,
'dept_id': department.number if department else None,
'dept_name': department.name if department else None,
'receipt_alias': product.description,
'brand': product.brand,
'item_name': product.description,
'size': size,
# TODO: does CORE have this?
# 'sugg_retail': None,
'last_cost': product.cost,
'price_divider': price_divider,
'base_price': product.normal_price,
'ideal_margin': department.margin * 100 if department and department.margin else None,
# TODO: does CORE have these?
# 'disc_mult': None,
'bottle_deposit': bottle_deposit,
# TODO: does CORE have this?
# 'pos_menu_group': None,
'scale_label': scale_label,
'sold_by_ea_or_lb': sold_by_ea_or_lb,
'quantity_required': 'Y' if product.quantity_enforced else None,
'weight_profile': weight_profile,
'tax_1': tax_1 or None, # TODO: logic above is unfinished
'tax_2': tax_2 or None, # TODO: logic above is unfinished
'spec_tend_1': 'EBT' if product.foodstamp else None,
'spec_tend_2': 'WIC' if product.wicable else None,
'age_required': product.id_enforced or None,
'location': location,
# TODO: does CORE have these?
# 'family_line': None,
'alt_id': alt_id,
'alt_receipt_alias': alt_receipt_alias,
'alt_pkg_qty': alt_pkg_qty,
'alt_pkg_price': alt_pkg_price,
'auto_discount': auto_discount,
'supplier_unit_id': supplier_unit_id,
'supplier_id': supplier_id,
'unit': supplier_unit,
'num_pkgs': supplier_num_pkgs,
# TODO: does CORE have these?
# 'cs_pk_multiplier': None,
# 'dsd': None,
'pf1': pf1,
# TODO: are these needed?
# 'pf2',
# 'pf3',
# 'pf4',
# 'pf5',
# 'pf6',
# 'pf7',
# 'pf8',
'memo': memo,
'scale_shelf_life': scale_item.shelf_life if scale_item else None,
'scale_shelf_life_type': 0 if scale_item else None,
'scale_ingredient_text': scale_ingredient_text,
}

View file

@ -0,0 +1,187 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
CORE-POS -> Catapult Membership Workbook
"""
import decimal
import logging
from collections import OrderedDict
from sqlalchemy import orm
from corepos.db.office_op import model as corepos
from rattail.importing.handlers import ToFileHandler
from rattail.time import localtime
from rattail.excel import ExcelReader
from rattail_corepos.corepos.importing.db.corepos import FromCoreHandler, FromCore
from rattail_onager.catapult.importing import membership as catapult_importing
log = logging.getLogger(__name__)
class FromCoreToCatapult(FromCoreHandler, ToFileHandler):
"""
Handler for CORE -> Catapult (Membership Workbook)
"""
host_title = "CORE-POS"
local_title = "Catapult (Membership Workbook)"
direction = 'export'
def get_importers(self):
importers = OrderedDict()
importers['Member'] = MemberImporter
return importers
class MemberImporter(FromCore, catapult_importing.model.MemberImporter):
"""
Member data importer.
"""
host_model_class = corepos.CustData
supported_fields = [
'member_id',
'first_name',
'last_name',
'address_1',
'address_2',
'city',
'state',
'zip_code',
'phone_number',
'email',
'member_notes',
'member_join_date',
'family_affiliation',
'account_number',
'membership_profile_name',
]
def setup(self):
super(MemberImporter, self).setup()
self.warn_truncated_field = self.config.getbool(
'corepos', 'exporting.catapult_membership.warn_truncated_field',
default=True)
self.cache_membership_profiles()
def cache_membership_profiles(self):
self.membership_profiles = {}
sheet_name = self.config.get('catapult', 'membership_profiles_worksheet_name',
default="Membership Profile details")
reader = ExcelReader(self.workbook_template_path, sheet_name=sheet_name)
for profile in reader.read_rows(progress=self.progress):
if profile['Equity Paid in Full Amount']:
profile['Equity Paid in Full Amount'] = decimal.Decimal(profile['Equity Paid in Full Amount'])
self.membership_profiles[profile['Membership Profile Name']] = profile
# also figure out which profile is default
self.default_membership_profile = None
for profile in self.membership_profiles.values():
if profile['Please indicate Default Profile'] == 'X':
self.default_membership_profile = profile
break
if not self.default_membership_profile:
raise RuntimeError("cannot determine default membership profile")
def query(self):
return self.host_session.query(corepos.CustData)\
.order_by(corepos.CustData.card_number,
corepos.CustData.person_number,
corepos.CustData.id)\
.options(orm.joinedload(corepos.CustData.member_type))\
.options(orm.joinedload(corepos.CustData.member_info)\
.joinedload(corepos.MemberInfo.dates))\
.options(orm.joinedload(corepos.CustData.member_info)\
.joinedload(corepos.MemberInfo.notes))
def normalize_host_object(self, custdata):
if custdata.person_number == 1:
family_affiliation = False
elif custdata.person_number > 1:
family_affiliation = True
else:
log.warning("member #%s has unexpected person_number (%s): %s",
custdata.card_number, custdata.person_number, custdata)
family_affiliation = False
if custdata.member_type:
membership_profile_name = custdata.member_type.description
else:
log.warning("member #%s has no member type: %s",
custdata.card_number, custdata)
membership_profile_name = self.default_membership_profile['Membership Profile Name']
data = {
'member_id': str(custdata.id),
'first_name': custdata.first_name,
'last_name': custdata.last_name,
# these will be blank unless we have an associated `meminfo` record
'phone_number': None,
'email': None,
'address_1': None,
'address_2': None,
'city': None,
'state': None,
'zip_code': None,
'member_join_date': None,
'member_notes': None,
'family_affiliation': family_affiliation,
'account_number': str(custdata.card_number),
'membership_profile_name': membership_profile_name,
}
info = custdata.member_info
if info:
data['phone_number'] = info.phone
data['email'] = info.email
data['address_1'], data['address_2'] = info.split_street()
data['city'] = info.city
data['state'] = info.state
data['zip_code'] = info.zip
if info.dates:
if len(info.dates) > 1:
log.warning("member #%s has multiple (%s) `memDates` records: %s",
custdata.card_number, len(info.dates), custdata)
dates = info.dates[0]
if dates.start_date:
start_date = localtime(self.config, dates.start_date).date()
data['member_join_date'] = start_date.strftime('%Y-%m-%d')
if info.notes:
notes = []
for note in reversed(info.notes): # show most recent first
text = str(note.note or '').strip() or None
if text:
notes.append(text)
if notes:
data['member_notes'] = '\n'.join(notes)
return data

View file

@ -0,0 +1,48 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
CORE-POS Data Export
"""
from corepos.db.office_op import model as corepos
from rattail.importing.handlers import ToCSVHandler
from rattail.importing.exporters import FromSQLAlchemyToCSVMixin
from rattail_corepos.corepos.importing.db.corepos import FromCoreHandler, FromCore
class FromCoreToCSV(FromSQLAlchemyToCSVMixin, FromCoreHandler, ToCSVHandler):
"""
Handler for CORE -> CSV data export.
"""
direction = 'export'
local_title = "CSV"
FromParent = FromCore
ignored_model_names = ['Change'] # omit the datasync change model
@property
def host_title(self):
return self.config.node_title(default="CORE")
def get_model(self):
return corepos

View file

@ -0,0 +1,169 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
CORE-POS model importers (direct DB)
.. warning::
All classes in this module are "direct DB" importers, which will write
directly to MySQL. They are meant to be used in dry-run mode only, and/or
for sample data import to a dev system etc. They are *NOT* meant for
production use, as they will completely bypass any CORE business rules logic
which may exist.
"""
from sqlalchemy.orm.exc import NoResultFound
from rattail import importing
from corepos.db.office_op import model as corepos
from corepos.db.office_trans import model as coretrans
class ToCore(importing.ToSQLAlchemy):
"""
Base class for all CORE "operational" model importers.
"""
def create_object(self, key, host_data):
# NOTE! some tables in CORE DB may be using the MyISAM storage engine,
# which means it is *not* transaction-safe and therefore we cannot rely
# on "rollback" if in dry-run mode! in other words we better not touch
# the record at all, for dry run
if self.dry_run:
return host_data
return super(ToCore, self).create_object(key, host_data)
def update_object(self, obj, host_data, **kwargs):
# NOTE! some tables in CORE DB may be using the MyISAM storage engine,
# which means it is *not* transaction-safe and therefore we cannot rely
# on "rollback" if in dry-run mode! in other words we better not touch
# the record at all, for dry run
if self.dry_run:
return obj
return super(ToCore, self).update_object(obj, host_data, **kwargs)
def delete_object(self, obj):
# NOTE! some tables in CORE DB may be using the MyISAM storage engine,
# which means it is *not* transaction-safe and therefore we cannot rely
# on "rollback" if in dry-run mode! in other words we better not touch
# the record at all, for dry run
if self.dry_run:
return True
return super(ToCore, self).delete_object(obj)
class ToCoreTrans(importing.ToSQLAlchemy):
"""
Base class for all CORE "transaction" model importers
"""
########################################
# CORE Operational
########################################
class DepartmentImporter(ToCore):
model_class = corepos.Department
key = 'number'
class SubdepartmentImporter(ToCore):
model_class = corepos.Subdepartment
key = 'number'
class VendorImporter(ToCore):
model_class = corepos.Vendor
key = 'id'
class VendorContactImporter(ToCore):
model_class = corepos.VendorContact
key = 'vendor_id'
class ProductImporter(ToCore):
model_class = corepos.Product
key = 'id'
class ProductFlagImporter(ToCore):
model_class = corepos.ProductFlag
key = 'bit_number'
class VendorItemImporter(ToCore):
model_class = corepos.VendorItem
key = ('sku', 'vendor_id')
class EmployeeImporter(ToCore):
model_class = corepos.Employee
key = 'number'
class CustDataImporter(ToCore):
model_class = corepos.CustData
key = 'id'
class MemberTypeImporter(ToCore):
model_class = corepos.MemberType
key = 'id'
class MemberInfoImporter(ToCore):
model_class = corepos.MemberInfo
key = 'card_number'
class MemberDateImporter(ToCore):
model_class = corepos.MemberDate
key = 'card_number'
class MemberContactImporter(ToCore):
model_class = corepos.MemberContact
key = 'card_number'
class HouseCouponImporter(ToCore):
model_class = corepos.HouseCoupon
key = 'coupon_id'
########################################
# CORE Transactions
########################################
class TransactionDetailImporter(ToCoreTrans):
"""
CORE-POS transaction data importer.
"""
model_class = coretrans.TransactionDetail

View file

@ -0,0 +1,198 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Square -> CORE-POS data importing
"""
import re
import datetime
import decimal
from collections import OrderedDict
import sqlalchemy as sa
from corepos.db.office_trans import Session as CoreTransSession, model as coretrans
from rattail import importing
from rattail_corepos.corepos.office.importing import db as corepos_importing
class FromSquareToCoreTrans(importing.ToSQLAlchemyHandler):
"""
Square -> CORE-POS import handler.
"""
host_title = "Square"
local_title = "CORE-POS"
def make_session(self):
return CoreTransSession()
def get_importers(self):
importers = OrderedDict()
importers['TransactionDetail'] = TransactionDetailImporter
return importers
class FromSquare(importing.FromCSV):
"""
Base class for Square -> CORE-POS importers.
"""
class TransactionDetailImporter(FromSquare, corepos_importing.model.TransactionDetailImporter):
"""
Transaction detail importer.
"""
key = 'store_row_id'
supported_fields = [
'store_row_id',
'date_time',
'card_number',
'upc',
'description',
'quantity',
'unit_price',
'discount',
'tax',
'total',
]
batches_supported = True
def setup(self):
super(TransactionDetailImporter, self).setup()
# cache existing transactions by ID
self.transaction_details = self.cache_model(coretrans.TransactionDetail,
key=self.transaction_detail_key)
# keep track of new IDs
self.new_ids = {}
self.last_new_id = self.get_last_new_id()
def transaction_detail_key(self, detail, normal):
return (
detail.store_id,
detail.register_number,
detail.date_time,
detail.upc,
)
def get_last_new_id(self):
# TODO: pretty sure there is a better way to do this...
return self.session.query(sa.func.max(coretrans.TransactionDetail.store_row_id))\
.scalar() or 0
currency_pattern = re.compile(r'^\$(?P<amount>\d+\.\d\d)$')
currency_pattern_negative = re.compile(r'^\(\$(?P<amount>\d+\.\d\d)\)$')
def parse_currency(self, value):
value = (value or '').strip() or None
if value:
# first check for positive amount
match = self.currency_pattern.match(value)
if match:
return float(match.group('amount'))
# okay then, check for negative amount
match = self.currency_pattern_negative.match(value)
if match:
return 0 - float(match.group('amount'))
def normalize_host_object(self, csvrow):
# date_time
date = datetime.datetime.strptime(csvrow['Date'], '%m/%d/%Y').date()
time = datetime.datetime.strptime(csvrow['Time'], '%H:%M:%S').time()
date_time = datetime.datetime.combine(date, time)
# upc
upc = csvrow['SKU']
# store_row_id
key = (
0, # store_id
None, # register_number
date_time,
upc,
)
if key in self.transaction_details:
store_row_id = self.transaction_details[key].store_row_id
else:
store_row_id = self.last_new_id + 1
self.new_ids[store_row_id] = csvrow
self.last_new_id = store_row_id
# card_number
card_number = csvrow['Customer Reference ID'] or None
if card_number:
card_number = int(card_number)
# description
description = csvrow['Item']
# quantity
quantity = float(csvrow['Qty'])
# unit_price
unit_price = self.parse_currency(csvrow['Gross Sales'])
if unit_price is not None:
unit_price /= quantity
unit_price = decimal.Decimal('{:0.2f}'.format(unit_price))
elif csvrow['Gross Sales']:
log.warning("cannot parse 'unit_price' from: %s", csvrow['Gross Sales'])
# discount
discount = self.parse_currency(csvrow['Discounts'])
if discount is not None:
discount = decimal.Decimal('{:0.2f}'.format(discount))
elif csvrow['Discounts']:
log.warning("cannot parse 'discount' from: %s", csvrow['Discounts'])
# tax
tax = self.parse_currency(csvrow['Tax'])
if csvrow['Tax'] and tax is None:
log.warning("cannot parse 'tax' from: %s", csvrow['Tax'])
tax = bool(tax)
# total
total = self.parse_currency(csvrow['Net Sales'])
if total is not None:
total = decimal.Decimal('{:0.2f}'.format(total))
elif csvrow['Net Sales']:
log.warning("cannot parse 'total' from: %s", csvrow['Net Sales'])
return {
'_object_str': "({}) {}".format(upc, description),
'store_row_id': store_row_id,
'date_time': date_time,
'card_number': card_number,
'upc': upc,
'description': description,
'quantity': quantity,
'unit_price': unit_price,
'discount': discount,
'tax': tax,
'total': total,
}

View file

@ -0,0 +1,471 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2021 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
CORE-POS model importers (webservices API)
"""
from rattail import importing
from rattail.util import data_diffs
from rattail_corepos.corepos.util import get_core_members
from rattail_corepos.corepos.api import make_corepos_api
class ToCoreAPI(importing.Importer):
"""
Base class for all CORE "operational" model importers, which use the API.
"""
# TODO: these importers are in a bit of an experimental state at the
# moment. we only allow create/update b/c it will use the API instead of
# direct DB
allow_delete = False
caches_local_data = True
def setup(self):
self.establish_api()
def establish_api(self):
self.api = make_corepos_api(self.config)
def ensure_fields(self, data):
"""
Ensure each of our supported fields are included in the data. This is
to handle cases where the API does not return all fields, e.g. when
some of them are empty.
"""
for field in self.fields:
if field not in data:
data[field] = None
def fix_empties(self, data, fields):
"""
Fix "empty" values for the given set of fields. This just uses an
empty string instead of ``None`` for each, to add some consistency
where the API might lack it.
Main example so far, is the Vendor API, which may not return some
fields at all (and so our value is ``None``) in some cases, but in
other cases it *will* return a value, default of which is the empty
string. So we want to "pretend" that we get an empty string back even
when we actually get ``None`` from it.
"""
for field in fields:
if data[field] is None:
data[field] = ''
class MemberImporter(ToCoreAPI):
"""
Member model importer for CORE-POS
"""
model_name = 'Member'
key = 'cardNo'
supported_fields = [
'cardNo'
'customerAccountID',
'customers',
# 'memberStatus',
# 'activeStatus',
# 'customerTypeID',
# 'chargeBalance',
# 'chargeLimit',
# 'idCardUPC',
'startDate',
'endDate',
'addressFirstLine',
'addressSecondLine',
'city',
'state',
'zip',
# 'contactAllowed',
# 'contactMethod',
# 'modified',
]
supported_customer_fields = [
'customerID',
# 'customerAccountID',
# 'cardNo',
'firstName',
'lastName',
# 'chargeAllowed',
# 'checksAllowed',
# 'discount',
'accountHolder',
# 'staff',
'phone',
'altPhone',
'email',
# 'memberPricingAllowed',
# 'memberCouponsAllowed',
# 'lowIncomeBenefits',
# 'modified',
]
empty_date_value = '0000-00-00 00:00:00'
def get_local_objects(self, host_data=None):
return get_core_members(self.config, self.api, progress=self.progress)
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'cardNo'
return self.api.get_member(key[0])
def normalize_local_object(self, member):
data = dict(member)
return data
def data_diffs(self, local_data, host_data):
diffs = super(MemberImporter, self).data_diffs(local_data, host_data)
# the 'customers' field requires a more granular approach, since the
# data coming from API may have different fields than our local data
if 'customers' in self.fields and 'customers' in diffs:
if not self.customer_data_differs(local_data, host_data):
diffs.remove('customers')
# also the start/end dates should be looked at more closely. if they
# contain the special '__omit__' value then we won't ever count as diff
if 'startDate' in self.fields and 'startDate' in diffs:
if host_data['startDate'] == '__omit__':
diffs.remove('startDate')
if 'endDate' in self.fields and 'endDate' in diffs:
if host_data['endDate'] == '__omit__':
diffs.remove('endDate')
return diffs
def customer_data_differs(self, local_data, host_data):
local_customers = local_data['customers']
host_customers = host_data['customers']
# if both are empty, we're good
if not local_customers and not host_customers:
return False
# obviously we differ if record count doesn't match
if len(local_customers) != len(host_customers):
return True
# okay then, let's traverse the "new" list
for host_customer in host_customers:
# we differ if can't locate corresponding "old" local record
local_customer = self.find_local_customer(local_customers, host_customer)
if not local_customer:
return True
# we differ if old and new records differ
if data_diffs(local_customer, host_customer,
fields=self.supported_customer_fields):
return True
# okay, now let's traverse the "old" list
for local_customer in local_customers:
# we differ if can't locate corresponding "new" host record
host_customer = self.find_host_customer(host_customers, local_customer)
if not host_customer:
return True
# guess we don't differ after all
return False
def find_local_customer(self, local_customers, host_customer):
assert 'customerID' in self.supported_customer_fields
if not host_customer['customerID']:
return # new customer
for local_customer in local_customers:
if local_customer['customerID'] == host_customer['customerID']:
return local_customer
def find_host_customer(self, host_customers, local_customer):
assert 'customerID' in self.supported_customer_fields
for host_customer in host_customers:
if host_customer['customerID'] == local_customer['customerID']:
return host_customer
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, member, data, local_data=None):
"""
Push an update for the member, via the CORE API.
"""
if self.dry_run:
return data
cardNo = data.pop('cardNo')
data = dict(data)
if data.get('startDate') == '__omit__':
data.pop('startDate')
if data.get('endDate') == '__omit__':
data.pop('endDate')
member = self.api.set_member(cardNo, **data)
return member
class DepartmentImporter(ToCoreAPI):
"""
Department model importer for CORE-POS
"""
model_name = 'Department'
key = 'dept_no'
supported_fields = [
'dept_no',
'dept_name',
# TODO: should enable some of these fields?
# 'dept_tax',
# 'dept_fs',
# 'dept_limit',
# 'dept_minimum',
# 'dept_discount',
# 'dept_see_id',
# 'modified',
# 'modifiedby',
# 'margin',
# 'salesCode',
# 'memberOnly',
]
def get_local_objects(self, host_data=None):
return self.api.get_departments()
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'dept_no'
return self.api.get_department(key[0])
def normalize_local_object(self, department):
data = dict(department)
return data
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, department, data, local_data=None):
"""
Push an update for the department, via the CORE API.
"""
if self.dry_run:
return data
dept_no = data.pop('dept_no')
department = self.api.set_department(dept_no, **data)
return department
class SubdepartmentImporter(ToCoreAPI):
"""
Subdepartment model importer for CORE-POS
"""
model_name = 'Subdepartment'
key = 'subdept_no'
supported_fields = [
'subdept_no',
'subdept_name',
'dept_ID',
]
def get_local_objects(self, host_data=None):
return self.api.get_subdepartments()
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'subdept_no'
return self.api.get_subdepartment(key[0])
def normalize_local_object(self, subdepartment):
data = dict(subdepartment)
self.ensure_fields(data)
return data
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, subdepartment, data, local_data=None):
"""
Push an update for the subdepartment, via the CORE API.
"""
if self.dry_run:
return data
subdept_no = data.pop('subdept_no')
subdepartment = self.api.set_subdepartment(subdept_no, **data)
return subdepartment
class VendorImporter(ToCoreAPI):
"""
Vendor model importer for CORE-POS
"""
model_name = 'Vendor'
key = 'vendorID'
supported_fields = [
'vendorID',
'vendorName',
'vendorAbbreviation',
'shippingMarkup',
'discountRate',
'phone',
'fax',
'email',
'website',
'address',
'city',
'state',
'zip',
'notes',
'localOriginID',
'inactive',
'orderMinimum',
'halfCases',
]
def get_local_objects(self, host_data=None):
return self.api.get_vendors()
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'vendorID'
return self.api.get_vendor(key[0])
def normalize_local_object(self, vendor):
data = dict(vendor)
# make sure all fields are present
self.ensure_fields(data)
# fix some "empty" values
self.fix_empties(data, ['phone', 'fax', 'email'])
# convert some values to native type
data['discountRate'] = float(data['discountRate'])
return data
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, vendor, data, local_data=None):
"""
Push an update for the vendor, via the CORE API.
"""
if self.dry_run:
return data
vendorID = data.pop('vendorID')
vendor = self.api.set_vendor(vendorID, **data)
return vendor
class ProductImporter(ToCoreAPI):
"""
Product model importer for CORE-POS
"""
model_name = 'Product'
key = 'upc'
supported_fields = [
'upc',
'brand',
'description',
'size',
'department',
'normal_price',
'foodstamp',
'scale',
# 'tax', # TODO!
# TODO: maybe enable some of these fields?
# 'formatted_name',
# 'pricemethod',
# 'groupprice',
# 'quantity',
# 'special_price',
# 'specialpricemethod',
# 'specialgroupprice',
# 'specialquantity',
# 'start_date',
# 'end_date',
# 'scaleprice',
# 'mixmatchcode',
# 'modified',
# 'tareweight',
# 'discount',
# 'discounttype',
# 'line_item_discountable',
# 'unitofmeasure',
# 'wicable',
# 'qttyEnforced',
# 'idEnforced',
# 'cost',
# 'inUse',
# 'numflag',
# 'subdept',
# 'deposit',
# 'local',
# 'store_id',
# 'default_vendor_id',
# 'current_origin_id',
]
def get_local_objects(self, host_data=None):
return self.api.get_products()
def get_single_local_object(self, key):
assert len(self.key) == 1
assert self.key[0] == 'upc'
return self.api.get_product(key[0])
def normalize_local_object(self, product):
data = dict(product)
# make sure all fields are present
self.ensure_fields(data)
# fix some "empty" values
self.fix_empties(data, ['brand'])
return data
def create_object(self, key, data):
# we can get away with using the same logic for both here
return self.update_object(None, data)
def update_object(self, product, data, local_data=None):
"""
Push an update for the product, via the CORE API.
"""
if self.dry_run:
return data
upc = data.pop('upc')
product = self.api.set_product(upc, **data)
return product

View file

@ -0,0 +1,344 @@
# -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2023 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Rattail -> CORE-POS data export
"""
import logging
from collections import OrderedDict
from sqlalchemy import orm
from rattail import importing
from rattail.db import model
from rattail.util import pretty_quantity
from rattail_corepos.corepos import importing as corepos_importing
from rattail_corepos.corepos.util import get_max_existing_vendor_id
log = logging.getLogger(__name__)
class ToCOREAPIHandler(importing.ImportHandler):
"""
Base class for handlers targeting the CORE API.
"""
local_key = 'corepos_api'
generic_local_title = "CORE Office (API)"
@property
def local_title(self):
return "CORE-POS (API)"
class FromRattailToCore(importing.FromRattailHandler, ToCOREAPIHandler):
"""
Rattail -> CORE-POS export handler
"""
direction = 'export'
safe_for_web_app = True
def get_importers(self):
importers = OrderedDict()
importers['Member'] = MemberImporter
importers['Department'] = DepartmentImporter
importers['Subdepartment'] = SubdepartmentImporter
importers['Vendor'] = VendorImporter
importers['Product'] = ProductImporter
return importers
class FromRattail(importing.FromSQLAlchemy):
"""
Base class for Rattail -> CORE-POS exporters.
"""
class MemberImporter(FromRattail, corepos_importing.model.MemberImporter):
"""
Member data exporter
"""
host_model_class = model.Customer
key = 'cardNo'
supported_fields = [
'cardNo',
'customerAccountID',
'customers',
'addressFirstLine',
'addressSecondLine',
'city',
'state',
'zip',
'startDate',
'endDate',
]
supported_customer_fields = [
'customerID',
'firstName',
'lastName',
'accountHolder',
'phone',
'altPhone',
'email',
]
def query(self):
query = super(MemberImporter, self).query()
query = query.options(orm.joinedload(model.Customer.addresses))\
.options(orm.joinedload(model.Customer._people)\
.joinedload(model.CustomerPerson.person)\
.joinedload(model.Person.phones))\
.options(orm.joinedload(model.Customer._people)\
.joinedload(model.CustomerPerson.person)\
.joinedload(model.Person.emails))
return query
def normalize_host_object(self, customer):
address = customer.addresses[0] if customer.addresses else None
people = []
for i, person in enumerate(customer.people, 1):
phones = person.phones
phone1 = phones[0] if phones else None
phone2 = phones[1] if len(phones) > 1 else None
email = person.emails[0] if person.emails else None
people.append({
'customerID': str(person.corepos_customer_id),
'firstName': person.first_name,
'lastName': person.last_name,
'accountHolder': i == 1,
'phone': phone1.number if phone1 else '',
'altPhone': phone2.number if phone2 else '',
'email': email.address if email else '',
})
member = customer.only_member(require=False)
if member:
if member.joined:
start_date = member.joined.strftime('%Y-%m-%d 00:00:00')
else:
start_date = self.empty_date_value
if member.withdrew:
end_date = member.withdrew.strftime('%Y-%m-%d 00:00:00')
else:
end_date = self.empty_date_value
else:
start_date = '__omit__'
end_date = '__omit__'
return {
'cardNo': customer.number,
'customerAccountID': customer.id,
'addressFirstLine': address.street if address else '',
'addressSecondLine': address.street2 if address else '',
'city': address.city if address else '',
'state': address.state if address else '',
'zip': address.zipcode if address else '',
'startDate': start_date,
'endDate': end_date,
'customers': people,
}
class DepartmentImporter(FromRattail, corepos_importing.model.DepartmentImporter):
"""
Department data exporter
"""
host_model_class = model.Department
key = 'dept_no'
supported_fields = [
'dept_no',
'dept_name',
]
def normalize_host_object(self, department):
return {
'dept_no': str(department.number),
'dept_name': department.name,
}
class SubdepartmentImporter(FromRattail, corepos_importing.model.SubdepartmentImporter):
"""
Subdepartment data exporter
"""
host_model_class = model.Subdepartment
key = 'subdept_no'
supported_fields = [
'subdept_no',
'subdept_name',
'dept_ID',
]
def normalize_host_object(self, subdepartment):
department = subdepartment.department
return {
'subdept_no': str(subdepartment.number),
'subdept_name': subdepartment.name,
'dept_ID': str(department.number) if department else None,
}
class VendorImporter(FromRattail, corepos_importing.model.VendorImporter):
"""
Vendor data exporter
"""
host_model_class = model.Vendor
key = 'vendorID'
supported_fields = [
'vendorID',
'vendorName',
'vendorAbbreviation',
'discountRate',
'phone',
'fax',
'email',
]
def setup(self):
super(VendorImporter, self).setup()
# self.max_existing_vendor_id = self.get_max_existing_vendor_id()
self.max_existing_vendor_id = get_max_existing_vendor_id(self.config)
self.last_vendor_id = self.max_existing_vendor_id
def get_next_vendor_id(self):
if hasattr(self, 'last_vendor_id'):
self.last_vendor_id += 1
return self.last_vendor_id
last_vendor_id = get_max_existing_vendor_id(self.config)
return last_vendor_id + 1
def normalize_host_object(self, vendor):
vendor_id = vendor.corepos_id
if not vendor_id:
vendor_id = self.get_next_vendor_id()
data = {
'vendorID': str(vendor_id),
'vendorName': vendor.name,
'vendorAbbreviation': vendor.abbreviation or '',
'discountRate': float(vendor.special_discount or 0),
}
if 'phone' in self.fields:
phones = [phone for phone in vendor.phones
if phone.type == 'Voice']
data['phone'] = phones[0].number if phones else ''
if 'fax' in self.fields:
phones = [phone for phone in vendor.phones
if phone.type == 'Fax']
data['fax'] = phones[0].number if phones else ''
if 'email' in self.fields:
email = vendor.email
data['email'] = email.address if email else ''
# also embed original Rattail vendor object, if we'll be needing to
# update it later with a new CORE ID
if not vendor.corepos_id:
data['_rattail_vendor'] = vendor
return data
def create_object(self, key, data):
# grab vendor object we (maybe) stashed when normalizing
rattail_vendor = data.pop('_rattail_vendor', None)
# do normal create logic
vendor = super(VendorImporter, self).create_object(key, data)
if vendor:
# maybe set the CORE ID for vendor in Rattail
if rattail_vendor:
rattail_vendor.corepos_id = int(vendor['vendorID'])
return vendor
class ProductImporter(FromRattail, corepos_importing.model.ProductImporter):
"""
Product data exporter
"""
host_model_class = model.Product
key = 'upc'
supported_fields = [
'upc',
'brand',
'description',
'size',
'unitofmeasure',
'department',
'normal_price',
'foodstamp',
'scale',
]
def normalize_host_object(self, product):
upc = product.item_id
if not upc and product.upc:
upc = str(product.upc)[:-1]
if not upc:
log.warning("skipping product %s with unknown upc: %s",
product.uuid, product)
return
return {
'_product': product,
'upc': upc,
'brand': product.brand.name if product.brand else '',
'description': product.description or '',
'size': pretty_quantity(product.unit_size),
'unitofmeasure': product.uom_abbreviation,
'department': str(product.department.number) if product.department else None,
'normal_price': '{:0.2f}'.format(product.regular_price.price) if product.regular_price else None,
'foodstamp': '1' if product.food_stampable else '0',
'scale': '1' if product.weighed else '0',
}
def create_object(self, key, data):
# must be sure not to pass the original Product instance, or else the
# API call will try to serialize and submit it
product = data.pop('_product')
corepos_product = super(ProductImporter, self).create_object(key, data)
if corepos_product:
# update our Rattail Product with the CORE ID
if not self.dry_run:
product.corepos_id = int(corepos_product['id'])
return corepos_product
def update_object(self, corepos_product, data, local_data=None):
# must be sure not to pass the original Product instance, or else the
# API call will try to serialize and submit it
product = data.pop('_product', None)
corepos_product = super(ProductImporter, self).update_object(corepos_product, data, local_data)
return corepos_product