Source code for fetchez.recipe

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
fetchez.recipe
~~~~~~~~~~~~~~
The Workflow Engine.
Loads a configuration (The Recipe) and executes it against the target region.

:copyright: (c) 2010-2026 Regents of the University of Colorado
:license: MIT, see LICENSE for more details.
"""

import os
import json
import logging

from .core import run_fetchez
from .spatial import parse_region
from .registry import (
    ModuleRegistry,
    HookRegistry,
    SchemaRegistry,
    PresetRegistry,
    BundleRegistry,
)
from .utils import TqdmLoggingHandler
from . import __version__ as fetchez_version

logger = logging.getLogger(__name__)


# This is duplicated in fetchez.cli
# We should move this to utils
def setup_logging(verbose=False):
    """Configure the root logger with a single tqdm-aware handler.

    Any handlers already attached to the root logger are cleared first,
    so repeated calls do not stack duplicate handlers.

    :param verbose: when True log at INFO, otherwise WARNING.
    """
    root = logging.getLogger()
    root.setLevel(logging.INFO if verbose else logging.WARNING)
    # Drop pre-existing handlers to avoid duplicated log lines on re-setup.
    if root.hasHandlers():
        root.handlers.clear()
    tqdm_handler = TqdmLoggingHandler()
    tqdm_handler.setFormatter(
        logging.Formatter("[ %(levelname)s ] %(module)s: %(message)s")
    )
    root.addHandler(tqdm_handler)
def _parse_version(v_str): """Dependency-free semantic version parser. Converts '2.1.0-beta' into (2, 1, 0). """ parts = [] for p in v_str.split("."): num = "".join(filter(str.isdigit, p)) parts.append(int(num) if num else 0) return tuple(parts)
class Recipe:
    """The Workflow Orchestrator.

    Reads data ingestion and processing recipes from YAML/JSON files and
    executes them against the target region.

    Usage:
        # Load the Recipe
        recipe = Recipe.from_file("socal_project.yaml")

        # Run it.
        recipe.run()
    """
[docs] def __init__(self, config, base_dir=None): self.config = config self.base_dir = base_dir or os.getcwd() self.name = self.config.get("project", {}).get("name", "Unnamed_Recipe") setup_logging(True)
[docs] @classmethod def from_file(cls, config_source): """Factory method to load the Recipe. Accepts a filename (str) or a dictionary directly. """ if isinstance(config_source, dict): return cls(config_source) if not os.path.exists(config_source): raise FileNotFoundError(f"Recipe not found: {config_source}") base_dir = os.path.dirname(os.path.abspath(config_source)) ext = os.path.splitext(config_source)[1].lower() with open(config_source, "r") as f: if ext in [".yaml", ".yml"]: import yaml config = yaml.safe_load(f) else: config = json.load(f) return cls(config, base_dir=base_dir)
    # Alias: loading "from a dict" goes through the same factory, which
    # short-circuits on dict input.
    from_dict = from_file

    def _check_integrity(self):
        """Ensures the fetchez version meets the recipe's minimum requirements.

        :raises RuntimeError: if ``config.min_fetchez_version`` in the recipe
            is newer than the installed fetchez version.
        """
        conf = self.config.get("config", {})
        min_fz = conf.get("min_fetchez_version")
        if min_fz:
            current = _parse_version(fetchez_version)
            required = _parse_version(min_fz)
            if current < required:
                logger.error(
                    f"Recipe requires fetchez v{min_fz}, but found v{fetchez_version}"
                )
                raise RuntimeError("Fetchez version incompatibility.")

    def _resolve_path(self, path):
        """Resolves output paths relative to the recipe file.

        Non-strings, remote URLs, and absolute paths pass through unchanged;
        anything else is joined onto ``self.base_dir`` and made absolute.
        """
        if not isinstance(path, str):
            return path
        if path.startswith(("http", "s3://", "gs://", "ftp://")):
            return path
        if os.path.isabs(path):
            return path
        return os.path.abspath(os.path.join(self.base_dir, path))

    def _init_hooks(self, hook_defs, mod=None):
        """Instantiate hook objects from a list of hook definition dicts.

        Each definition either names a registered hook class (``name``) or
        references a preset (``preset``), whose hook list is expanded and
        recursively initialized.  Path-like argument values are resolved
        relative to the recipe directory.

        :param hook_defs: list of dicts with ``name``/``preset`` and ``args``.
        :param mod: module key this hook list belongs to, passed through on
            recursion (not otherwise used here).
        :return: list of instantiated hook objects.
        """
        if not hook_defs:
            return []
        HookRegistry.load_all()
        PresetRegistry.load_all()
        active_hooks = []
        for h in hook_defs:
            name = h.get("name")
            is_preset = h.get("preset")
            raw_kwargs = h.get("args", {})
            kwargs = {}
            # Resolve any file-ish argument values against the recipe dir.
            for k, v in raw_kwargs.items():
                if k in [
                    "file",
                    "output",
                    "output_grid",
                    "mask_fn",
                    "dem",
                    "barrier",
                    "aux_path",
                    "path",
                ]:
                    kwargs[k] = self._resolve_path(v)
                else:
                    kwargs[k] = v
            # --- PRESET EXPANSION ---
            if is_preset:
                preset_def = PresetRegistry.get_yaml(is_preset)
                if preset_def:
                    import copy

                    # Deep-copy so argument injection never mutates the
                    # registry's cached preset definition.
                    preset_hooks = copy.deepcopy(preset_def.get("hooks", []))
                    # --- ARGUMENT INJECTION ---
                    for inner_hook in preset_hooks:
                        h_name = inner_hook.get("name")
                        # If the user passed a dictionary of args specifically for this hook
                        if h_name in kwargs and isinstance(kwargs[h_name], dict):
                            inner_hook.setdefault("args", {}).update(kwargs[h_name])
                    # Recursively parse the expanded hooks
                    expanded_hooks = self._init_hooks(preset_hooks, mod=mod)
                    active_hooks.extend(expanded_hooks)
                else:
                    logger.error(f"Preset '{is_preset}' not found in registry.")
            # --- STANDARD HOOK INITIALIZATION ---
            else:
                HookCls = HookRegistry.get_class(name)
                if HookCls:
                    import inspect

                    # Drop kwargs the hook's __init__ does not accept, unless
                    # the signature takes **kwargs (detected via its repr).
                    sig = inspect.signature(HookCls.__init__)
                    valid_kwargs = {
                        k: v
                        for k, v in kwargs.items()
                        if k in sig.parameters or "kwargs" in str(sig.parameters)
                    }
                    active_hooks.append(HookCls(**valid_kwargs))
                else:
                    logger.warning(f"Hook '{name}' missing.")
        return active_hooks
    def run(self):
        """Execute the recipe!

        Loads all registries, validates the config against the schema and
        version requirements, expands bundles into concrete modules,
        instantiates one module per target region, runs each module's URL
        discovery, then hands the queue to :func:`run_fetchez`.
        """
        ModuleRegistry.load_all()
        BundleRegistry.load_all()
        SchemaRegistry.load_all()
        if not self.config:
            return
        self.config = SchemaRegistry.apply_schema(self.config)
        self._check_integrity()
        logger.debug(f"Preparing to execute recipe: {self.name}")
        run_opts = self.config.get("execution", {})
        threads = run_opts.get("threads", 1)
        global_hooks = self._init_hooks(self.config.get("global_hooks", []))
        # A missing top-level region yields [None] so every module still
        # gets exactly one (unbounded) instantiation below.
        global_region_def = self.config.get("region")
        global_regions = (
            parse_region(global_region_def) if global_region_def else [None]
        )
        # --- BUNDLE EXPANSION ---
        # Replace each {"bundle": ...} entry with the bundle's own module
        # list, scaling weights and appending user-supplied hooks.
        expanded_modules = []
        for mod_dict in self.config.get("modules", []):
            if "bundle" in mod_dict:
                bundle_name = mod_dict["bundle"]
                user_args = mod_dict.get("args", {})
                user_hooks = mod_dict.get("hooks", [])
                # Fetch the curated package from the registry
                bundle_def = BundleRegistry.get_yaml(bundle_name)
                if not bundle_def:
                    logger.error(f"Bundle '{bundle_name}' not found!")
                    continue
                weight_multiplier = float(user_args.get("weight", 1.0))
                # Inject the bundle's modules into the main recipe
                for pkg_mod in bundle_def.get("modules", []):
                    if "weight" in pkg_mod.setdefault("args", {}):
                        original_weight = float(pkg_mod["args"]["weight"])
                        pkg_mod["args"]["weight"] = original_weight * weight_multiplier
                    if user_hooks:
                        pkg_mod.setdefault("hooks", []).extend(user_hooks)
                    expanded_modules.append(pkg_mod)
            else:
                expanded_modules.append(mod_dict)
        self.config["modules"] = expanded_modules
        # --- MODULE INSTANTIATION ---
        modules_to_run = []
        for mod_def in self.config.get("modules", []):
            # A bare string is shorthand for a module with no args/hooks.
            if isinstance(mod_def, str):
                mod_key, mod_args, mod_hooks, mod_region_def = mod_def, {}, [], None
            else:
                mod_key = mod_def.get("module")
                mod_args = mod_def.get("args", {})
                mod_hooks = self._init_hooks(mod_def.get("hooks", []), mod=mod_key)
                mod_region_def = mod_def.get("region")
            # Module-level region overrides the recipe-level one.
            mod_regions = (
                parse_region(mod_region_def) if mod_region_def else global_regions
            )
            # Maybe we don't need to enforce this here...
            # if not mod_regions or mod_regions == [None]:
            #     logger.warning(f"Module '{mod_key}' has no target region. Skipping.")
            #     continue
            ModCls = ModuleRegistry.get_class(mod_key)
            if not ModCls:
                logger.error(f"Unknown module: {mod_key}")
                continue
            for region in mod_regions:
                if "path" in mod_args:
                    mod_args["path"] = self._resolve_path(mod_args["path"])
                try:
                    instance = ModCls(src_region=region, hook=mod_hooks, **mod_args)
                    modules_to_run.append(instance)
                except Exception as e:
                    logger.error(f"Failed to load {mod_key}: {e}")
        if not modules_to_run:
            logger.warning("Recipe empty. Nothing to execute.")
            return
        logger.debug(f"Queued {len(modules_to_run)} module queries. Searching...")
        # URL discovery is best-effort: a failing module is skipped rather
        # than aborting the whole recipe.
        for mod in modules_to_run:
            try:
                mod.run()
            except Exception as e:
                logger.error(
                    f"Module '{mod.name}' failed to generate URLs (Skipping): {e}"
                )
        run_fetchez(modules_to_run, threads=threads, global_hooks=global_hooks)
        logger.debug(f"Recipe complete: {self.name}")
        # self._generate_receipt()

    def _generate_receipt(self):
        """Write a Markdown execution receipt next to the recipe file and
        log a terminal summary of modules, hooks, and expected outputs.

        The receipt filename is derived from the recipe name
        (``<name>_receipt.md``, lowercased, spaces -> underscores).
        """
        import datetime

        name = self.name
        desc = self.config.get("project", {}).get(
            "description", "No description provided."
        )
        region = self.config.get("region", "Global")
        receipt_filename = f"{name.lower().replace(' ', '_')}_receipt.md"
        receipt_path = os.path.join(self.base_dir, receipt_filename)
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Handle the case where region is None (which we just enabled!)
        region_str = (
            f"`{region}`"
            if region and str(region) != "None"
            else "`Global / Not Specified`"
        )
        # Parse Modules for the receipt
        modules_info = []
        mod_counts = {}
        for mod in self.config.get("modules", []):
            m_name = mod.get("module", "Unknown")
            args = mod.get("args", {})
            weight = args.get("weight", 1.0)
            datatype = args.get("datatype", "default")
            modules_info.append(
                f"- **{m_name}** (Weight: {weight}, Datatype: {datatype})"
            )
            mod_counts[m_name] = mod_counts.get(m_name, 0) + 1
        # Parse Global Hooks for the receipt
        hooks_info = []
        expected_outputs = []
        for hook in self.config.get("global_hooks", []):
            h_name = hook.get("name", "Unknown")
            hooks_info.append(f"1. `{h_name}`")
            args = hook.get("args", {})
            # Any arg key containing "output" is assumed to be a produced file.
            for key in args:
                if "output" in key:
                    expected_outputs.append(args[key])
        with open(receipt_path, "w", encoding="utf-8") as f:
            f.write(f"# Pipeline Execution Receipt: {name}\n")
            f.write(f"**Generated by Fetchez v{fetchez_version}**\n")
            f.write(f"*Execution Time: {timestamp}*\n\n")
            f.write("## Project Description\n")
            f.write(f"{desc}\n\n")
            f.write("## Target Region\n")
            f.write(f"{region_str}\n\n")
            f.write("## Data Modules Executed\n")
            if modules_info:
                f.write("\n".join(modules_info) + "\n\n")
            else:
                f.write("*None*\n\n")
            f.write("## Processing Pipeline (Hooks)\n")
            if hooks_info:
                f.write("\n".join(hooks_info) + "\n")
            else:
                f.write("*None*\n")
        # --- Print Terminal Summary ---
        logger.info("=" * 67)
        logger.info(f"✅ PIPELINE COMPLETE: {name}")
        logger.info("=" * 67)
        short_desc = desc[:75] + "..." if len(desc) > 75 else desc
        logger.info(f"Description: {short_desc}")
        if region and str(region) != "None":
            logger.info(f"Region: {region}")
        sources_str = ", ".join(
            [f"{k} (x{v})" if v > 1 else k for k, v in mod_counts.items()]
        )
        logger.info(f"Sources: {sources_str}")
        logger.info(f"💾 OUTPUTS SAVED TO: {self.base_dir}")
        for out in expected_outputs:
            logger.info(f" ➔ {out}")
        logger.info(f"📄 Full processing receipt saved to: {receipt_filename}")
        logger.info("=" * 67)
    def validate(self):
        """Validates the recipe for syntax, missing plugins, dependencies,
        and logical errors.

        Returns:
            bool: True if valid, False if errors exist.
            list: List of error messages.
        """
        ModuleRegistry.load_all()
        HookRegistry.load_all()
        errors = []
        # Output file paths claimed so far, across modules AND global hooks,
        # used to detect two hooks writing to the same file.
        claimed_outputs = set()

        def check_output_collision(hook_dict, context_name):
            """Helper to check if a hook is clobbering an existing file."""
            out_file = hook_dict.get("args", {}).get("output")
            if out_file:
                if out_file in claimed_outputs:
                    errors.append(
                        f"[{context_name}] Output Collision: Multiple hooks are attempting to write to '{out_file}'."
                    )
                claimed_outputs.add(out_file)

        # Validate Modules
        for mod in self.config.get("modules", []):
            mod_name = mod.get("module")
            mod_keys = mod.keys()
            valid_keys = [
                "module",
                "bundle",
                "hooks",
                "args",
                "region",
                "description",
                "_comment",
            ]
            # Flag unknown top-level keys (likely typos in the recipe).
            for key in mod_keys:
                if key not in valid_keys:
                    errors.append(
                        f"Module `{mod_name}` has unexpected reference to `{key}`"
                    )
            # "file"/"local_fs" are accepted even without a registry entry.
            if not ModuleRegistry.get_class(mod_name) and mod_name not in [
                "file",
                "local_fs",
            ]:
                errors.append(f"Missing Module: '{mod_name}'")
            # Check Module-level Hooks
            # mod_hook_counts = {}
            for hook in mod.get("hooks", []):
                h_name = hook.get("name")
                HookCls = HookRegistry.get_class(h_name)
                if not HookCls:
                    errors.append(f"Missing Hook: '{h_name}' (in module {mod_name})")
                    continue
                # Dependency Check
                if hasattr(HookCls, "_validate_deps"):
                    passed, msg = HookCls()._validate_deps()
                    if not passed:
                        errors.append(
                            f"[{mod_name} -> {h_name}] Missing Dependency: {msg}"
                        )
                check_output_collision(hook, f"Module: {mod_name}")
        # Validate Global Hooks
        # global_hook_counts = {}
        for hook in self.config.get("global_hooks", []):
            h_name = hook.get("name")
            HookCls = HookRegistry.get_class(h_name)
            if not HookCls:
                errors.append(f"Missing Global Hook: '{h_name}'")
                continue
            # Dependency Check
            if hasattr(HookCls, "_validate_deps"):
                passed, msg = HookCls()._validate_deps()
                if not passed:
                    errors.append(f"[Global -> {h_name}] Missing Dependency: {msg}")
            check_output_collision(hook, "Global Hooks")
        return len(errors) == 0, errors