#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
fetchez.fred
~~~~~~~~~~~~~
Fetches Remote Elevation Datalist (FRED)
Handles the indexing, storage, and spatial querying of remote datasets
that lack a public API but provide file lists (e.g., NCEI Thredds, USACE).
:copyright: (c) 2010 - 2026 Regents of the University of Colorado
:license: MIT, see LICENSE for more details.
"""
import os
import json
import logging
from typing import List, Dict, Optional, Any, Tuple
from . import utils
from . import config
from . import spatial
try:
from shapely.geometry import shape
# from shapely.strtree import STRtree
HAS_SHAPELY = True
except ImportError:
HAS_SHAPELY = False
logger = logging.getLogger(__name__)
# Directory where FRED index files are stored
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
FETCH_DATA_DIR = os.path.join(THIS_DIR, "data")
class FRED:
    """FRED (Fetches Remote Elevation Datalist) manages a local GeoJSON-based index
    of remote files. It allows spatial queries to determine which files to download.
    """

    # Standard metadata schema: the property fields recorded per survey.
    SCHEMA = [
        "Name",
        "ID",
        "Date",
        "Agency",
        "MetadataLink",
        "MetadataDate",
        "DataLink",
        "IndexLink",
        "Link",
        "DataType",
        "DataSource",
        "Resolution",
        "HorizontalDatum",
        "VerticalDatum",
        "LastUpdate",
        "Etcetra",
        "Info",
    ]

    def __init__(self, name: str = "FRED", local: bool = False):
        """Locate (or create) the GeoJSON index file and load it into memory.

        Args:
            name: Base name of the index; the on-disk file is ``<name>.geojson``.
            local: If True, force the bare filename (current working directory).
        """
        self.name = name
        self.filename = f"{name}.geojson"
        # Resolution order: forced-local, packaged data dir, user config
        # "indices" dir, then fall back to the bare filename (a fresh index).
        if local:
            self.path = self.filename
        elif os.path.exists(os.path.join(FETCH_DATA_DIR, self.filename)):
            self.path = os.path.join(FETCH_DATA_DIR, self.filename)
        elif os.path.exists(os.path.join(config.CONFIG_PATH, "indices", self.filename)):
            self.path = os.path.join(config.CONFIG_PATH, "indices", self.filename)
        else:
            self.path = self.filename
        self.features: List[Any] = []
        self._load()

    def _load(self):
        """Load the GeoJSON file into memory; start empty if missing or corrupt."""
        if not os.path.exists(self.path):
            logger.debug(f"Index not found at {self.path}, starting empty.")
            logger.info(
                f"Initializing new index for {utils.colorize(self.name, utils.CYAN)}"
            )
            self.features = []
            return
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.features = data.get("features", [])
            msg = (
                f"Loaded index {utils.colorize(self.name, utils.CYAN)} "
                f"from {utils.str_truncate_middle(self.path)} "
                f"({utils.colorize(str(len(self.features)), utils.BOLD)} items)"
            )
            logger.debug(msg)
        except (json.JSONDecodeError, IOError) as e:
            # A corrupt index is recoverable: log it and start with an empty set.
            logger.error(f"Corrupt or unreadable index at {self.path}: {e}")
            self.features = []

    def save(self):
        """Save the current features to the GeoJSON file."""
        data = {
            "type": "FeatureCollection",
            "name": self.name,
            "features": self.features,
        }
        # Ensure the output directory exists. exist_ok avoids the TOCTOU race
        # between an exists() check and makedirs().
        out_dir = os.path.dirname(self.path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        try:
            with open(self.path, "w", encoding="utf-8") as f:
                json.dump(data, f, separators=(",", ":"))  # Compact JSON
            logger.info(f"Saved {len(self.features)} items to {self.name} index.")
        except IOError as e:
            logger.error(f"Failed to save FRED index {self.path}: {e}")

    def add_survey(self, geom: Dict, **kwargs):
        """Add a single survey entry to the FRED database.

        Args:
            geom (Dict): GeoJSON geometry dictionary
                (e.g., {'type': 'Polygon', 'coordinates': ...})
            **kwargs: Attributes matching the schema.
        """
        props = kwargs.copy()
        props["LastUpdate"] = utils.this_date()
        feature = {"type": "Feature", "properties": props, "geometry": geom}
        self.features.append(feature)

    def search(
        self,
        region: Optional[Tuple[float, float, float, float]] = None,
        where: Optional[List[str]] = None,
        layer: Optional[str] = None,
    ) -> List[Dict]:
        """Search for data in the reference vector file.

        Args:
            region: Tuple (xmin, xmax, ymin, ymax) from spatial.parse_region
            where: List of simple SQL-style filters (e.g. "Agency = 'NOAA'")
                (Currently supports simple equality checks for simplicity
                without a SQL parser)
            layer: Filter by 'DataSource' field (e.g., 'ncei_thredds')

        Returns:
            List of dictionaries containing the properties of matching features.
        """
        # `where` defaults to None instead of a shared mutable [] so that
        # successive calls cannot leak filter state into each other.
        where = where if where is not None else []
        results = []
        search_geom = None
        spatial_filter = region is not None and spatial.region_valid_p(region)
        if spatial_filter and HAS_SHAPELY:
            search_geom = spatial.region_to_shapely(region)
        if region:
            r_str = ",".join(f"{x:.2f}" for x in region)
            logger.debug(f"Searching {self.name} in region [{r_str}]...")
        for feat in self.features:
            props = feat.get("properties", {})
            geom = feat.get("geometry")
            if layer and props.get("DataSource") != layer:
                continue
            # Filter by Attributes ("where")
            if not self._match_where(props, where):
                continue
            if spatial_filter and geom:
                if HAS_SHAPELY:
                    try:
                        if not search_geom.intersects(shape(geom)):
                            continue
                    except Exception:
                        # Malformed geometry: skip the feature, not the search.
                        continue
                else:
                    # Shapely unavailable: fall back to a bounding-box test.
                    if not self._bbox_overlaps(geom, region):
                        continue
            results.append(props)
        logger.debug(f"FRED Search found {len(results)} items.")
        return results

    @staticmethod
    def _match_where(props: Dict, where: List[str]) -> bool:
        """Return True if props satisfies every "Field = value" clause.

        Clauses without '=' are ignored (unsupported, as before).
        """
        for clause in where:
            if "=" not in clause:
                continue
            # Split on the FIRST '=' only, so quoted values may contain '='.
            k, v = [x.strip().strip("'").strip('"') for x in clause.split("=", 1)]
            if str(props.get(k)) != v:
                return False
        return True

    @staticmethod
    def _geom_bounds(geom: Dict) -> Optional[Tuple[float, float, float, float]]:
        """Compute (xmin, xmax, ymin, ymax) from raw GeoJSON coordinates.

        Works for any geometry type by walking the nested coordinate lists.
        Returns None when no numeric positions are found.
        """
        xs: List[float] = []
        ys: List[float] = []

        def walk(node):
            if not isinstance(node, (list, tuple)) or not node:
                return
            if isinstance(node[0], (int, float)):
                # A coordinate position: [x, y, ...]
                if len(node) >= 2:
                    xs.append(float(node[0]))
                    ys.append(float(node[1]))
            else:
                for child in node:
                    walk(child)

        walk((geom or {}).get("coordinates"))
        if not xs:
            return None
        return min(xs), max(xs), min(ys), max(ys)

    @classmethod
    def _bbox_overlaps(
        cls, geom: Dict, region: Tuple[float, float, float, float]
    ) -> bool:
        """Bounding-box intersection test (used when Shapely is unavailable)."""
        bounds = cls._geom_bounds(geom)
        if bounds is None:
            # Can't compute bounds: don't spatially exclude the feature,
            # matching the old permissive no-Shapely behavior.
            return True
        gxmin, gxmax, gymin, gymax = bounds
        rxmin, rxmax, rymin, rymax = region
        return not (gxmax < rxmin or gxmin > rxmax or gymax < rymin or gymin > rymax)

    def _get_unique_values(self, field: str) -> List[Any]:
        """Helper to see unique values for a field (e.g. Agency)."""
        values = set()
        for f in self.features:
            val = f.get("properties", {}).get(field)
            if val:
                values.add(val)
        return list(values)

    def _detect_spatial_fields(
        self, row: Dict
    ) -> Tuple[Optional[float], Optional[float], Optional[float], Optional[float]]:
        """Attempt to find W/E/S/N in a dictionary using common abbreviations.

        Returns a (west, east, south, north) tuple; an entry is None when no
        matching key is found or its value cannot be parsed as a float
        (previously an unparsable value crashed ingest instead of letting it
        skip the row with a warning).
        """
        keys_w = ["w", "west", "xmin", "min_lon", "min_x", "left"]
        keys_e = ["e", "east", "xmax", "max_lon", "max_x", "right"]
        keys_s = ["s", "south", "ymin", "min_lat", "min_y", "bottom"]
        keys_n = ["n", "north", "ymax", "max_lat", "max_y", "top"]

        def get_val(keys):
            for k in keys:
                # Try exact match first, then case-insensitive.
                if k in row:
                    candidate = row[k]
                else:
                    candidate = next(
                        (row[rk] for rk in row if rk.lower() == k), None
                    )
                if candidate is None:
                    continue
                try:
                    return float(candidate)
                except (TypeError, ValueError):
                    # Unparsable value (e.g. empty CSV cell): keep looking.
                    continue
            return None

        return get_val(keys_w), get_val(keys_e), get_val(keys_s), get_val(keys_n)

    def ingest(
        self,
        source_file: str,
        field_map: Optional[Dict[str, str]] = None,
        wipe: bool = False,
    ):
        """Ingest a file listing (CSV or JSON) into the FRED index.

        Args:
            source_file: Path to the CSV or JSON file.
            field_map: Dictionary mapping Input_Header -> FRED_Field.
                Example: {'file_url': 'DataLink', 'file_name': 'Name'}
            wipe: If True, clears existing index before ingesting.
        """
        import csv

        if not os.path.exists(source_file):
            logger.error(f"Source file not found: {source_file}")
            return
        if wipe:
            self.features = []
        field_map = field_map or {}
        ext = os.path.splitext(source_file)[1].lstrip(".").lower()
        items = []
        try:
            if ext == "csv":
                with open(source_file, "r", encoding="utf-8-sig") as f:
                    items = list(csv.DictReader(f))
            elif ext == "json":
                with open(source_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                if isinstance(data, list):
                    items = data
                elif "files" in data:
                    items = data["files"]
                elif "items" in data:
                    items = data["items"]
                else:
                    # Previously this case silently ingested zero items.
                    logger.warning(
                        f"No list, 'files' or 'items' found in {source_file}."
                    )
            else:
                logger.error("Unsupported file format. Use CSV or JSON.")
                return
        except Exception as exception:
            logger.error(f"Failed to read source file: {exception}")
            return
        logger.info(f"Ingesting {len(items)} items from {source_file}...")
        added = 0
        for item in items:
            props = self._map_properties(item, field_map)
            if "DataLink" not in props:
                logger.warning(f"Skipping item {item}: No DataLink/URL found.")
                continue
            link = props.get("DataLink")
            # Normalize bare local paths into file:// URIs.
            if link and not link.startswith(("http", "ftp")):
                props["DataLink"] = f"file://{os.path.abspath(link)}"
            w, e, s, n = self._detect_spatial_fields(item)
            if None in (w, e, s, n):
                logger.warning(
                    f"Skipping item {props.get('Name')}: Missing spatial bounds."
                )
                continue
            # Create GeoJSON Polygon
            # Counter-clockwise ring: SW -> SE -> NE -> NW -> SW
            geom = {
                "type": "Polygon",
                "coordinates": [[[w, s], [e, s], [e, n], [w, n], [w, s]]],
            }
            self.add_survey(geom, **props)
            added += 1
        logger.info(f"Successfully added {added} surveys to {self.name}.")
        self.save()

    def _map_properties(self, item: Dict, field_map: Dict[str, str]) -> Dict:
        """Build a SCHEMA-keyed property dict from a raw input row."""
        props = {}
        for field in self.SCHEMA:
            if field in item:
                props[field] = item[field]
            elif field.lower() in item:
                props[field] = item[field.lower()]
        # Explicit user mapping overrides the schema auto-detection above.
        for src_k, dst_k in field_map.items():
            if src_k in item:
                props[dst_k] = item[src_k]
        # Last resort: any key that looks like a URL/link/path becomes DataLink.
        if "DataLink" not in props:
            for k, v in item.items():
                if "url" in k.lower() or "link" in k.lower() or "path" in k.lower():
                    props["DataLink"] = v
                    break
        return props