From 0f3ef5227b070d43797d89723032868723bf7e27 Mon Sep 17 00:00:00 2001 From: Lance Edgar Date: Wed, 11 Mar 2026 08:57:50 -0500 Subject: [PATCH] feat: add support for webhooks module in farmOS this lets farmOS send a POST request to a webhook URL in our app, which then records a "stub" record in a change queue table. from there a daemon should process the queue and import/delete records as needed in our app DB. this all requires setup on the farmOS side as well..those details will be documented elsewhere (eventually!) --- src/wuttafarm/app.py | 5 + src/wuttafarm/cli/__init__.py | 1 + src/wuttafarm/cli/process_webhooks.py | 181 ++++++++++++++++++ .../dd4d4142b96d_add_webhookchange.py | 41 ++++ src/wuttafarm/db/model/__init__.py | 3 + src/wuttafarm/db/model/webhook.py | 61 ++++++ src/wuttafarm/importing/farmos.py | 87 ++++++++- .../web/templates/appinfo/configure.mako | 5 + src/wuttafarm/web/views/__init__.py | 4 + src/wuttafarm/web/views/webhooks.py | 96 ++++++++++ 10 files changed, 478 insertions(+), 6 deletions(-) create mode 100644 src/wuttafarm/cli/process_webhooks.py create mode 100644 src/wuttafarm/db/alembic/versions/dd4d4142b96d_add_webhookchange.py create mode 100644 src/wuttafarm/db/model/webhook.py create mode 100644 src/wuttafarm/web/views/webhooks.py diff --git a/src/wuttafarm/app.py b/src/wuttafarm/app.py index decd44f..c5a581b 100644 --- a/src/wuttafarm/app.py +++ b/src/wuttafarm/app.py @@ -270,6 +270,7 @@ class WuttaFarmAppHandler(base.AppHandler): then nothing will happen / import is silently skipped when there is no such importer. """ + model = self.app.model handler = self.app.get_import_handler("import.to_wuttafarm.from_farmos") if model_name not in handler.importers: @@ -280,6 +281,10 @@ class WuttaFarmAppHandler(base.AppHandler): # nb. begin txn to establish the API client handler.begin_source_transaction(client) with self.short_session(commit=True) as session: + + if user := session.query(model.User).filter_by(username="farmos").first(): + session.info["continuum_user_id"] = user.uuid + handler.target_session = session importer = handler.get_importer(model_name, caches_target=False) normal = importer.normalize_source_object(obj) diff --git a/src/wuttafarm/cli/__init__.py b/src/wuttafarm/cli/__init__.py index cd06344..71d4994 100644 --- a/src/wuttafarm/cli/__init__.py +++ b/src/wuttafarm/cli/__init__.py @@ -29,3 +29,4 @@ from .base import wuttafarm_typer from . import export_farmos from . import import_farmos from . import install +from . import process_webhooks diff --git a/src/wuttafarm/cli/process_webhooks.py b/src/wuttafarm/cli/process_webhooks.py new file mode 100644 index 0000000..9731247 --- /dev/null +++ b/src/wuttafarm/cli/process_webhooks.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8; -*- +################################################################################ +# +# WuttaFarm --Web app to integrate with and extend farmOS +# Copyright © 2026 Lance Edgar +# +# This file is part of WuttaFarm. +# +# WuttaFarm is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# WuttaFarm is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along with +# WuttaFarm. If not, see . +# +################################################################################ +""" +WuttaFarm CLI +""" + +import logging +import time + +import typer +from oauthlib.oauth2 import BackendApplicationClient +from requests_oauthlib import OAuth2Session + +from wuttafarm.cli import wuttafarm_typer + + +log = logging.getLogger(__name__) + + +class ChangeProcessor: + + def __init__(self, config): + self.config = config + self.app = config.get_app() + + def process_change(self, change): + if change.deleted: + self.delete_record(change) + else: + self.import_record(change) + + def import_record(self, change): + token = self.get_farmos_oauth2_token() + client = self.app.get_farmos_client(token=token) + + full_type = f"{change.entity_type}--{change.bundle}" + record = client.resource.get_id( + change.entity_type, change.bundle, change.farmos_uuid + ) + + importer_map = self.get_importer_map() + model_name = importer_map[full_type] + + self.app.auto_sync_from_farmos(record["data"], model_name, client=client) + + def delete_record(self, change): + model = self.app.model + handler = self.app.get_import_handler("import.to_wuttafarm.from_farmos") + + importer_map = self.get_importer_map() + full_type = f"{change.entity_type}--{change.bundle}" + model_name = importer_map[full_type] + + token = self.get_farmos_oauth2_token() + client = self.app.get_farmos_client(token=token) + + # nb. begin txn to establish the API client + handler.begin_source_transaction(client) + with self.app.short_session(commit=True) as session: + handler.target_session = session + importer = handler.get_importer(model_name, caches_target=False) + + # try to attribute change to 'farmos' user + if user := session.query(model.User).filter_by(username="farmos").first(): + session.info["continuum_user_id"] = user.uuid + + # only support importers with farmos_uuid as key + # (pretty sure that covers us..can revise if needed) + if importer.get_keys() != ["farmos_uuid"]: + log.warning( + "unsupported keys for %s importer: %s", + model_name, + importer.get_keys(), + ) + return + + # delete corresponding record from our app + obj = importer.get_target_object((change.uuid,)) + if obj: + importer.delete_target_object(obj) + + # TODO: this should live elsewhere + def get_farmos_oauth2_token(self): + + client_id = self.config.get( + "farmos.oauth2.importing.client_id", default="wuttafarm" + ) + client_secret = self.config.require("farmos.oauth2.importing.client_secret") + scope = self.config.get("farmos.oauth2.importing.scope", default="farm_manager") + + client = BackendApplicationClient(client_id=client_id) + oauth = OAuth2Session(client=client) + + return oauth.fetch_token( + token_url=self.app.get_farmos_url("/oauth/token"), + include_client_id=True, + client_secret=client_secret, + scope=scope, + ) + + # TODO: this should live elsewhere + def get_importer_map(self): + return { + "asset--animal": "AnimalAsset", + "asset--equipment": "EquipmentAsset", + "asset--group": "GroupAsset", + "asset--land": "LandAsset", + "asset--plant": "PlantAsset", + "asset--structure": "StructureAsset", + "asset--water": "WaterAsset", + "log--activity": "ActivityLog", + "log--harvest": "HarvestLog", + "log--medical": "MedicalLog", + "log--observation": "ObservationLog", + "log--seeding": "SeedingLog", + "quantity--material": "MaterialQuantity", + "quantity--standard": "StandardQuantity", + "taxonomy_term--animal_type": "AnimalType", + "taxonomy_term--equipment_type": "EquipmentType", + "taxonomy_term--plant_type": "PlantType", + "taxonomy_term--season": "Season", + "taxonomy_term--material_type": "MaterialType", + "taxonomy_term--unit": "Unit", + } + + +@wuttafarm_typer.command() +def process_webhooks( + ctx: typer.Context, +): + """ + Process incoming webhook requests from farmOS. + """ + config = ctx.parent.wutta_config + app = config.get_app() + model = app.model + processor = ChangeProcessor(config) + + while True: + with app.short_session(commit=True) as session: + + query = session.query(model.WebhookChange).order_by( + model.WebhookChange.received + ) + + # nb. fetch (at most) 2 changes instead of just 1; + # this will control time delay behavior below + if changes := query[:2]: + + # process first change + change = changes[0] + log.info("processing webhook change: %s", change) + processor.process_change(change) + session.delete(change) + + # minimal time delay if 2nd change exists + if len(changes) == 2: + time.sleep(0.1) + continue + + # nothing in queue, so wait 1 sec before checking again + time.sleep(1) diff --git a/src/wuttafarm/db/alembic/versions/dd4d4142b96d_add_webhookchange.py b/src/wuttafarm/db/alembic/versions/dd4d4142b96d_add_webhookchange.py new file mode 100644 index 0000000..5b566b2 --- /dev/null +++ b/src/wuttafarm/db/alembic/versions/dd4d4142b96d_add_webhookchange.py @@ -0,0 +1,41 @@ +"""add WebhookChange + +Revision ID: dd4d4142b96d +Revises: dca5b48a5562 +Create Date: 2026-03-10 22:31:54.324952 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +import wuttjamaican.db.util + + +# revision identifiers, used by Alembic. +revision: str = "dd4d4142b96d" +down_revision: Union[str, None] = "dca5b48a5562" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + + # webhook_change + op.create_table( + "webhook_change", + sa.Column("uuid", wuttjamaican.db.util.UUID(), nullable=False), + sa.Column("entity_type", sa.String(length=100), nullable=False), + sa.Column("bundle", sa.String(length=100), nullable=False), + sa.Column("farmos_uuid", wuttjamaican.db.util.UUID(), nullable=False), + sa.Column("deleted", sa.Boolean(), nullable=False), + sa.Column("received", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("uuid", name=op.f("pk_webhook_change")), + ) + + +def downgrade() -> None: + + # webhook_change + op.drop_table("webhook_change") diff --git a/src/wuttafarm/db/model/__init__.py b/src/wuttafarm/db/model/__init__.py index d90272b..716fd1c 100644 --- a/src/wuttafarm/db/model/__init__.py +++ b/src/wuttafarm/db/model/__init__.py @@ -59,3 +59,6 @@ from .log_harvest import HarvestLog from .log_medical import MedicalLog from .log_observation import ObservationLog from .log_seeding import SeedingLog + +# misc. +from .webhook import WebhookChange diff --git a/src/wuttafarm/db/model/webhook.py b/src/wuttafarm/db/model/webhook.py new file mode 100644 index 0000000..b96a572 --- /dev/null +++ b/src/wuttafarm/db/model/webhook.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8; -*- +################################################################################ +# +# WuttaFarm --Web app to integrate with and extend farmOS +# Copyright © 2026 Lance Edgar +# +# This file is part of WuttaFarm. +# +# WuttaFarm is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# WuttaFarm is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along with +# WuttaFarm. If not, see . +# +################################################################################ +""" +Model definition for webhook changes +""" + +import sqlalchemy as sa + +from wuttjamaican.db import model +from wuttjamaican.util import make_utc + + +class WebhookChange(model.Base): + """ + Represents a "change" (create/update/delete) notification which + originated in farmOS and delivered via webhook. + + This table serves as a "FIFO queue" for processing the changes. + """ + + __tablename__ = "webhook_change" + + uuid = model.uuid_column() + + entity_type = sa.Column(sa.String(length=100), nullable=False) + bundle = sa.Column(sa.String(length=100), nullable=False) + farmos_uuid = sa.Column(model.UUID(), nullable=False) + + deleted = sa.Column(sa.Boolean(), nullable=False) + + received = sa.Column( + sa.DateTime(), + nullable=False, + default=make_utc, + doc=""" + Date and time when the change was obtained from the watcher thread. + """, + ) + + def __str__(self): + event_type = "delete" if self.deleted else "create/update" + return f"{event_type} {self.entity_type}--{self.bundle}: {self.farmos_uuid}" diff --git a/src/wuttafarm/importing/farmos.py b/src/wuttafarm/importing/farmos.py index c739bad..4f5a47a 100644 --- a/src/wuttafarm/importing/farmos.py +++ b/src/wuttafarm/importing/farmos.py @@ -339,6 +339,8 @@ class AnimalAssetImporter(AssetImporterBase): model_class = model.AnimalAsset + animal_types_by_farmos_uuid = None + def get_supported_fields(self): fields = list(super().get_supported_fields()) fields.extend( @@ -361,6 +363,17 @@ class AnimalAssetImporter(AssetImporterBase): if animal_type.farmos_uuid: self.animal_types_by_farmos_uuid[animal_type.farmos_uuid] = animal_type + def get_animal_type_by_farmos_uuid(self, uuid): + if self.animal_types_by_farmos_uuid is not None: + return self.animal_types_by_farmos_uuid.get(uuid) + + model = self.app.model + return ( + self.target_session.query(model.AnimalType) + .filter(model.AnimalType.farmos_uuid == uuid) + .first() + ) + def normalize_source_object(self, animal): """ """ animal_type_uuid = None @@ -368,7 +381,7 @@ class AnimalAssetImporter(AssetImporterBase): if animal_type := relationships.get("animal_type"): if animal_type["data"]: - if wf_animal_type := self.animal_types_by_farmos_uuid.get( + if wf_animal_type := self.get_animal_type_by_farmos_uuid( UUID(animal_type["data"]["id"]) ): animal_type_uuid = wf_animal_type.uuid @@ -500,6 +513,8 @@ class EquipmentAssetImporter(AssetImporterBase): model_class = model.EquipmentAsset + equipment_types_by_farmos_uuid = None + def get_supported_fields(self): fields = list(super().get_supported_fields()) fields.extend( @@ -520,6 +535,17 @@ class EquipmentAssetImporter(AssetImporterBase): equipment_type ) + def get_equipment_type_by_farmos_uuid(self, uuid): + if self.equipment_types_by_farmos_uuid is not None: + return self.equipment_types_by_farmos_uuid.get(uuid) + + model = self.app.model + return ( + self.target_session.query(model.EquipmentType) + .filter_by(farmos_uuid=uuid) + .first() + ) + def normalize_source_object(self, equipment): """ """ data = super().normalize_source_object(equipment) @@ -530,7 +556,7 @@ class EquipmentAssetImporter(AssetImporterBase): if equipment_type := relationships.get("equipment_type"): equipment_types = [] for equipment_type in equipment_type["data"]: - if wf_equipment_type := self.equipment_types_by_farmos_uuid.get( + if wf_equipment_type := self.get_equipment_type_by_farmos_uuid( UUID(equipment_type["id"]) ): equipment_types.append(wf_equipment_type.uuid) @@ -632,6 +658,8 @@ class LandAssetImporter(AssetImporterBase): model_class = model.LandAsset + land_types_by_id = None + def get_supported_fields(self): fields = list(super().get_supported_fields()) fields.extend( @@ -650,10 +678,21 @@ class LandAssetImporter(AssetImporterBase): for land_type in self.target_session.query(model.LandType): self.land_types_by_id[land_type.drupal_id] = land_type + def get_land_type_by_id(self, drupal_id): + if self.land_types_by_id is not None: + return self.land_types_by_id.get(drupal_id) + + model = self.app.model + return ( + self.target_session.query(model.LandType) + .filter_by(drupal_id=drupal_id) + .first() + ) + def normalize_source_object(self, land): """ """ land_type_id = land["attributes"]["land_type"] - land_type = self.land_types_by_id.get(land_type_id) + land_type = self.get_land_type_by_id(land_type_id) if not land_type: log.warning( "invalid land_type '%s' for farmOS Land Asset: %s", land_type_id, land @@ -758,6 +797,9 @@ class PlantAssetImporter(AssetImporterBase): model_class = model.PlantAsset + plant_types_by_farmos_uuid = None + seasons_by_farmos_uuid = None + def get_supported_fields(self): fields = list(super().get_supported_fields()) fields.extend( @@ -782,6 +824,26 @@ class PlantAssetImporter(AssetImporterBase): if season.farmos_uuid: self.seasons_by_farmos_uuid[season.farmos_uuid] = season + def get_plant_type_by_farmos_uuid(self, uuid): + if self.plant_types_by_farmos_uuid is not None: + return self.plant_types_by_farmos_uuid.get(uuid) + + model = self.app.model + return ( + self.target_session.query(model.PlantType) + .filter_by(farmos_uuid=uuid) + .first() + ) + + def get_season_by_farmos_uuid(self, uuid): + if self.seasons_by_farmos_uuid is not None: + return self.seasons_by_farmos_uuid.get(uuid) + + model = self.app.model + return ( + self.target_session.query(model.Season).filter_by(farmos_uuid=uuid).first() + ) + def normalize_source_object(self, plant): """ """ data = super().normalize_source_object(plant) @@ -793,7 +855,7 @@ class PlantAssetImporter(AssetImporterBase): if plant_type := relationships.get("plant_type"): plant_types = [] for plant_type in plant_type["data"]: - if wf_plant_type := self.plant_types_by_farmos_uuid.get( + if wf_plant_type := self.get_plant_type_by_farmos_uuid( UUID(plant_type["id"]) ): plant_types.append(wf_plant_type.uuid) @@ -803,7 +865,7 @@ class PlantAssetImporter(AssetImporterBase): if season := relationships.get("season"): seasons = [] for season in season["data"]: - if wf_season := self.seasons_by_farmos_uuid.get(UUID(season["id"])): + if wf_season := self.get_season_by_farmos_uuid(UUID(season["id"])): seasons.append(wf_season.uuid) else: log.warning("season not found: %s", season["id"]) @@ -886,6 +948,8 @@ class StructureAssetImporter(AssetImporterBase): model_class = model.StructureAsset + structure_types_by_id = None + def get_supported_fields(self): fields = list(super().get_supported_fields()) fields.extend( @@ -903,10 +967,21 @@ class StructureAssetImporter(AssetImporterBase): for structure_type in self.target_session.query(model.StructureType): self.structure_types_by_id[structure_type.drupal_id] = structure_type + def get_structure_type_by_id(self, drupal_id): + if self.structure_types_by_id is not None: + return self.structure_types_by_id.get(drupal_id) + + model = self.app.model + return ( + self.target_session.query(model.StructureType) + .filter_by(drupal_id=drupal_id) + .first() + ) + def normalize_source_object(self, structure): """ """ structure_type_id = structure["attributes"]["structure_type"] - structure_type = self.structure_types_by_id.get(structure_type_id) + structure_type = self.get_structure_type_by_id(structure_type_id) if not structure_type: log.warning( "invalid structure_type '%s' for farmOS Structure Asset: %s", diff --git a/src/wuttafarm/web/templates/appinfo/configure.mako b/src/wuttafarm/web/templates/appinfo/configure.mako index 912eef0..26d6a54 100644 --- a/src/wuttafarm/web/templates/appinfo/configure.mako +++ b/src/wuttafarm/web/templates/appinfo/configure.mako @@ -57,6 +57,11 @@ + + + + . +# +################################################################################ +""" +Views for use as webhooks +""" + +import logging +from uuid import UUID + +from wuttaweb.views import View +from wuttaweb.db import Session + + +log = logging.getLogger(__name__) + + +class WebhookView(View): + """ + Webhook views + """ + + def farmos_webhook(self): + model = self.app.model + session = Session() + + try: + data = self.request.json + log.debug("got webhook payload: %s", data) + + _, entity_type, event_type = data["event"].split(":") + + uuid = data["entity"]["uuid"][0]["value"] + if entity_type == "taxonomy_term": + bundle = data["entity"]["vid"][0]["target_id"] + else: + bundle = data["entity"]["type"][0]["target_id"] + + change = model.WebhookChange( + entity_type=entity_type, + bundle=bundle, + farmos_uuid=UUID(uuid), + deleted=event_type == "delete", + ) + session.add(change) + + except: + log.exception("failed to process webhook request") + + return {} + + @classmethod + def defaults(cls, config): + cls._webhook_defaults(config) + + @classmethod + def _webhook_defaults(cls, config): + + # farmos webhook + config.add_route("webhooks.farmos", "/farmos/webhook", request_method="POST") + config.add_view( + cls, + attr="farmos_webhook", + route_name="webhooks.farmos", + require_csrf=False, + renderer="json", + ) + + +def defaults(config, **kwargs): + base = globals() + + WebhookView = kwargs.get("WebhookView", base["WebhookView"]) + WebhookView.defaults(config) + + +def includeme(config): + defaults(config)