# -*- coding: utf-8; -*-
################################################################################
#
#  Rattail -- Retail Software Framework
#  Copyright © 2010-2020 Lance Edgar
#
#  This file is part of Rattail.
#
#  Rattail is free software: you can redistribute it and/or modify it under the
#  terms of the GNU General Public License as published by the Free Software
#  Foundation, either version 3 of the License, or (at your option) any later
#  version.
#
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
#  details.
#
#  You should have received a copy of the GNU General Public License along with
#  Rattail.  If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Views for inventory batches
"""

from __future__ import unicode_literals, absolute_import

import re
import decimal
import logging

import six

from rattail import pod
from rattail.db import model, api
from rattail.db.util import make_full_description
from rattail.time import localtime
from rattail.gpc import GPC
from rattail.util import pretty_quantity

import colander
from deform import widget as dfwidget
from webhelpers2.html import HTML, tags

from tailbone import forms, grids
from tailbone.views import MasterView
from tailbone.views.batch import BatchMasterView


log = logging.getLogger(__name__)


class InventoryBatchView(BatchMasterView):
    """
    Master view for inventory batches.
    """
    model_class = model.InventoryBatch
    model_title_plural = "Inventory Batches"
    default_handler_spec = 'rattail.batch.inventory:InventoryBatchHandler'
    route_prefix = 'batch.inventory'
    url_prefix = '/batch/inventory'
    index_title = "Inventory"
    rows_creatable = True
    bulk_deletable = True
    results_executable = True
    mobile_creatable = True
    mobile_rows_creatable = True

    # set to True for the UI to "prefer" case amounts, as opposed to unit
    prefer_cases = False

    labels = {
        'mode': "Count Mode",
    }

    grid_columns = [
        'id',
        'created',
        'created_by',
        'description',
        'mode',
        'rowcount',
        'total_cost',
        'executed',
        'executed_by',
    ]

    form_fields = [
        'id',
        'description',
        'notes',
        'created',
        'created_by',
        'handheld_batches',
        'mode',
        'reason_code',
        'total_cost',
        'rowcount',
        'complete',
        'executed',
        'executed_by',
    ]

    mobile_form_fields = [
        'mode',
        'reason_code',
        'rowcount',
        'complete',
        'executed',
        'executed_by',
    ]

    model_row_class = model.InventoryBatchRow
    rows_editable = True

    row_labels = {
        'upc': "UPC",
        'previous_units_on_hand': "Prev. On Hand",
    }

    row_grid_columns = [
        'sequence',
        'upc',
        'item_id',
        'brand_name',
        'description',
        'size',
        'previous_units_on_hand',
        'cases',
        'units',
        'unit_cost',
        'total_cost',
        'status_code',
    ]

    row_form_fields = [
        'sequence',
        'upc',
        'brand_name',
        'description',
        'size',
        'status_code',
        'previous_units_on_hand',
        'case_quantity',
        'cases',
        'units',
        'unit_cost',
        'total_cost',
        'variance',
    ]

    def configure_grid(self, g):
        """
        Configure the batch grid: enum display/filter for ``mode`` and
        currency formatting for ``total_cost``.
        """
        super(InventoryBatchView, self).configure_grid(g)

        # mode
        g.set_enum('mode', self.enum.INVENTORY_MODE)
        g.filters['mode'].set_value_renderer(
            grids.filters.EnumValueRenderer(self.enum.INVENTORY_MODE))

        # total_cost
        g.set_type('total_cost', 'currency')

    def render_mobile_listitem(self, batch, i):
        """
        Return the text shown for a batch in the mobile list: batch ID, row
        count ("?" when unknown), creator, and local creation date.
        """
        return "({}) {} rows - {}, {}".format(
            batch.id_str,
            "?" if batch.rowcount is None else batch.rowcount,
            batch.created_by,
            localtime(self.request.rattail_config, batch.created,
                      from_utc=True).strftime('%Y-%m-%d'))

    def mutable_batch(self, batch):
        """
        Return True if the batch may still be modified, i.e. it is neither
        executed nor complete, and is not a "zero all" batch.
        """
        return (not batch.executed
                and not batch.complete
                and batch.mode != self.enum.INVENTORY_MODE_ZERO_ALL)

    def allow_worksheet(self, batch):
        """
        The worksheet is allowed only while the batch is mutable.
        """
        return self.mutable_batch(batch)

    def get_available_modes(self):
        """
        Return a dict of the inventory modes which the current user may
        choose, based on their permissions and on what the handler allows.
        """
        permission_prefix = self.get_permission_prefix()
        has_perm = self.request.has_perm
        modes = dict(self.enum.INVENTORY_MODE)

        # the enum module may not define every mode, hence the hasattr() checks

        if not has_perm('{}.create.replace'.format(permission_prefix)):
            if hasattr(self.enum, 'INVENTORY_MODE_REPLACE'):
                modes.pop(self.enum.INVENTORY_MODE_REPLACE, None)
            if hasattr(self.enum, 'INVENTORY_MODE_REPLACE_ADJUST'):
                modes.pop(self.enum.INVENTORY_MODE_REPLACE_ADJUST, None)

        if not self.handler.allow_zero_all or not has_perm('{}.create.zero'.format(permission_prefix)):
            if hasattr(self.enum, 'INVENTORY_MODE_ZERO_ALL'):
                modes.pop(self.enum.INVENTORY_MODE_ZERO_ALL, None)

        if not self.handler.allow_variance or not has_perm('{}.create.variance'.format(permission_prefix)):
            if hasattr(self.enum, 'INVENTORY_MODE_VARIANCE'):
                modes.pop(self.enum.INVENTORY_MODE_VARIANCE, None)

        return modes

    def configure_form(self, f):
        """
        Configure the batch form; several fields are dropped when creating.
        """
        super(InventoryBatchView, self).configure_form(f)

        # mode
        modes = self.get_available_modes()
        f.set_enum('mode', modes)
        f.set_label('mode', "Count Mode")
        if len(modes) == 1:
            # only one choice, so show it readonly and pre-select it
            f.set_widget('mode', forms.widgets.ReadonlyWidget())
            f.set_default('mode', list(modes)[0])

        # total_cost
        if self.creating:
            f.remove_field('total_cost')
        else:
            f.set_readonly('total_cost')
            f.set_type('total_cost', 'currency')

        # handheld_batches
        if self.creating:
            f.remove_field('handheld_batches')
        else:
            f.set_readonly('handheld_batches')
            f.set_renderer('handheld_batches', self.render_handheld_batches)

        # complete
        if self.creating:
            f.remove_field('complete')

    def render_handheld_batches(self, inventory_batch, field):
        """
        Render an HTML list of links, one per handheld batch associated with
        the given inventory batch.
        """
        items = []
        for handheld in inventory_batch._handhelds:
            text = handheld.handheld.id_str
            url = self.request.route_url('batch.handheld.view',
                                         uuid=handheld.handheld_uuid)
            items.append(HTML.tag('li', c=[tags.link_to(text, url)]))
        return HTML.tag('ul', c=items)

    def row_editable(self, row):
        """
        A row may be edited only while its batch is mutable.
        """
        return self.mutable_batch(row.batch)

    def row_deletable(self, row):
        """
        A row may be deleted only while its batch is mutable.
        """
        return self.mutable_batch(row.batch)

    def save_edit_row_form(self, form):
        """
        Subtract the row's current total cost from the batch total, then
        defer to the parent to save the row.
        """
        row = form.model_instance
        batch = row.batch
        if batch.total_cost is not None and row.total_cost is not None:
            batch.total_cost -= row.total_cost
        return super(InventoryBatchView, self).save_edit_row_form(form)

    def delete_row(self):
        """
        Subtract the row's total cost from the batch total, then defer to the
        parent to delete the row.  Responds "not found" for an unknown row.
        """
        row = self.Session.query(model.InventoryBatchRow).get(self.request.matchdict['row_uuid'])
        if not row:
            raise self.notfound()
        batch = row.batch
        if batch.total_cost is not None and row.total_cost is not None:
            batch.total_cost -= row.total_cost
        return super(InventoryBatchView, self).delete_row()

    def create_row(self):
        """
        Desktop workflow view for adding items to inventory batch.
        """
        batch = self.get_instance()
        if batch.executed:
            return self.redirect(self.get_action_url('view', batch))

        schema = DesktopForm().bind(session=self.Session())
        form = forms.Form(schema=schema, request=self.request)
        if form.validate(newstyle=True):

            product = self.Session.query(model.Product).get(form.validated['product'])

            # the 'size' schema node uses ``missing=colander.null``; never let
            # that sentinel leak onto the row
            size = form.validated['size']
            if size is colander.null:
                size = None

            # when aggregating, update the product's existing row if present
            row = None
            if self.should_aggregate_products(batch):
                row = self.find_row_for_product(batch, product)
                if row:
                    row.cases = form.validated['cases']
                    row.units = form.validated['units']
                    self.handler.refresh_row(row)

            if not row:
                row = model.InventoryBatchRow()
                row.product = product
                row.upc = form.validated['upc']
                row.brand_name = form.validated['brand_name']
                row.description = form.validated['description']
                row.size = size
                row.case_quantity = form.validated['case_quantity']
                row.cases = form.validated['cases']
                row.units = form.validated['units']
                self.handler.capture_current_units(row)
                self.handler.add_row(batch, row)

            description = make_full_description(form.validated['brand_name'],
                                                form.validated['description'],
                                                size)
            self.request.session.flash("{} cases, {} units: {} {}".format(
                form.validated['cases'] or 0,
                form.validated['units'] or 0,
                form.validated['upc'].pretty(),
                description))
            return self.redirect(self.request.current_route_url())

        title = self.get_instance_title(batch)
        return self.render_to_response('desktop_form', {
            'batch': batch,
            'instance': batch,
            'instance_title': title,
            'index_title': "{}: {}".format(self.get_model_title(), title),
            'index_url': self.get_action_url('view', batch),
            'form': form,
            'dform': form.make_deform_form(),
            'allow_cases': self.allow_cases(batch),
            'prefer_cases': self.prefer_cases,
        })

    # TODO: deprecate / remove this
    def allow_cases(self, batch):
        """
        Return whatever the handler says about allowing case counts for the
        given batch.
        """
        return self.handler.allow_cases(batch)

    # TODO: deprecate / remove this
    def should_aggregate_products(self, batch):
        """
        Must return a boolean indicating whether rows should be aggregated by
        product for the given batch.
        """
        return self.handler.should_aggregate_products(batch)

    def desktop_lookup(self):
        """
        Try to locate a product by UPC, and validate it in the context of
        current batch, returning some data for client JS.
        """
        batch = self.get_instance()
        if batch.executed:
            return {
                'error': "Current batch has already been executed",
                'redirect': self.get_action_url('view', batch),
            }

        entry = self.request.GET.get('upc', '')
        aggregate = self.should_aggregate_products(batch)

        type2 = self.find_type2_product(entry)
        if type2:
            product, price = type2
        else:
            product = self.find_product(entry)

        force_unit_item = True  # TODO: make configurable?
        unit_forced = False
        if force_unit_item and product and product.is_pack_item():
            product = product.unit
            unit_forced = True

        data = self.product_info(product)
        if type2:
            data['type2'] = True
            if not aggregate:
                if price is None:
                    data['units'] = 1
                else:
                    # derive unit count from the embedded price
                    data['units'] = float((price / product.regular_price.price).quantize(decimal.Decimal('0.01')))

        result = {
            'product': data,
            'upc_raw': entry,
            'upc': None,
            'force_unit_item': unit_forced,
        }

        if not data:
            # no product info; still give the client a UPC derived from entry
            upc = re.sub(r'\D', '', entry.strip())
            if upc:
                upc = GPC(upc)
                result['upc'] = six.text_type(upc)
                result['upc_pretty'] = upc.pretty()
                result['image_url'] = pod.get_image_url(self.rattail_config, upc)

        if product and aggregate:
            row = self.find_row_for_product(batch, product)
            if row:
                result['already_present_in_batch'] = True
                result['cases'] = float(row.cases) if row.cases is not None else None
                result['units'] = float(row.units) if row.units is not None else None

        return result

    def find_row_for_product(self, batch, product):
        """
        Return the first non-removed row in the batch for the given product,
        or ``None``.  Logs an error if more than one such row exists.
        """
        rows = self.Session.query(model.InventoryBatchRow)\
                           .filter(model.InventoryBatchRow.batch == batch)\
                           .filter(model.InventoryBatchRow.product == product)\
                           .filter(model.InventoryBatchRow.removed == False)\
                           .all()
        if rows:
            if len(rows) > 1:
                log.error("inventory batch %s should aggregate products, but has %s rows for: %s",
                          batch.id_str, len(rows), product)
            return rows[0]

    def find_product(self, entry):
        """
        Try to locate a product for the given entry, first by UPC (as
        provided, then with a calculated check digit) and then, if so
        configured, by alternate code.  Returns ``None`` if nothing matches.
        """
        upc = re.sub(r'\D', '', entry.strip())
        if upc:
            # first try to locate existing batch row by UPC match
            provided = GPC(upc, calc_check_digit=False)
            checked = GPC(upc, calc_check_digit='upc')
            product = api.get_product_by_upc(self.Session(), provided)
            if product:
                return product
            product = api.get_product_by_upc(self.Session(), checked)
            if product:
                return product

        # maybe try to locate product by alternate code
        if self.rattail_config.getbool('tailbone', 'inventory.lookup_by_code', default=False):
            product = api.get_product_by_code(self.Session(), entry)
            if product:
                return product

    def product_info(self, product):
        """
        Return a dict of basic product info for the client.  The dict is
        empty if there is no product, or if the product is deleted and the
        user lacks permission to view deleted products.
        """
        data = {}
        if product and (not product.deleted or self.request.has_perm('products.view_deleted')):
            data['uuid'] = product.uuid
            data['upc'] = six.text_type(product.upc)
            data['upc_pretty'] = product.upc.pretty()
            data['full_description'] = product.full_description
            data['brand_name'] = six.text_type(product.brand or '')
            data['description'] = product.description
            data['size'] = product.size
            data['case_quantity'] = 1  # default
            data['cost_found'] = False
            data['image_url'] = pod.get_image_url(self.rattail_config, product.upc)
        return data

    def configure_mobile_form(self, f):
        """
        Configure the mobile batch form.
        """
        super(InventoryBatchView, self).configure_mobile_form(f)
        batch = f.model_instance

        # mode
        modes = self.get_available_modes()
        f.set_enum('mode', modes)
        mode_values = [(k, v) for k, v in sorted(modes.items())]
        f.set_widget('mode', forms.widgets.PlainSelectWidget(values=mode_values))

        # complete
        if self.creating or batch.executed or not batch.complete:
            f.remove_field('complete')

        # rowcount
        if self.viewing and not batch.executed and not batch.complete:
            f.remove_field('rowcount')

    # TODO: this view can create new rows, with only a GET query.  that should
    # probably be changed to require POST; for now we just require the "create
    # batch row" perm and call it good..
    def mobile_row_from_upc(self):
        """
        Locate and/or create a row within the batch, according to the given
        product UPC, then redirect to the row view page.
        """
        batch = self.get_instance()
        row = None
        raw_entry = self.request.GET.get('upc', '')
        entry = raw_entry.strip()
        entry = re.sub(r'\D', '', entry)
        if entry:

            if len(entry) <= 14:
                row = self.add_row_for_upc(batch, entry, warn_if_present=True)
                if not row:
                    self.request.session.flash("Product not found: {}".format(entry), 'error')
                    return self.redirect(self.get_action_url('view', batch, mobile=True))

            else:
                self.request.session.flash("UPC has too many digits ({}): {}".format(len(entry), entry), 'error')
                return self.redirect(self.get_action_url('view', batch, mobile=True))

        else:
            self.request.session.flash("Product not found: {}".format(raw_entry), 'error')
            return self.redirect(self.get_action_url('view', batch, mobile=True))

        # flush before building the redirect URL, which uses row.uuid
        self.Session.flush()
        return self.redirect(self.mobile_row_route_url('view', uuid=row.batch_uuid, row_uuid=row.uuid))

    def add_row_for_upc(self, batch, entry, warn_if_present=False):
        """
        Add a row to the batch for the given UPC, if applicable.

        Returns the new row, or the existing row when the batch aggregates
        products and already has one for the product.  Returns ``None`` if no
        product is found and the handler does not create rows for unknown
        products.
        """
        type2 = self.find_type2_product(entry)
        if type2:
            product, price = type2
        else:
            product = self.find_product(entry)

        if product:

            force_unit_item = self.rattail_config.getbool(
                'tailbone', 'inventory.force_unit_item', default=False)
            if force_unit_item and product.is_pack_item():
                product = product.unit
                self.request.session.flash("You scanned a pack item, but must count the units instead.", 'error')

            aggregate = self.should_aggregate_products(batch)
            if aggregate:
                row = self.find_row_for_product(batch, product)
                if row:
                    if warn_if_present:
                        self.request.session.flash("Product already exists in batch; please confirm counts", 'error')
                    return row

            row = model.InventoryBatchRow()
            row.product = product
            row.upc = product.upc
            self.handler.capture_current_units(row)
            if type2 and not aggregate:
                if price is None:
                    row.units = 1
                else:
                    row.units = (price / product.regular_price.price).quantize(decimal.Decimal('0.01'))
            self.handler.add_row(batch, row)
            return row

        elif self.handler.unknown_product_creates_row:
            row = model.InventoryBatchRow()
            # nb. build the UPC from the entry given to this method
            row.upc = GPC(entry, calc_check_digit=False)  # TODO: why not calc check digit?
            row.description = "(unknown product)"
            self.handler.capture_current_units(row)
            self.handler.add_row(batch, row)
            return row

    def template_kwargs_view_row(self, **kwargs):
        """
        Add the product image URL to the template context for the row view.
        """
        row = kwargs['instance']
        kwargs['product_image_url'] = pod.get_image_url(self.rattail_config, row.upc)
        return kwargs

    def get_batch_kwargs(self, batch, mobile=False):
        """
        Return kwargs for making a new batch, adding ``mode``, ``complete``
        (always False) and ``reason_code`` to whatever the parent provides.
        """
        # NOTE(review): the caller's ``mobile`` flag is not forwarded to the
        # parent (always False is passed) -- confirm whether that is intended
        kwargs = super(InventoryBatchView, self).get_batch_kwargs(batch, mobile=False)
        kwargs['mode'] = batch.mode
        kwargs['complete'] = False
        kwargs['reason_code'] = batch.reason_code
        return kwargs

    def get_mobile_row_data(self, batch):
        """
        Return the row query for mobile, sorted by descending sequence.
        """
        # we want newest on top, for inventory batch rows
        return self.get_row_data(batch)\
                   .order_by(self.model_row_class.sequence.desc())

    # TODO: ugh, the hackiness.  needs a refactor fo sho
    def mobile_view_row(self):
        """
        Mobile view for inventory batch rows.  Note that this also handles
        updating a row...ugh.
        """
        self.viewing = True
        row = self.get_row_instance()
        batch = self.get_parent(row)

        form = self.make_mobile_row_form(row)
        allow_cases = self.allow_cases(batch)
        unit_uom = 'LB' if row.product and row.product.weighed else 'EA'

        # choose the initial unit of measure for the counting UI
        if row.cases and allow_cases:
            uom = 'CS'
        elif row.units:
            uom = unit_uom
        elif row.case_quantity and allow_cases and self.prefer_cases:
            uom = 'CS'
        else:
            uom = unit_uom

        context = {
            'row': row,
            'batch': batch,
            'instance': row,
            'instance_title': self.get_row_instance_title(row),
            'parent_model_title': self.get_model_title(),
            'parent_title': self.get_instance_title(batch),
            'parent_url': self.get_action_url('view', batch, mobile=True),
            'product_image_url': pod.get_image_url(self.rattail_config, row.upc),
            'form': form,
            'allow_cases': allow_cases,
            'unit_uom': unit_uom,
            'uom': uom,
        }

        if self.request.has_perm('{}.edit_row'.format(self.get_permission_prefix())):
            schema = InventoryForm().bind(session=self.Session())
            update_form = forms.Form(schema=schema, request=self.request)
            if update_form.validate(newstyle=True):
                # NOTE(review): the row is re-fetched from the submitted UUID
                # and is not checked against the batch in the URL -- confirm
                row = self.Session.query(model.InventoryBatchRow).get(update_form.validated['row'])
                cases = update_form.validated['cases']
                units = update_form.validated['units']
                # exactly one of cases / units is recorded; the other is cleared
                if cases is not colander.null:
                    row.cases = cases
                    row.units = None
                elif units is not colander.null:
                    row.cases = None
                    row.units = units
                else:
                    raise NotImplementedError
                self.handler.refresh_row(row)
                route_prefix = self.get_route_prefix()
                return self.redirect(self.request.route_url('mobile.{}.view'.format(route_prefix), uuid=batch.uuid))

        return self.render_to_response('view_row', context, mobile=True)

    def get_row_instance_title(self, row):
        """
        Return a title for the row: its pretty UPC if it has one, else its
        item ID, else a generic "row N" label.
        """
        if row.upc:
            return row.upc.pretty()
        if row.item_id:
            return row.item_id
        return "row {}".format(row.sequence)

    def configure_row_grid(self, g):
        """
        Configure the row grid: field types, short labels and link columns.
        """
        super(InventoryBatchView, self).configure_row_grid(g)

        # quantity fields
        g.set_type('previous_units_on_hand', 'quantity')
        g.set_type('cases', 'quantity')
        g.set_type('units', 'quantity')

        # currency fields
        g.set_type('unit_cost', 'currency')
        g.set_type('total_cost', 'currency')

        # short labels
        g.set_label('brand_name', "Brand")
        g.set_label('status_code', "Status")

        # links
        g.set_link('upc')
        g.set_link('item_id')
        g.set_link('description')

    def row_grid_extra_class(self, row, i):
        """
        Flag rows whose product was not found with the 'warning' CSS class.
        """
        if row.status_code == row.STATUS_PRODUCT_NOT_FOUND:
            return 'warning'

    def render_mobile_row_listitem(self, row, i):
        """
        Return the text shown for a row in the mobile list: an identifier,
        the description, and the counted quantity with its unit of measure.
        """
        description = row.product.full_description if row.product else row.description
        unit_uom = 'LB' if row.product and row.product.weighed else 'EA'
        qty = "{} {}".format(pretty_quantity(row.cases or row.units),
                             'CS' if row.cases else unit_uom)
        # nb. row may lack a UPC; the row title falls back to item ID etc.
        return "({}) {} - {}".format(self.get_row_instance_title(row), description, qty)

    def configure_row_form(self, f):
        """
        Configure the row form: readonly fields, field types and UPC renderer.
        """
        super(InventoryBatchView, self).configure_row_form(f)
        row = f.model_instance

        # readonly fields
        f.set_readonly('upc')
        f.set_readonly('item_id')
        f.set_readonly('brand_name')
        f.set_readonly('description')
        f.set_readonly('size')
        f.set_readonly('previous_units_on_hand')
        f.set_readonly('case_quantity')
        f.set_readonly('variance')
        f.set_readonly('total_cost')

        # quantity fields
        f.set_type('case_quantity', 'quantity')
        f.set_type('previous_units_on_hand', 'quantity')
        f.set_type('cases', 'quantity')
        f.set_type('units', 'quantity')
        f.set_type('variance', 'quantity')

        # currency fields
        f.set_type('unit_cost', 'currency')
        f.set_type('total_cost', 'currency')

        # upc
        f.set_renderer('upc', self.render_upc)

        # cases
        if self.editing:
            if not self.allow_cases(row.batch):
                f.set_readonly('cases')

    def render_upc(self, row, field):
        """
        Render the row's UPC, as a link to the product view when the row
        references a product.  Returns an empty string if there is no UPC.
        """
        upc = row.upc
        if not upc:
            return ""
        text = upc.pretty()
        if row.product_uuid:
            url = self.request.route_url('products.view', uuid=row.product_uuid)
            return tags.link_to(text, url)
        return text

    @classmethod
    def defaults(cls, config):
        """
        Register the batch, master and inventory-specific view config.
        """
        cls._batch_defaults(config)
        cls._defaults(config)
        cls._inventory_defaults(config)

    @classmethod
    def _inventory_defaults(cls, config):
        """
        Register the permissions, routes and views specific to inventory
        batches.
        """
        rattail_config = config.registry.settings['rattail_config']
        model_key = cls.get_model_key()
        model_title = cls.get_model_title()
        route_prefix = cls.get_route_prefix()
        url_prefix = cls.get_url_prefix()
        permission_prefix = cls.get_permission_prefix()

        # we need batch handler to determine available permissions
        factory = cls.get_handler_factory(rattail_config)
        handler = factory(rattail_config)

        # extra perms for creating batches per "mode"
        config.add_tailbone_permission(permission_prefix, '{}.create.replace'.format(permission_prefix),
                                       "Create new {} with 'replace' mode".format(model_title))
        if handler.allow_zero_all:
            config.add_tailbone_permission(permission_prefix, '{}.create.zero'.format(permission_prefix),
                                           "Create new {} with 'zero' mode".format(model_title))
        if handler.allow_variance:
            config.add_tailbone_permission(permission_prefix, '{}.create.variance'.format(permission_prefix),
                                           "Create new {} with 'variance' mode".format(model_title))

        # row UPC lookup, for desktop
        config.add_route('{}.desktop_lookup'.format(route_prefix),
                         '{}/{{{}}}/desktop-form/lookup'.format(url_prefix, model_key))
        config.add_view(cls, attr='desktop_lookup',
                        route_name='{}.desktop_lookup'.format(route_prefix),
                        renderer='json',
                        permission='{}.create_row'.format(permission_prefix))

        # mobile - make new row from UPC
        config.add_route('mobile.{}.row_from_upc'.format(route_prefix),
                         '/mobile{}/{{{}}}/row-from-upc'.format(url_prefix, model_key))
        config.add_view(cls, attr='mobile_row_from_upc',
                        route_name='mobile.{}.row_from_upc'.format(route_prefix),
                        permission='{}.create_row'.format(permission_prefix))


# TODO: this is a stopgap measure to fix an obvious bug, which exists when the
# session is not provided by the view at runtime (i.e. when it was instead
# being provided by the type instance, which was created upon app startup).
@colander.deferred
def valid_inventory_batch_row(node, kw):
    """
    Deferred validator; requires ``session`` when the schema is bound.  The
    resulting validator rejects a row UUID which does not exist, or whose
    batch has already been executed.
    """
    session = kw['session']

    def validate(node, value):
        row = session.query(model.InventoryBatchRow).get(value)
        if not row:
            raise colander.Invalid(node, "Batch row not found")
        if row.batch.executed:
            raise colander.Invalid(node, "Batch has already been executed")
        return row.uuid

    return validate


class InventoryForm(colander.MappingSchema):
    """
    Schema for updating the counts of an inventory batch row.
    """

    row = colander.SchemaNode(colander.String(),
                              validator=valid_inventory_batch_row)

    cases = colander.SchemaNode(colander.Decimal(), missing=colander.null)

    units = colander.SchemaNode(colander.Decimal(), missing=colander.null)


# TODO: this is a stopgap measure to fix an obvious bug, which exists when the
# session is not provided by the view at runtime (i.e. when it was instead
# being provided by the type instance, which was created upon app startup).
@colander.deferred
def valid_product(node, kw):
    """
    Deferred validator; requires ``session`` when the schema is bound.  The
    resulting validator rejects a value for which no product can be fetched.
    """
    db_session = kw['session']

    def validate(field_node, product_key):
        found = db_session.query(model.Product).get(product_key)
        if found:
            return found.uuid
        raise colander.Invalid(field_node, "Product not found")

    return validate


class DesktopForm(colander.Schema):
    """
    Schema for the desktop form used to add an item to an inventory batch.
    """

    # product reference, checked via the deferred validator
    product = colander.SchemaNode(
        colander.String(),
        validator=valid_product)

    upc = colander.SchemaNode(
        forms.types.GPCType())

    brand_name = colander.SchemaNode(
        colander.String())

    description = colander.SchemaNode(
        colander.String())

    # optional; yields ``colander.null`` when not supplied
    size = colander.SchemaNode(
        colander.String(),
        missing=colander.null)

    case_quantity = colander.SchemaNode(
        colander.Decimal())

    # counts are optional; each yields ``None`` when not supplied
    cases = colander.SchemaNode(
        colander.Decimal(),
        missing=None)

    units = colander.SchemaNode(
        colander.Decimal(),
        missing=None)


def includeme(config):
    """
    Hook which registers the inventory batch views with the given config.
    """
    InventoryBatchView.defaults(config)