Move "quick entry" logic for purchase batch, into rattail handler

This commit is contained in:
Lance Edgar 2019-11-13 14:03:54 -06:00
parent a096ce565e
commit d42c2fabb9

View file

@ -34,9 +34,7 @@ import humanize
import sqlalchemy as sa import sqlalchemy as sa
from rattail import pod from rattail import pod
from rattail.db import model, api, Session as RattailSession from rattail.db import model, Session as RattailSession
from rattail.db.util import maxlen
from rattail.gpc import GPC
from rattail.time import localtime, make_utc from rattail.time import localtime, make_utc
from rattail.util import pretty_quantity, prettify, OrderedDict, simple_error from rattail.util import pretty_quantity, prettify, OrderedDict, simple_error
from rattail.vendors.invoices import iter_invoice_parsers, require_invoice_parser from rattail.vendors.invoices import iter_invoice_parsers, require_invoice_parser
@ -1398,153 +1396,12 @@ class ReceivingBatchView(PurchasingBatchView):
return kwargs return kwargs
def should_aggregate_products(self, batch):
"""
Must return a boolean indicating whether rows should be aggregated by
product for the given batch.
"""
return True
def quick_locate_rows(self, batch, entry, product):
rows = []
# try to locate rows by product uuid match before other key
if product:
rows = [row for row in batch.active_rows()
if row.product_uuid == product.uuid]
if rows:
return rows
key = self.rattail_config.product_key()
if key == 'upc':
if entry.isdigit():
# we prefer "exact" UPC matches, i.e. those which assumed the entry
# already contained the check digit.
provided = GPC(entry, calc_check_digit=False)
rows = [row for row in batch.active_rows()
if row.upc == provided]
if rows:
return rows
# if no "exact" UPC matches, we'll settle for those (UPC matches)
# which assume the entry lacked a check digit.
checked = GPC(entry, calc_check_digit='upc')
rows = [row for row in batch.active_rows()
if row.upc == checked]
return rows
elif key == 'item_id':
rows = [row for row in batch.active_rows()
if row.item_id == entry]
return rows
def quick_locate_product(self, batch, entry):
# first let the handler attempt lookup on product key (only)
product = self.handler.locate_product_for_entry(self.Session(), entry,
lookup_by_code=False)
if product:
return product
# now we'll attempt lookup by vendor item code
product = api.get_product_by_vendor_code(self.Session(), entry, vendor=batch.vendor)
if product:
return product
# okay then, let's attempt lookup by "alternate" code
product = api.get_product_by_code(self.Session(), entry)
if product:
return product
def save_quick_row_form(self, form): def save_quick_row_form(self, form):
batch = self.get_instance() batch = self.get_instance()
entry = form.validated['quick_entry'] entry = form.validated['quick_entry']
row = self.handler.quick_entry(self.Session(), batch, entry)
# first try to locate the product based on quick entry
product = self.quick_locate_product(batch, entry)
# then try to locate existing row(s) which match product/entry
rows = self.quick_locate_rows(batch, entry, product)
if rows:
# if aggregating, just re-use matching row
prefer_existing = self.should_aggregate_products(batch)
if prefer_existing:
if len(rows) > 1:
log.warning("found multiple row matches for '%s' in batch %s: %s",
entry, batch.id_str, batch)
return rows[0]
else: # borrow product from matching row, but make new row
other_row = rows[0]
row = model.PurchaseBatchRow()
row.item_entry = entry
row.product = other_row.product
self.handler.add_row(batch, row)
self.Session.flush()
self.handler.refresh_batch_status(batch)
return row return row
# matching row(s) not found; add new row if product was identified
# TODO: probably should be smarter about how we handle deleted?
if product and not product.deleted:
row = model.PurchaseBatchRow()
row.item_entry = entry
row.product = product
self.handler.add_row(batch, row)
self.Session.flush()
self.handler.refresh_batch_status(batch)
return row
key = self.rattail_config.product_key()
if key == 'upc':
# check for "bad" upc
if len(entry) > 14:
return
if not entry.isdigit():
return
provided = GPC(entry, calc_check_digit=False)
checked = GPC(entry, calc_check_digit='upc')
# product not in system, but presumably sane upc, so add to batch anyway
row = model.PurchaseBatchRow()
row.item_entry = entry
add_check_digit = True # TODO: make this dynamic, of course
if add_check_digit:
row.upc = checked
else:
row.upc = provided
row.item_id = entry
row.description = "(unknown product)"
self.handler.add_row(batch, row)
self.Session.flush()
self.handler.refresh_batch_status(batch)
return row
elif key == 'item_id':
# check for "too long" item_id
if len(entry) > maxlen(model.PurchaseBatchRow.item_id):
return
# product not in system, but presumably sane item_id, so add to batch anyway
row = model.PurchaseBatchRow()
row.item_entry = entry
row.item_id = entry
row.description = "(unknown product)"
self.handler.add_row(batch, row)
self.Session.flush()
self.handler.refresh_batch_status(batch)
return row
else:
raise NotImplementedError("don't know how to handle product key: {}".format(key))
def redirect_after_quick_row(self, row, mobile=False): def redirect_after_quick_row(self, row, mobile=False):
if mobile: if mobile:
return self.redirect(self.get_row_action_url('receive', row, mobile=mobile)) return self.redirect(self.get_row_action_url('receive', row, mobile=mobile))