diff --git a/src/wuttafarm/app.py b/src/wuttafarm/app.py
index decd44f..c5a581b 100644
--- a/src/wuttafarm/app.py
+++ b/src/wuttafarm/app.py
@@ -270,6 +270,7 @@ class WuttaFarmAppHandler(base.AppHandler):
then nothing will happen / import is silently skipped when
there is no such importer.
"""
+ model = self.app.model
handler = self.app.get_import_handler("import.to_wuttafarm.from_farmos")
if model_name not in handler.importers:
@@ -280,6 +281,10 @@ class WuttaFarmAppHandler(base.AppHandler):
# nb. begin txn to establish the API client
handler.begin_source_transaction(client)
with self.short_session(commit=True) as session:
+
+ if user := session.query(model.User).filter_by(username="farmos").first():
+ session.info["continuum_user_id"] = user.uuid
+
handler.target_session = session
importer = handler.get_importer(model_name, caches_target=False)
normal = importer.normalize_source_object(obj)
diff --git a/src/wuttafarm/cli/__init__.py b/src/wuttafarm/cli/__init__.py
index cd06344..71d4994 100644
--- a/src/wuttafarm/cli/__init__.py
+++ b/src/wuttafarm/cli/__init__.py
@@ -29,3 +29,4 @@ from .base import wuttafarm_typer
from . import export_farmos
from . import import_farmos
from . import install
+from . import process_webhooks
diff --git a/src/wuttafarm/cli/process_webhooks.py b/src/wuttafarm/cli/process_webhooks.py
new file mode 100644
index 0000000..9731247
--- /dev/null
+++ b/src/wuttafarm/cli/process_webhooks.py
@@ -0,0 +1,181 @@
+# -*- coding: utf-8; -*-
+################################################################################
+#
+# WuttaFarm --Web app to integrate with and extend farmOS
+# Copyright © 2026 Lance Edgar
+#
+# This file is part of WuttaFarm.
+#
+# WuttaFarm is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+#
+# WuttaFarm is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# WuttaFarm. If not, see .
+#
+################################################################################
+"""
+WuttaFarm CLI
+"""
+
+import logging
+import time
+
+import typer
+from oauthlib.oauth2 import BackendApplicationClient
+from requests_oauthlib import OAuth2Session
+
+from wuttafarm.cli import wuttafarm_typer
+
+
+log = logging.getLogger(__name__)
+
+
+class ChangeProcessor:
+
+ def __init__(self, config):
+ self.config = config
+ self.app = config.get_app()
+
+ def process_change(self, change):
+ if change.deleted:
+ self.delete_record(change)
+ else:
+ self.import_record(change)
+
+ def import_record(self, change):
+ token = self.get_farmos_oauth2_token()
+ client = self.app.get_farmos_client(token=token)
+
+ full_type = f"{change.entity_type}--{change.bundle}"
+ record = client.resource.get_id(
+ change.entity_type, change.bundle, change.farmos_uuid
+ )
+
+ importer_map = self.get_importer_map()
+ model_name = importer_map[full_type]
+
+ self.app.auto_sync_from_farmos(record["data"], model_name, client=client)
+
+ def delete_record(self, change):
+ model = self.app.model
+ handler = self.app.get_import_handler("import.to_wuttafarm.from_farmos")
+
+ importer_map = self.get_importer_map()
+ full_type = f"{change.entity_type}--{change.bundle}"
+ model_name = importer_map[full_type]
+
+ token = self.get_farmos_oauth2_token()
+ client = self.app.get_farmos_client(token=token)
+
+ # nb. begin txn to establish the API client
+ handler.begin_source_transaction(client)
+ with self.app.short_session(commit=True) as session:
+ handler.target_session = session
+ importer = handler.get_importer(model_name, caches_target=False)
+
+ # try to attribute change to 'farmos' user
+ if user := session.query(model.User).filter_by(username="farmos").first():
+ session.info["continuum_user_id"] = user.uuid
+
+ # only support importers with farmos_uuid as key
+ # (pretty sure that covers us..can revise if needed)
+ if importer.get_keys() != ["farmos_uuid"]:
+ log.warning(
+ "unsupported keys for %s importer: %s",
+ model_name,
+ importer.get_keys(),
+ )
+ return
+
+ # delete corresponding record from our app
+ obj = importer.get_target_object((change.uuid,))
+ if obj:
+ importer.delete_target_object(obj)
+
+ # TODO: this should live elsewhere
+ def get_farmos_oauth2_token(self):
+
+ client_id = self.config.get(
+ "farmos.oauth2.importing.client_id", default="wuttafarm"
+ )
+ client_secret = self.config.require("farmos.oauth2.importing.client_secret")
+ scope = self.config.get("farmos.oauth2.importing.scope", default="farm_manager")
+
+ client = BackendApplicationClient(client_id=client_id)
+ oauth = OAuth2Session(client=client)
+
+ return oauth.fetch_token(
+ token_url=self.app.get_farmos_url("/oauth/token"),
+ include_client_id=True,
+ client_secret=client_secret,
+ scope=scope,
+ )
+
+ # TODO: this should live elsewhere
+ def get_importer_map(self):
+ return {
+ "asset--animal": "AnimalAsset",
+ "asset--equipment": "EquipmentAsset",
+ "asset--group": "GroupAsset",
+ "asset--land": "LandAsset",
+ "asset--plant": "PlantAsset",
+ "asset--structure": "StructureAsset",
+ "asset--water": "WaterAsset",
+ "log--activity": "ActivityLog",
+ "log--harvest": "HarvestLog",
+ "log--medical": "MedicalLog",
+ "log--observation": "ObservationLog",
+ "log--seeding": "SeedingLog",
+ "quantity--material": "MaterialQuantity",
+ "quantity--standard": "StandardQuantity",
+ "taxonomy_term--animal_type": "AnimalType",
+ "taxonomy_term--equipment_type": "EquipmentType",
+ "taxonomy_term--plant_type": "PlantType",
+ "taxonomy_term--season": "Season",
+ "taxonomy_term--material_type": "MaterialType",
+ "taxonomy_term--unit": "Unit",
+ }
+
+
+@wuttafarm_typer.command()
+def process_webhooks(
+ ctx: typer.Context,
+):
+ """
+ Process incoming webhook requests from farmOS.
+ """
+ config = ctx.parent.wutta_config
+ app = config.get_app()
+ model = app.model
+ processor = ChangeProcessor(config)
+
+ while True:
+ with app.short_session(commit=True) as session:
+
+ query = session.query(model.WebhookChange).order_by(
+ model.WebhookChange.received
+ )
+
+ # nb. fetch (at most) 2 changes instead of just 1;
+ # this will control time delay behavior below
+ if changes := query[:2]:
+
+ # process first change
+ change = changes[0]
+ log.info("processing webhook change: %s", change)
+ processor.process_change(change)
+ session.delete(change)
+
+ # minimal time delay if 2nd change exists
+ if len(changes) == 2:
+ time.sleep(0.1)
+ continue
+
+ # nothing in queue, so wait 1 sec before checking again
+ time.sleep(1)
diff --git a/src/wuttafarm/db/alembic/versions/dd4d4142b96d_add_webhookchange.py b/src/wuttafarm/db/alembic/versions/dd4d4142b96d_add_webhookchange.py
new file mode 100644
index 0000000..5b566b2
--- /dev/null
+++ b/src/wuttafarm/db/alembic/versions/dd4d4142b96d_add_webhookchange.py
@@ -0,0 +1,41 @@
+"""add WebhookChange
+
+Revision ID: dd4d4142b96d
+Revises: dca5b48a5562
+Create Date: 2026-03-10 22:31:54.324952
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+import wuttjamaican.db.util
+
+
+# revision identifiers, used by Alembic.
+revision: str = "dd4d4142b96d"
+down_revision: Union[str, None] = "dca5b48a5562"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+
+ # webhook_change
+ op.create_table(
+ "webhook_change",
+ sa.Column("uuid", wuttjamaican.db.util.UUID(), nullable=False),
+ sa.Column("entity_type", sa.String(length=100), nullable=False),
+ sa.Column("bundle", sa.String(length=100), nullable=False),
+ sa.Column("farmos_uuid", wuttjamaican.db.util.UUID(), nullable=False),
+ sa.Column("deleted", sa.Boolean(), nullable=False),
+ sa.Column("received", sa.DateTime(), nullable=False),
+ sa.PrimaryKeyConstraint("uuid", name=op.f("pk_webhook_change")),
+ )
+
+
+def downgrade() -> None:
+
+ # webhook_change
+ op.drop_table("webhook_change")
diff --git a/src/wuttafarm/db/model/__init__.py b/src/wuttafarm/db/model/__init__.py
index d90272b..716fd1c 100644
--- a/src/wuttafarm/db/model/__init__.py
+++ b/src/wuttafarm/db/model/__init__.py
@@ -59,3 +59,6 @@ from .log_harvest import HarvestLog
from .log_medical import MedicalLog
from .log_observation import ObservationLog
from .log_seeding import SeedingLog
+
+# misc.
+from .webhook import WebhookChange
diff --git a/src/wuttafarm/db/model/webhook.py b/src/wuttafarm/db/model/webhook.py
new file mode 100644
index 0000000..b96a572
--- /dev/null
+++ b/src/wuttafarm/db/model/webhook.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8; -*-
+################################################################################
+#
+# WuttaFarm --Web app to integrate with and extend farmOS
+# Copyright © 2026 Lance Edgar
+#
+# This file is part of WuttaFarm.
+#
+# WuttaFarm is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+#
+# WuttaFarm is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# WuttaFarm. If not, see .
+#
+################################################################################
+"""
+Model definition for webhook changes
+"""
+
+import sqlalchemy as sa
+
+from wuttjamaican.db import model
+from wuttjamaican.util import make_utc
+
+
+class WebhookChange(model.Base):
+ """
+ Represents a "change" (create/update/delete) notification which
+ originated in farmOS and delivered via webhook.
+
+ This table serves as a "FIFO queue" for processing the changes.
+ """
+
+ __tablename__ = "webhook_change"
+
+ uuid = model.uuid_column()
+
+ entity_type = sa.Column(sa.String(length=100), nullable=False)
+ bundle = sa.Column(sa.String(length=100), nullable=False)
+ farmos_uuid = sa.Column(model.UUID(), nullable=False)
+
+ deleted = sa.Column(sa.Boolean(), nullable=False)
+
+ received = sa.Column(
+ sa.DateTime(),
+ nullable=False,
+ default=make_utc,
+ doc="""
+ Date and time when the change was obtained from the watcher thread.
+ """,
+ )
+
+ def __str__(self):
+ event_type = "delete" if self.deleted else "create/update"
+ return f"{event_type} {self.entity_type}--{self.bundle}: {self.farmos_uuid}"
diff --git a/src/wuttafarm/importing/farmos.py b/src/wuttafarm/importing/farmos.py
index c739bad..4f5a47a 100644
--- a/src/wuttafarm/importing/farmos.py
+++ b/src/wuttafarm/importing/farmos.py
@@ -339,6 +339,8 @@ class AnimalAssetImporter(AssetImporterBase):
model_class = model.AnimalAsset
+ animal_types_by_farmos_uuid = None
+
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
@@ -361,6 +363,17 @@ class AnimalAssetImporter(AssetImporterBase):
if animal_type.farmos_uuid:
self.animal_types_by_farmos_uuid[animal_type.farmos_uuid] = animal_type
+ def get_animal_type_by_farmos_uuid(self, uuid):
+ if self.animal_types_by_farmos_uuid is not None:
+ return self.animal_types_by_farmos_uuid.get(uuid)
+
+ model = self.app.model
+ return (
+ self.target_session.query(model.AnimalType)
+ .filter(model.AnimalType.farmos_uuid == uuid)
+ .first()
+ )
+
def normalize_source_object(self, animal):
""" """
animal_type_uuid = None
@@ -368,7 +381,7 @@ class AnimalAssetImporter(AssetImporterBase):
if animal_type := relationships.get("animal_type"):
if animal_type["data"]:
- if wf_animal_type := self.animal_types_by_farmos_uuid.get(
+ if wf_animal_type := self.get_animal_type_by_farmos_uuid(
UUID(animal_type["data"]["id"])
):
animal_type_uuid = wf_animal_type.uuid
@@ -500,6 +513,8 @@ class EquipmentAssetImporter(AssetImporterBase):
model_class = model.EquipmentAsset
+ equipment_types_by_farmos_uuid = None
+
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
@@ -520,6 +535,17 @@ class EquipmentAssetImporter(AssetImporterBase):
equipment_type
)
+ def get_equipment_type_by_farmos_uuid(self, uuid):
+ if self.equipment_types_by_farmos_uuid is not None:
+ return self.equipment_types_by_farmos_uuid.get(uuid)
+
+ model = self.app.model
+ return (
+ self.target_session.query(model.EquipmentType)
+ .filter_by(farmos_uuid=uuid)
+ .first()
+ )
+
def normalize_source_object(self, equipment):
""" """
data = super().normalize_source_object(equipment)
@@ -530,7 +556,7 @@ class EquipmentAssetImporter(AssetImporterBase):
if equipment_type := relationships.get("equipment_type"):
equipment_types = []
for equipment_type in equipment_type["data"]:
- if wf_equipment_type := self.equipment_types_by_farmos_uuid.get(
+ if wf_equipment_type := self.get_equipment_type_by_farmos_uuid(
UUID(equipment_type["id"])
):
equipment_types.append(wf_equipment_type.uuid)
@@ -632,6 +658,8 @@ class LandAssetImporter(AssetImporterBase):
model_class = model.LandAsset
+ land_types_by_id = None
+
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
@@ -650,10 +678,21 @@ class LandAssetImporter(AssetImporterBase):
for land_type in self.target_session.query(model.LandType):
self.land_types_by_id[land_type.drupal_id] = land_type
+ def get_land_type_by_id(self, drupal_id):
+ if self.land_types_by_id is not None:
+ return self.land_types_by_id.get(drupal_id)
+
+ model = self.app.model
+ return (
+ self.target_session.query(model.LandType)
+ .filter_by(drupal_id=drupal_id)
+ .first()
+ )
+
def normalize_source_object(self, land):
""" """
land_type_id = land["attributes"]["land_type"]
- land_type = self.land_types_by_id.get(land_type_id)
+ land_type = self.get_land_type_by_id(land_type_id)
if not land_type:
log.warning(
"invalid land_type '%s' for farmOS Land Asset: %s", land_type_id, land
@@ -758,6 +797,9 @@ class PlantAssetImporter(AssetImporterBase):
model_class = model.PlantAsset
+ plant_types_by_farmos_uuid = None
+ seasons_by_farmos_uuid = None
+
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
@@ -782,6 +824,26 @@ class PlantAssetImporter(AssetImporterBase):
if season.farmos_uuid:
self.seasons_by_farmos_uuid[season.farmos_uuid] = season
+ def get_plant_type_by_farmos_uuid(self, uuid):
+ if self.plant_types_by_farmos_uuid is not None:
+ return self.plant_types_by_farmos_uuid.get(uuid)
+
+ model = self.app.model
+ return (
+ self.target_session.query(model.PlantType)
+ .filter_by(farmos_uuid=uuid)
+ .first()
+ )
+
+ def get_season_by_farmos_uuid(self, uuid):
+ if self.seasons_by_farmos_uuid is not None:
+ return self.seasons_by_farmos_uuid.get(uuid)
+
+ model = self.app.model
+ return (
+ self.target_session.query(model.Season).filter_by(farmos_uuid=uuid).first()
+ )
+
def normalize_source_object(self, plant):
""" """
data = super().normalize_source_object(plant)
@@ -793,7 +855,7 @@ class PlantAssetImporter(AssetImporterBase):
if plant_type := relationships.get("plant_type"):
plant_types = []
for plant_type in plant_type["data"]:
- if wf_plant_type := self.plant_types_by_farmos_uuid.get(
+ if wf_plant_type := self.get_plant_type_by_farmos_uuid(
UUID(plant_type["id"])
):
plant_types.append(wf_plant_type.uuid)
@@ -803,7 +865,7 @@ class PlantAssetImporter(AssetImporterBase):
if season := relationships.get("season"):
seasons = []
for season in season["data"]:
- if wf_season := self.seasons_by_farmos_uuid.get(UUID(season["id"])):
+ if wf_season := self.get_season_by_farmos_uuid(UUID(season["id"])):
seasons.append(wf_season.uuid)
else:
log.warning("season not found: %s", season["id"])
@@ -886,6 +948,8 @@ class StructureAssetImporter(AssetImporterBase):
model_class = model.StructureAsset
+ structure_types_by_id = None
+
def get_supported_fields(self):
fields = list(super().get_supported_fields())
fields.extend(
@@ -903,10 +967,21 @@ class StructureAssetImporter(AssetImporterBase):
for structure_type in self.target_session.query(model.StructureType):
self.structure_types_by_id[structure_type.drupal_id] = structure_type
+ def get_structure_type_by_id(self, drupal_id):
+ if self.structure_types_by_id is not None:
+ return self.structure_types_by_id.get(drupal_id)
+
+ model = self.app.model
+ return (
+ self.target_session.query(model.StructureType)
+ .filter_by(drupal_id=drupal_id)
+ .first()
+ )
+
def normalize_source_object(self, structure):
""" """
structure_type_id = structure["attributes"]["structure_type"]
- structure_type = self.structure_types_by_id.get(structure_type_id)
+ structure_type = self.get_structure_type_by_id(structure_type_id)
if not structure_type:
log.warning(
"invalid structure_type '%s' for farmOS Structure Asset: %s",
diff --git a/src/wuttafarm/web/templates/appinfo/configure.mako b/src/wuttafarm/web/templates/appinfo/configure.mako
index 912eef0..26d6a54 100644
--- a/src/wuttafarm/web/templates/appinfo/configure.mako
+++ b/src/wuttafarm/web/templates/appinfo/configure.mako
@@ -57,6 +57,11 @@
+
+
+
+
.
+#
+################################################################################
+"""
+Views for use as webhooks
+"""
+
+import logging
+from uuid import UUID
+
+from wuttaweb.views import View
+from wuttaweb.db import Session
+
+
+log = logging.getLogger(__name__)
+
+
+class WebhookView(View):
+ """
+ Webhook views
+ """
+
+ def farmos_webhook(self):
+ model = self.app.model
+ session = Session()
+
+ try:
+ data = self.request.json
+ log.debug("got webhook payload: %s", data)
+
+ _, entity_type, event_type = data["event"].split(":")
+
+ uuid = data["entity"]["uuid"][0]["value"]
+ if entity_type == "taxonomy_term":
+ bundle = data["entity"]["vid"][0]["target_id"]
+ else:
+ bundle = data["entity"]["type"][0]["target_id"]
+
+ change = model.WebhookChange(
+ entity_type=entity_type,
+ bundle=bundle,
+ farmos_uuid=UUID(uuid),
+ deleted=event_type == "delete",
+ )
+ session.add(change)
+
+ except:
+ log.exception("failed to process webhook request")
+
+ return {}
+
+ @classmethod
+ def defaults(cls, config):
+ cls._webhook_defaults(config)
+
+ @classmethod
+ def _webhook_defaults(cls, config):
+
+ # farmos webhook
+ config.add_route("webhooks.farmos", "/farmos/webhook", request_method="POST")
+ config.add_view(
+ cls,
+ attr="farmos_webhook",
+ route_name="webhooks.farmos",
+ require_csrf=False,
+ renderer="json",
+ )
+
+
+def defaults(config, **kwargs):
+ base = globals()
+
+ WebhookView = kwargs.get("WebhookView", base["WebhookView"])
+ WebhookView.defaults(config)
+
+
+def includeme(config):
+ defaults(config)