wuttafarm/src/wuttafarm/importing/farmos.py

1746 lines
55 KiB
Python

# -*- coding: utf-8; -*-
################################################################################
#
# WuttaFarm --Web app to integrate with and extend farmOS
# Copyright © 2026 Lance Edgar
#
# This file is part of WuttaFarm.
#
# WuttaFarm is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# WuttaFarm is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# WuttaFarm. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Data import for farmOS -> WuttaFarm
"""
import datetime
import logging
from uuid import UUID
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
from wuttasync.importing import ImportHandler, ToWuttaHandler, Importer, ToWutta
from wuttafarm.db import model
log = logging.getLogger(__name__)
class FromFarmOSHandler(ImportHandler):
"""
Base class for import handler using farmOS API as data source.
"""
source_key = "farmos"
generic_source_title = "farmOS"
def begin_source_transaction(self, client=None):
"""
Establish the farmOS API client.
"""
if client:
self.farmos_client = client
else:
token = self.get_farmos_oauth2_token()
self.farmos_client = self.app.get_farmos_client(token=token)
self.farmos_4x = self.app.is_farmos_4x(self.farmos_client)
self.normal = self.app.get_normalizer(self.farmos_client)
def get_farmos_oauth2_token(self):
client_id = self.config.get(
"farmos.oauth2.importing.client_id", default="wuttafarm"
)
client_secret = self.config.require("farmos.oauth2.importing.client_secret")
scope = self.config.get("farmos.oauth2.importing.scope", default="farm_manager")
client = BackendApplicationClient(client_id=client_id)
oauth = OAuth2Session(client=client)
return oauth.fetch_token(
token_url=self.app.get_farmos_url("/oauth/token"),
include_client_id=True,
client_secret=client_secret,
scope=scope,
)
def get_importer_kwargs(self, key, **kwargs):
kwargs = super().get_importer_kwargs(key, **kwargs)
kwargs["farmos_client"] = self.farmos_client
kwargs["farmos_4x"] = self.farmos_4x
kwargs["normal"] = self.normal
return kwargs
class ToWuttaFarmHandler(ToWuttaHandler):
"""
Base class for import handler targeting WuttaFarm
"""
target_key = "wuttafarm"
class FromFarmOSToWuttaFarm(FromFarmOSHandler, ToWuttaFarmHandler):
"""
Handler for farmOS → WuttaFarm import.
"""
def define_importers(self):
""" """
importers = super().define_importers()
importers["User"] = UserImporter
importers["AssetType"] = AssetTypeImporter
importers["LandType"] = LandTypeImporter
importers["LandAsset"] = LandAssetImporter
importers["StructureType"] = StructureTypeImporter
importers["StructureAsset"] = StructureAssetImporter
importers["WaterAsset"] = WaterAssetImporter
importers["CompostAsset"] = CompostAssetImporter
importers["EquipmentType"] = EquipmentTypeImporter
importers["EquipmentAsset"] = EquipmentAssetImporter
importers["AnimalType"] = AnimalTypeImporter
importers["AnimalAsset"] = AnimalAssetImporter
importers["GroupAsset"] = GroupAssetImporter
importers["PlantType"] = PlantTypeImporter
importers["Season"] = SeasonImporter
importers["PlantAsset"] = PlantAssetImporter
importers["Measure"] = MeasureImporter
importers["Unit"] = UnitImporter
importers["MaterialType"] = MaterialTypeImporter
importers["QuantityType"] = QuantityTypeImporter
importers["StandardQuantity"] = StandardQuantityImporter
importers["MaterialQuantity"] = MaterialQuantityImporter
importers["LogType"] = LogTypeImporter
importers["ActivityLog"] = ActivityLogImporter
importers["HarvestLog"] = HarvestLogImporter
importers["MedicalLog"] = MedicalLogImporter
importers["ObservationLog"] = ObservationLogImporter
importers["SeedingLog"] = SeedingLogImporter
return importers
class FromFarmOS(Importer):
"""
Base class for importers using farmOS API as data source.
"""
key = "farmos_uuid"
def get_supported_fields(self):
"""
Auto-remove the ``uuid`` field, since we use ``farmos_uuid``
instead for the importer key.
"""
fields = list(super().get_supported_fields())
if "uuid" in fields:
fields.remove("uuid")
return fields
def normalize_datetime(self, dt):
"""
Convert a farmOS datetime value to naive UTC used by
WuttaFarm.
:param dt: Date/time string value "as-is" from the farmOS API.
:returns: Equivalent naive UTC ``datetime``
"""
if not dt:
return None
dt = datetime.datetime.fromisoformat(dt)
return self.app.make_utc(dt)
class AssetImporterBase(FromFarmOS, ToWutta):
"""
Base class for farmOS API → WuttaFarm asset importers
"""
def get_farmos_asset_type(self):
return self.model_class.__wutta_hint__["farmos_asset_type"]
def get_simple_fields(self):
""" """
fields = list(super().get_simple_fields())
# nb. must explicitly declare proxy fields
fields.extend(
[
"farmos_uuid",
"drupal_id",
"asset_type",
"asset_name",
"is_location",
"is_fixed",
"notes",
"archived",
"image_url",
"thumbnail_url",
]
)
return fields
def get_supported_fields(self):
""" """
fields = list(super().get_supported_fields())
fields.extend(
[
"parents",
"owners",
]
)
return fields
def get_source_objects(self):
""" """
asset_type = self.get_farmos_asset_type()
return list(
self.farmos_client.asset.iterate(asset_type, params={"include": "image"})
)
def normalize_source_data(self, **kwargs):
""" """
data = super().normalize_source_data(**kwargs)
if "parents" in self.fields:
# nb. make sure parent-less (root) assets come first, so they
# exist when child assets need to reference them
data.sort(key=lambda l: len(l["parents"]))
return data
def normalize_source_object(self, asset):
""" """
data = self.normal.normalize_farmos_asset(asset)
data["farmos_uuid"] = UUID(data.pop("uuid"))
data["asset_type"] = self.get_asset_type(asset)
if "image_url" in self.fields or "thumbnail_url" in self.fields:
data["image_url"] = None
data["thumbnail_url"] = None
if relationships := asset.get("relationships"):
if image := relationships.get("image"):
if image["data"]:
image = self.farmos_client.resource.get_id(
"file", "file", image["data"][0]["id"]
)
if image_style := image["data"]["attributes"].get(
"image_style_uri"
):
data["image_url"] = image_style["large"]
data["thumbnail_url"] = image_style["thumbnail"]
if "parents" in self.fields:
data["parents"] = []
for parent in asset["relationships"]["parent"]["data"]:
data["parents"].append(
(self.get_asset_type(parent), UUID(parent["id"]))
)
if "owners" in self.fields:
data["owners"] = [UUID(uuid) for uuid in data["owner_uuids"]]
return data
def get_asset_type(self, asset):
return asset["type"].split("--")[1]
def normalize_target_object(self, asset):
data = super().normalize_target_object(asset)
if "parents" in self.fields:
data["parents"] = [(p.asset_type, p.farmos_uuid) for p in asset.parents]
if "owners" in self.fields:
data["owners"] = [user.farmos_uuid for user in asset.owners]
return data
def update_target_object(self, asset, source_data, target_data=None):
model = self.app.model
asset = super().update_target_object(asset, source_data, target_data)
if "parents" in self.fields:
if not target_data or target_data["parents"] != source_data["parents"]:
for key in source_data["parents"]:
asset_type, farmos_uuid = key
if not target_data or key not in target_data["parents"]:
self.target_session.flush()
parent = (
self.target_session.query(model.Asset)
.filter(model.Asset.asset_type == asset_type)
.filter(model.Asset.farmos_uuid == farmos_uuid)
.one()
)
asset.asset._parents.append(model.AssetParent(parent=parent))
if target_data:
for key in target_data["parents"]:
asset_type, farmos_uuid = key
if key not in source_data["parents"]:
parent = (
self.target_session.query(model.Asset)
.filter(model.Asset.asset_type == asset_type)
.filter(model.Asset.farmos_uuid == farmos_uuid)
.one()
)
parent = (
self.target_session.query(model.AssetParent)
.filter(model.AssetParent.asset == asset)
.filter(model.AssetParent.parent == parent)
.one()
)
self.target_session.delete(parent)
if "owners" in self.fields:
if not target_data or target_data["owners"] != source_data["owners"]:
for farmos_uuid in source_data["owners"]:
if not target_data or farmos_uuid not in target_data["owners"]:
user = (
self.target_session.query(model.User)
.join(model.WuttaFarmUser)
.filter(model.WuttaFarmUser.farmos_uuid == farmos_uuid)
.one()
)
asset.owners.append(user)
if target_data:
for farmos_uuid in target_data["owners"]:
if farmos_uuid not in source_data["owners"]:
user = (
self.target_session.query(model.User)
.join(model.WuttaFarmUser)
.filter(model.WuttaFarmUser.farmos_uuid == farmos_uuid)
.one()
)
asset.owners.remove(user)
return asset
class AnimalAssetImporter(AssetImporterBase):
"""
farmOS API → WuttaFarm importer for Animals
"""
model_class = model.AnimalAsset
animal_types_by_farmos_uuid = None
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
[
"animal_type_uuid",
"sex",
"is_sterile",
"produces_eggs",
"birthdate",
]
)
return fields
def setup(self):
super().setup()
model = self.app.model
self.animal_types_by_farmos_uuid = {}
for animal_type in self.target_session.query(model.AnimalType):
if animal_type.farmos_uuid:
self.animal_types_by_farmos_uuid[animal_type.farmos_uuid] = animal_type
def get_animal_type_by_farmos_uuid(self, uuid):
if self.animal_types_by_farmos_uuid is not None:
return self.animal_types_by_farmos_uuid.get(uuid)
model = self.app.model
return (
self.target_session.query(model.AnimalType)
.filter(model.AnimalType.farmos_uuid == uuid)
.first()
)
def normalize_source_object(self, animal):
""" """
animal_type_uuid = None
if relationships := animal.get("relationships"):
if animal_type := relationships.get("animal_type"):
if animal_type["data"]:
if wf_animal_type := self.get_animal_type_by_farmos_uuid(
UUID(animal_type["data"]["id"])
):
animal_type_uuid = wf_animal_type.uuid
else:
log.warning(
"animal type not found: %s", animal_type["data"]["id"]
)
if not animal_type_uuid:
log.warning("missing/invalid animal_type for farmOS Animal: %s", animal)
return None
birthdate = animal["attributes"]["birthdate"]
if birthdate:
birthdate = datetime.datetime.fromisoformat(birthdate)
birthdate = self.app.localtime(birthdate)
birthdate = self.app.make_utc(birthdate)
if self.farmos_4x:
sterile = animal["attributes"]["is_sterile"]
else:
sterile = animal["attributes"]["is_castrated"]
data = super().normalize_source_object(animal)
data.update(
{
"animal_type_uuid": animal_type_uuid,
"sex": animal["attributes"]["sex"],
"is_sterile": sterile,
"produces_eggs": animal["attributes"]["produces_eggs"],
"birthdate": birthdate,
}
)
return data
class TaxonomyImporterBase(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for taxonomy terms
"""
taxonomy_type = None
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
"description",
]
def get_source_objects(self):
""" """
return list(
self.farmos_client.resource.iterate("taxonomy_term", self.taxonomy_type)
)
def normalize_source_object(self, term):
""" """
if description := term["attributes"]["description"]:
description = description["value"]
return {
"farmos_uuid": UUID(term["id"]),
"drupal_id": term["attributes"]["drupal_internal__tid"],
"name": term["attributes"]["name"],
"description": description,
}
class AnimalTypeImporter(TaxonomyImporterBase):
"""
farmOS API → WuttaFarm importer for Animal Types
"""
model_class = model.AnimalType
taxonomy_type = "animal_type"
class MaterialTypeImporter(TaxonomyImporterBase):
"""
farmOS API → WuttaFarm importer for Material Types
"""
model_class = model.MaterialType
taxonomy_type = "material_type"
class EquipmentTypeImporter(TaxonomyImporterBase):
"""
farmOS API → WuttaFarm importer for Equipment Types
"""
model_class = model.EquipmentType
taxonomy_type = "equipment_type"
class AssetTypeImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Asset Types
"""
model_class = model.AssetType
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
"description",
]
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("asset_type"))
def normalize_source_object(self, asset_type):
""" """
return {
"farmos_uuid": UUID(asset_type["id"]),
"drupal_id": asset_type["attributes"]["drupal_internal__id"],
"name": asset_type["attributes"]["label"],
"description": asset_type["attributes"]["description"],
}
class EquipmentAssetImporter(AssetImporterBase):
"""
farmOS API → WuttaFarm importer for Equipment Assets
"""
model_class = model.EquipmentAsset
equipment_types_by_farmos_uuid = None
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
[
"equipment_types",
]
)
return fields
def setup(self):
super().setup()
model = self.app.model
self.equipment_types_by_farmos_uuid = {}
for equipment_type in self.target_session.query(model.EquipmentType):
if equipment_type.farmos_uuid:
self.equipment_types_by_farmos_uuid[equipment_type.farmos_uuid] = (
equipment_type
)
def get_equipment_type_by_farmos_uuid(self, uuid):
if self.equipment_types_by_farmos_uuid is not None:
return self.equipment_types_by_farmos_uuid.get(uuid)
model = self.app.model
return (
self.target_session.query(model.EquipmentType)
.filter_by(farmos_uuid=uuid)
.first()
)
def normalize_source_object(self, equipment):
""" """
data = super().normalize_source_object(equipment)
equipment_types = []
if relationships := equipment.get("relationships"):
if equipment_type := relationships.get("equipment_type"):
equipment_types = []
for equipment_type in equipment_type["data"]:
if wf_equipment_type := self.get_equipment_type_by_farmos_uuid(
UUID(equipment_type["id"])
):
equipment_types.append(wf_equipment_type.uuid)
else:
log.warning(
"equipment type not found: %s", equipment_type["id"]
)
data.update(
{
"manufacturer": equipment["attributes"]["manufacturer"],
"model": equipment["attributes"]["model"],
"serial_number": equipment["attributes"]["serial_number"],
"equipment_types": set(equipment_types),
}
)
return data
def normalize_target_object(self, equipment):
data = super().normalize_target_object(equipment)
if "equipment_types" in self.fields:
data["equipment_types"] = set(
[etype.uuid for etype in equipment.equipment_types]
)
return data
def update_target_object(self, equipment, source_data, target_data=None):
model = self.app.model
equipment = super().update_target_object(equipment, source_data, target_data)
if "equipment_types" in self.fields:
if (
not target_data
or target_data["equipment_types"] != source_data["equipment_types"]
):
for uuid in source_data["equipment_types"]:
if not target_data or uuid not in target_data["equipment_types"]:
self.target_session.flush()
equipment._equipment_types.append(
model.EquipmentAssetEquipmentType(equipment_type_uuid=uuid)
)
if target_data:
for uuid in target_data["equipment_types"]:
if uuid not in source_data["equipment_types"]:
equipment_type = (
self.target_session.query(
model.EquipmentAssetEquipmentType
)
.filter(
model.EquipmentAssetEquipmentType.equipment_asset
== equipment
)
.filter(
model.EquipmentAssetEquipmentType.equipment_type_uuid
== uuid
)
.one()
)
self.target_session.delete(equipment_type)
return equipment
class GroupAssetImporter(AssetImporterBase):
"""
farmOS API → WuttaFarm importer for Group Assets
"""
model_class = model.GroupAsset
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
[
"produces_eggs",
]
)
return fields
def normalize_source_object(self, group):
""" """
data = super().normalize_source_object(group)
data.update(
{
"produces_eggs": group["attributes"]["produces_eggs"],
}
)
return data
class LandAssetImporter(AssetImporterBase):
"""
farmOS API → WuttaFarm importer for Land Assets
"""
model_class = model.LandAsset
land_types_by_id = None
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
[
"land_type_uuid",
]
)
return fields
def setup(self):
""" """
super().setup()
model = self.app.model
self.land_types_by_id = {}
for land_type in self.target_session.query(model.LandType):
self.land_types_by_id[land_type.drupal_id] = land_type
def get_land_type_by_id(self, drupal_id):
if self.land_types_by_id is not None:
return self.land_types_by_id.get(drupal_id)
model = self.app.model
return (
self.target_session.query(model.LandType)
.filter_by(drupal_id=drupal_id)
.first()
)
def normalize_source_object(self, land):
""" """
land_type_id = land["attributes"]["land_type"]
land_type = self.get_land_type_by_id(land_type_id)
if not land_type:
log.warning(
"invalid land_type '%s' for farmOS Land Asset: %s", land_type_id, land
)
return None
data = super().normalize_source_object(land)
data.update(
{
"land_type_uuid": land_type.uuid,
}
)
return data
class LandTypeImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Land Types
"""
model_class = model.LandType
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
]
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("land_type"))
def normalize_source_object(self, land_type):
""" """
return {
"farmos_uuid": UUID(land_type["id"]),
"drupal_id": land_type["attributes"]["drupal_internal__id"],
"name": land_type["attributes"]["label"],
}
class PlantTypeImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Plant Types
"""
model_class = model.PlantType
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
"description",
]
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("taxonomy_term", "plant_type"))
def normalize_source_object(self, plant_type):
""" """
return {
"farmos_uuid": UUID(plant_type["id"]),
"drupal_id": plant_type["attributes"]["drupal_internal__tid"],
"name": plant_type["attributes"]["name"],
"description": plant_type["attributes"]["description"],
}
class SeasonImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Seasons
"""
model_class = model.Season
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
"description",
]
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("taxonomy_term", "season"))
def normalize_source_object(self, season):
""" """
return {
"farmos_uuid": UUID(season["id"]),
"drupal_id": season["attributes"]["drupal_internal__tid"],
"name": season["attributes"]["name"],
"description": season["attributes"]["description"],
}
class PlantAssetImporter(AssetImporterBase):
"""
farmOS API → WuttaFarm importer for Plant Assets
"""
model_class = model.PlantAsset
plant_types_by_farmos_uuid = None
seasons_by_farmos_uuid = None
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
[
"plant_types",
"seasons",
]
)
return fields
def setup(self):
super().setup()
model = self.app.model
self.plant_types_by_farmos_uuid = {}
for plant_type in self.target_session.query(model.PlantType):
if plant_type.farmos_uuid:
self.plant_types_by_farmos_uuid[plant_type.farmos_uuid] = plant_type
self.seasons_by_farmos_uuid = {}
for season in self.target_session.query(model.Season):
if season.farmos_uuid:
self.seasons_by_farmos_uuid[season.farmos_uuid] = season
def get_plant_type_by_farmos_uuid(self, uuid):
if self.plant_types_by_farmos_uuid is not None:
return self.plant_types_by_farmos_uuid.get(uuid)
model = self.app.model
return (
self.target_session.query(model.PlantType)
.filter_by(farmos_uuid=uuid)
.first()
)
def get_season_by_farmos_uuid(self, uuid):
if self.seasons_by_farmos_uuid is not None:
return self.seasons_by_farmos_uuid.get(uuid)
model = self.app.model
return (
self.target_session.query(model.Season).filter_by(farmos_uuid=uuid).first()
)
def normalize_source_object(self, plant):
""" """
data = super().normalize_source_object(plant)
plant_types = []
seasons = []
if relationships := plant.get("relationships"):
if plant_type := relationships.get("plant_type"):
plant_types = []
for plant_type in plant_type["data"]:
if wf_plant_type := self.get_plant_type_by_farmos_uuid(
UUID(plant_type["id"])
):
plant_types.append(wf_plant_type.uuid)
else:
log.warning("plant type not found: %s", plant_type["id"])
if season := relationships.get("season"):
seasons = []
for season in season["data"]:
if wf_season := self.get_season_by_farmos_uuid(UUID(season["id"])):
seasons.append(wf_season.uuid)
else:
log.warning("season not found: %s", season["id"])
data.update(
{
"plant_types": set(plant_types),
"seasons": set(seasons),
}
)
return data
def normalize_target_object(self, plant):
data = super().normalize_target_object(plant)
if "plant_types" in self.fields:
data["plant_types"] = set([pt.uuid for pt in plant.plant_types])
if "seasons" in self.fields:
data["seasons"] = set([s.uuid for s in plant.seasons])
return data
def update_target_object(self, plant, source_data, target_data=None):
model = self.app.model
plant = super().update_target_object(plant, source_data, target_data)
if "plant_types" in self.fields:
if (
not target_data
or target_data["plant_types"] != source_data["plant_types"]
):
for uuid in source_data["plant_types"]:
if not target_data or uuid not in target_data["plant_types"]:
self.target_session.flush()
plant._plant_types.append(
model.PlantAssetPlantType(plant_type_uuid=uuid)
)
if target_data:
for uuid in target_data["plant_types"]:
if uuid not in source_data["plant_types"]:
plant_type = (
self.target_session.query(model.PlantAssetPlantType)
.filter(model.PlantAssetPlantType.plant_asset == plant)
.filter(
model.PlantAssetPlantType.plant_type_uuid == uuid
)
.one()
)
self.target_session.delete(plant_type)
if "seasons" in self.fields:
if not target_data or target_data["seasons"] != source_data["seasons"]:
for uuid in source_data["seasons"]:
if not target_data or uuid not in target_data["seasons"]:
self.target_session.flush()
plant._seasons.append(model.PlantAssetSeason(season_uuid=uuid))
if target_data:
for uuid in target_data["seasons"]:
if uuid not in source_data["seasons"]:
season = (
self.target_session.query(model.PlantAssetSeason)
.filter(model.PlantAssetSeason.plant_asset == plant)
.filter(model.PlantAssetSeason.season_uuid == uuid)
.one()
)
self.target_session.delete(season)
return plant
class StructureAssetImporter(AssetImporterBase):
"""
farmOS API → WuttaFarm importer for Structure Assets
"""
model_class = model.StructureAsset
structure_types_by_id = None
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
[
"structure_type_uuid",
]
)
return fields
def setup(self):
super().setup()
model = self.app.model
self.structure_types_by_id = {}
for structure_type in self.target_session.query(model.StructureType):
self.structure_types_by_id[structure_type.drupal_id] = structure_type
def get_structure_type_by_id(self, drupal_id):
if self.structure_types_by_id is not None:
return self.structure_types_by_id.get(drupal_id)
model = self.app.model
return (
self.target_session.query(model.StructureType)
.filter_by(drupal_id=drupal_id)
.first()
)
def normalize_source_object(self, structure):
""" """
structure_type_id = structure["attributes"]["structure_type"]
structure_type = self.get_structure_type_by_id(structure_type_id)
if not structure_type:
log.warning(
"invalid structure_type '%s' for farmOS Structure Asset: %s",
structure_type_id,
structure,
)
return None
data = super().normalize_source_object(structure)
data.update(
{
"structure_type_uuid": structure_type.uuid,
}
)
return data
class StructureTypeImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Structure Types
"""
model_class = model.StructureType
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
]
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("structure_type"))
def normalize_source_object(self, structure_type):
""" """
return {
"farmos_uuid": UUID(structure_type["id"]),
"drupal_id": structure_type["attributes"]["drupal_internal__id"],
"name": structure_type["attributes"]["label"],
}
class WaterAssetImporter(AssetImporterBase):
"""
farmOS API → WuttaFarm importer for Water Assets
"""
model_class = model.WaterAsset
class CompostAssetImporter(AssetImporterBase):
"""
farmOS API → WuttaFarm importer for Compost Assets
"""
model_class = model.CompostAsset
class UserImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Users
"""
model_class = model.User
supported_fields = [
"farmos_uuid",
"drupal_id",
"username",
]
def get_simple_fields(self):
""" """
fields = list(super().get_simple_fields())
# nb. must explicitly declare extension fields
fields.extend(
[
"farmos_uuid",
"drupal_id",
]
)
return fields
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("user"))
def normalize_source_object(self, user):
""" """
# nb. skip Anonymous user which does not have drupal id
drupal_id = user["attributes"].get("drupal_internal__uid")
if not drupal_id:
return None
return {
"farmos_uuid": UUID(user["id"]),
"drupal_id": drupal_id,
"username": user["attributes"]["name"],
}
def can_delete_object(self, user, data=None):
"""
Prevent delete for users which do not exist in farmOS.
"""
if not user.farmos_uuid:
return False
return True
##############################
# log importers
##############################
class MeasureImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Measures
"""
model_class = model.Measure
key = "drupal_id"
supported_fields = [
"drupal_id",
"ordinal",
"name",
]
def get_source_objects(self):
""" """
response = self.farmos_client.session.get(
self.app.get_farmos_url("/api/quantity/standard/resource/schema")
)
response.raise_for_status()
data = response.json()
self.ordinal = 0
return data["definitions"]["attributes"]["properties"]["measure"]["oneOf"]
def normalize_source_object(self, measure):
""" """
self.ordinal += 1
return {
"drupal_id": measure["const"],
"ordinal": self.ordinal,
"name": measure["title"],
}
class UnitImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Units
"""
model_class = model.Unit
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
"description",
]
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("taxonomy_term", "unit"))
def normalize_source_object(self, unit):
""" """
return {
"farmos_uuid": UUID(unit["id"]),
"drupal_id": unit["attributes"]["drupal_internal__tid"],
"name": unit["attributes"]["name"],
"description": unit["attributes"]["description"],
}
class QuantityTypeImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Quantity Types
"""
model_class = model.QuantityType
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
"description",
]
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("quantity_type"))
def normalize_source_object(self, quantity_type):
""" """
return {
"farmos_uuid": UUID(quantity_type["id"]),
"drupal_id": quantity_type["attributes"]["drupal_internal__id"],
"name": quantity_type["attributes"]["label"],
"description": quantity_type["attributes"]["description"],
}
class LogTypeImporter(FromFarmOS, ToWutta):
"""
farmOS API → WuttaFarm importer for Log Types
"""
model_class = model.LogType
supported_fields = [
"farmos_uuid",
"drupal_id",
"name",
"description",
]
def get_source_objects(self):
""" """
return list(self.farmos_client.resource.iterate("log_type"))
def normalize_source_object(self, log_type):
""" """
return {
"farmos_uuid": UUID(log_type["id"]),
"drupal_id": log_type["attributes"]["drupal_internal__id"],
"name": log_type["attributes"]["label"],
"description": log_type["attributes"]["description"],
}
class LogImporterBase(FromFarmOS, ToWutta):
"""
Base class for farmOS API → WuttaFarm log importers
"""
def get_farmos_log_type(self):
return self.model_class.__wutta_hint__["farmos_log_type"]
def get_simple_fields(self):
""" """
fields = list(super().get_simple_fields())
# nb. must explicitly declare proxy fields
fields.extend(
[
"farmos_uuid",
"drupal_id",
"log_type",
"message",
"timestamp",
"is_movement",
"is_group_assignment",
"notes",
"status",
"quick",
]
)
return fields
def get_supported_fields(self):
""" """
fields = list(super().get_supported_fields())
fields.extend(
[
"assets",
"groups",
"locations",
"quantities",
"owners",
]
)
return fields
def get_source_objects(self):
""" """
log_type = self.get_farmos_log_type()
# nb. must sort the data or else paging / iteration will not
# work correctly and we get back data set which contains
# duplicates but also is missing some records...
params = {"sort": "drupal_internal__id"}
return list(self.farmos_client.log.iterate(log_type, params=params))
def normalize_source_object(self, log):
""" """
data = self.normal.normalize_farmos_log(log)
data["farmos_uuid"] = UUID(data.pop("uuid"))
data["message"] = data.pop("name")
data["timestamp"] = self.app.make_utc(data["timestamp"])
data["quick"] = ", ".join(data["quick"]) if data["quick"] else None
# TODO
data["log_type"] = self.get_farmos_log_type()
if "assets" in self.fields:
data["assets"] = [
(a["asset_type"], UUID(a["uuid"])) for a in data["assets"]
]
if "groups" in self.fields:
data["groups"] = [
(asset["asset_type"], UUID(asset["uuid"])) for asset in data["groups"]
]
if "locations" in self.fields:
data["locations"] = [
(asset["asset_type"], UUID(asset["uuid"]))
for asset in data["locations"]
]
if "quantities" in self.fields:
data["quantities"] = [UUID(uuid) for uuid in data["quantity_uuids"]]
if "owners" in self.fields:
data["owners"] = [UUID(uuid) for uuid in data["owner_uuids"]]
return data
def normalize_target_object(self, log):
data = super().normalize_target_object(log)
if "assets" in self.fields:
data["assets"] = [
(asset.asset_type, asset.farmos_uuid) for asset in log.assets
]
if "groups" in self.fields:
data["groups"] = [
(asset.asset_type, asset.farmos_uuid) for asset in log.groups
]
if "locations" in self.fields:
data["locations"] = [
(asset.asset_type, asset.farmos_uuid) for asset in log.locations
]
if "quantities" in self.fields:
data["quantities"] = [qty.farmos_uuid for qty in log.quantities]
if "owners" in self.fields:
data["owners"] = [user.farmos_uuid for user in log.owners]
return data
def update_target_object(self, log, source_data, target_data=None):
model = self.app.model
log = super().update_target_object(log, source_data, target_data)
if "assets" in self.fields:
if not target_data or target_data["assets"] != source_data["assets"]:
for key in source_data["assets"]:
asset_type, farmos_uuid = key
if not target_data or key not in target_data["assets"]:
asset = (
self.target_session.query(model.Asset)
.filter(model.Asset.asset_type == asset_type)
.filter(model.Asset.farmos_uuid == farmos_uuid)
.one()
)
log.assets.append(asset)
if target_data:
for key in target_data["assets"]:
asset_type, farmos_uuid = key
if key not in source_data["assets"]:
asset = (
self.target_session.query(model.Asset)
.filter(model.Asset.asset_type == asset_type)
.filter(model.Asset.farmos_uuid == farmos_uuid)
.one()
)
log.assets.remove(asset)
if "groups" in self.fields:
if not target_data or target_data["groups"] != source_data["groups"]:
for key in source_data["groups"]:
asset_type, farmos_uuid = key
if not target_data or key not in target_data["groups"]:
asset = (
self.target_session.query(model.Asset)
.filter(model.Asset.asset_type == asset_type)
.filter(model.Asset.farmos_uuid == farmos_uuid)
.one()
)
log.groups.append(asset)
if target_data:
for key in target_data["groups"]:
asset_type, farmos_uuid = key
if key not in source_data["groups"]:
asset = (
self.target_session.query(model.Asset)
.filter(model.Asset.asset_type == asset_type)
.filter(model.Asset.farmos_uuid == farmos_uuid)
.one()
)
log.groups.remove(asset)
if "locations" in self.fields:
if not target_data or target_data["locations"] != source_data["locations"]:
for key in source_data["locations"]:
asset_type, farmos_uuid = key
if not target_data or key not in target_data["locations"]:
asset = (
self.target_session.query(model.Asset)
.filter(model.Asset.asset_type == asset_type)
.filter(model.Asset.farmos_uuid == farmos_uuid)
.one()
)
log.locations.append(asset)
if target_data:
for key in target_data["locations"]:
asset_type, farmos_uuid = key
if key not in source_data["locations"]:
asset = (
self.target_session.query(model.Asset)
.filter(model.Asset.asset_type == asset_type)
.filter(model.Asset.farmos_uuid == farmos_uuid)
.one()
)
log.locations.remove(asset)
if "quantities" in self.fields:
if (
not target_data
or target_data["quantities"] != source_data["quantities"]
):
for farmos_uuid in source_data["quantities"]:
if not target_data or farmos_uuid not in target_data["quantities"]:
qty = (
self.target_session.query(model.Quantity)
.filter(model.Quantity.farmos_uuid == farmos_uuid)
.one()
)
log.quantities.append(qty)
if target_data:
for farmos_uuid in target_data["quantities"]:
if farmos_uuid not in source_data["quantities"]:
qty = (
self.target_session.query(model.Quantity)
.filter(model.Quantity.farmos_uuid == farmos_uuid)
.one()
)
log.quantities.remove(qty)
if "owners" in self.fields:
if not target_data or target_data["owners"] != source_data["owners"]:
for farmos_uuid in source_data["owners"]:
if not target_data or farmos_uuid not in target_data["owners"]:
user = (
self.target_session.query(model.User)
.join(model.WuttaFarmUser)
.filter(model.WuttaFarmUser.farmos_uuid == farmos_uuid)
.one()
)
log.owners.append(user)
if target_data:
for farmos_uuid in target_data["owners"]:
if farmos_uuid not in source_data["owners"]:
user = (
self.target_session.query(model.User)
.join(model.WuttaFarmUser)
.filter(model.WuttaFarmUser.farmos_uuid == farmos_uuid)
.one()
)
log.owners.remove(user)
return log
class ActivityLogImporter(LogImporterBase):
"""
farmOS API → WuttaFarm importer for Activity Logs
"""
model_class = model.ActivityLog
class HarvestLogImporter(LogImporterBase):
"""
farmOS API → WuttaFarm importer for Harvest Logs
"""
model_class = model.HarvestLog
class MedicalLogImporter(LogImporterBase):
"""
farmOS API → WuttaFarm importer for Medical Logs
"""
model_class = model.MedicalLog
def get_simple_fields(self):
""" """
fields = list(super().get_simple_fields())
# nb. must explicitly declare proxy fields
fields.extend(
[
"vet",
]
)
return fields
class ObservationLogImporter(LogImporterBase):
"""
farmOS API → WuttaFarm importer for Observation Logs
"""
model_class = model.ObservationLog
class SeedingLogImporter(LogImporterBase):
"""
farmOS API → WuttaFarm importer for Seeding Logs
"""
model_class = model.SeedingLog
def get_simple_fields(self):
""" """
fields = list(super().get_simple_fields())
# nb. must explicitly declare proxy fields
fields.extend(
[
"source",
"purchase_date",
"lot_number",
]
)
return fields
def normalize_source_object(self, log):
""" """
data = super().normalize_source_object(log)
data.update(
{
"source": log["attributes"]["source"],
"purchase_date": self.normalize_datetime(
log["attributes"]["purchase_date"]
),
"lot_number": log["attributes"]["lot_number"],
}
)
return data
class QuantityImporterBase(FromFarmOS, ToWutta):
"""
Base class for farmOS API → WuttaFarm quantity importers
"""
def get_farmos_quantity_type(self):
return self.model_class.__wutta_hint__["farmos_quantity_type"]
def get_simple_fields(self):
""" """
fields = list(super().get_simple_fields())
# nb. must explicitly declare proxy fields
fields.extend(
[
"farmos_uuid",
"drupal_id",
"quantity_type_id",
"measure_id",
"value_numerator",
"value_denominator",
"units_uuid",
"label",
]
)
return fields
def setup(self):
super().setup()
model = self.app.model
self.quantity_types_by_farmos_uuid = {}
for quantity_type in self.target_session.query(model.QuantityType):
if quantity_type.farmos_uuid:
self.quantity_types_by_farmos_uuid[quantity_type.farmos_uuid] = (
quantity_type
)
self.units_by_farmos_uuid = {}
for unit in self.target_session.query(model.Unit):
if unit.farmos_uuid:
self.units_by_farmos_uuid[unit.farmos_uuid] = unit
def get_source_objects(self):
""" """
quantity_type = self.get_farmos_quantity_type()
return list(self.farmos_client.resource.iterate("quantity", quantity_type))
def get_quantity_type_by_farmos_uuid(self, uuid):
if hasattr(self, "quantity_types_by_farmos_uuid"):
return self.quantity_types_by_farmos_uuid.get(UUID(uuid))
model = self.app.model
return (
self.target_session.query(model.QuantityType)
.filter(model.QuantityType.farmos_uuid == uuid)
.one()
)
def get_unit_by_farmos_uuid(self, uuid):
if hasattr(self, "units_by_farmos_uuid"):
return self.units_by_farmos_uuid.get(UUID(uuid))
model = self.app.model
return (
self.target_session.query(model.Unit)
.filter(model.Unit.farmos_uuid == uuid)
.one()
)
def normalize_source_object(self, quantity):
""" """
quantity_type_id = None
units_uuid = None
if relationships := quantity.get("relationships"):
if quantity_type := relationships.get("quantity_type"):
if quantity_type["data"]:
if wf_quantity_type := self.get_quantity_type_by_farmos_uuid(
quantity_type["data"]["id"]
):
quantity_type_id = wf_quantity_type.drupal_id
if units := relationships.get("units"):
if units["data"]:
if wf_unit := self.get_unit_by_farmos_uuid(units["data"]["id"]):
units_uuid = wf_unit.uuid
if not quantity_type_id:
log.warning(
"missing/invalid quantity_type for farmOS Quantity: %s", quantity
)
return None
if not units_uuid:
log.warning("missing/invalid units for farmOS Quantity: %s", quantity)
return None
value = quantity["attributes"]["value"]
return {
"farmos_uuid": UUID(quantity["id"]),
"drupal_id": quantity["attributes"]["drupal_internal__id"],
"quantity_type_id": quantity_type_id,
"measure_id": quantity["attributes"]["measure"],
"value_numerator": value["numerator"],
"value_denominator": value["denominator"],
"units_uuid": units_uuid,
"label": quantity["attributes"]["label"],
}
class StandardQuantityImporter(QuantityImporterBase):
"""
farmOS API → WuttaFarm importer for Standard Quantities
"""
model_class = model.StandardQuantity
supported_fields = [
"farmos_uuid",
"drupal_id",
"quantity_type_id",
"measure_id",
"value_numerator",
"value_denominator",
"units_uuid",
"label",
]
class MaterialQuantityImporter(QuantityImporterBase):
"""
farmOS API → WuttaFarm importer for Material Quantities
"""
model_class = model.MaterialQuantity
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
[
"material_types",
]
)
return fields
def normalize_source_object(self, quantity):
""" """
data = super().normalize_source_object(quantity)
if "material_types" in self.fields:
data["material_types"] = [
UUID(mtype["id"])
for mtype in quantity["relationships"]["material_type"]["data"]
]
return data
def normalize_target_object(self, quantity):
data = super().normalize_target_object(quantity)
if "material_types" in self.fields:
data["material_types"] = [
mtype.farmos_uuid for mtype in quantity.material_types
]
return data
def update_target_object(self, quantity, source_data, target_data=None):
model = self.app.model
quantity = super().update_target_object(quantity, source_data, target_data)
if "material_types" in self.fields:
if (
not target_data
or target_data["material_types"] != source_data["material_types"]
):
for farmos_uuid in source_data["material_types"]:
if (
not target_data
or farmos_uuid not in target_data["material_types"]
):
mtype = (
self.target_session.query(model.MaterialType)
.filter(model.MaterialType.farmos_uuid == farmos_uuid)
.one()
)
quantity.material_types.append(mtype)
if target_data:
for farmos_uuid in target_data["material_types"]:
if farmos_uuid not in source_data["material_types"]:
mtype = (
self.target_session.query(model.MaterialType)
.filter(model.MaterialType.farmos_uuid == farmos_uuid)
.one()
)
quantity.material_types.remove(mtype)
return quantity