#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
fetchez.core
~~~~~~~~~~~~~

This module is the core of the Fetchez library.
It handles the initialization of fetchers, connection pooling,
threading, and the base FetchModule class.

:copyright: (c) 2010-2026 Regents of the University of Colorado
:license: MIT, see LICENSE for more details.
"""

import os
import time
import base64
import threading
import netrc
import io
import logging
import collections
import collections.abc
from tqdm import tqdm
import urllib.parse
from urllib.error import HTTPError
from urllib.request import Request, build_opener, HTTPCookieProcessor
from typing import List, Dict, Optional, Any, Tuple
import concurrent.futures

import requests
import lxml.etree
import lxml.html as lh

try:
    from shapely.geometry import Polygon, mapping

    HAS_SHAPELY = True
except ImportError:
    HAS_SHAPELY = False

from . import utils
from . import spatial
from . import __version__

STOP_EVENT = threading.Event()

CUDEM_USER_AGENT = f"Fetchez/{__version__}"
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:146.0) Gecko/20100101 Firefox/146.0"
)
R_HEADERS = {"User-Agent": DEFAULT_USER_AGENT}

NAMESPACES = {
    "gmd": "http://www.isotc211.org/2005/gmd",
    "gmi": "http://www.isotc211.org/2005/gmi",
    "gco": "http://www.isotc211.org/2005/gco",
    "gml": "http://www.isotc211.org/2005/gml",
    "th": "http://www.unidata.ucar.edu/namespaces/thredds/InvCatalog/v1.0",
    "wms": "http://www.opengis.net/wms",
}

logger = logging.getLogger(__name__)

HOOK_LOCK = threading.Lock()


# =============================================================================
# Helper Functions
# =============================================================================
def fetches_callback(r: List[Any]):
    """Default callback for fetches processes.

    r: [url, local-fn, data-type, fetch-status-or-error-code]
    """
    pass

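# A custom callback matching the documented signature might look like the
# following sketch (illustrative only; `my_callback` is not part of this module):
#
#     def my_callback(r):
#         url, local_fn, data_type, status = r
#         if status != 0:
#             logger.warning(f"fetch of {url} returned {status}")
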
def urlencode_(opts: Dict) -> str:
    """Encode `opts` for use in a URL."""
    return urllib.parse.urlencode(opts)

def urlencode(opts: Dict, doseq: bool = True) -> str:
    """Encode `opts` for use in a URL.

    Args:
        opts: Dictionary of query parameters.
        doseq: If True, lists in values are encoded as separate parameters
            (e.g., {'a': [1, 2]} -> 'a=1&a=2').
    """
    return urllib.parse.urlencode(opts, doseq=doseq)

def xml2py(node) -> Optional[Dict]:
    """Parse an xml file into a python dictionary."""
    texts: Dict[Any, Any] = {}
    if node is None:
        return None

    for child in list(node):
        child_key = lxml.etree.QName(child).localname
        if "name" in child.attrib:
            child_key = child.attrib["name"]

        href = child.attrib.get("{http://www.w3.org/1999/xlink}href")
        if child.text is None or child.text.strip() == "":
            if href is not None:
                if child_key in texts:
                    texts[child_key].append(href)
                else:
                    texts[child_key] = [href]
            else:
                if child_key in texts:
                    ck = xml2py(child)
                    if ck:
                        first_key = list(ck.keys())[0]
                        texts[child_key][first_key].update(ck[first_key])
                else:
                    texts[child_key] = xml2py(child)
        else:
            if child_key in texts:
                texts[child_key].append(child.text)
            else:
                texts[child_key] = [child.text]

    return texts

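# Illustrative sketch of what xml2py() returns for a small snippet (the example
# XML and values are hypothetical):
#
#     node = lxml.etree.fromstring(
#         b'<dataset name="dem"><property name="size">42</property></dataset>'
#     )
#     xml2py(node)
#     # -> {'size': ['42']}
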
def get_userpass(authenticator_url: str) -> Tuple[Optional[str], Optional[str]]:
    """Retrieve username and password from netrc for a given URL."""
    username = None
    password = None
    try:
        info = netrc.netrc()
        host_auth = urllib.parse.urlparse(authenticator_url).hostname
        if host_auth is not None:
            auth_results = info.authenticators(host_auth)
            if auth_results is not None:
                username, _, password = auth_results
    except Exception as e:
        if "No such file" not in str(e):
            logger.error(f"Failed to parse netrc: {e}")
        username = None
        password = None

    return username, password

def get_credentials(
    url: str, authenticator_url: str = "https://urs.earthdata.nasa.gov"
) -> Optional[str]:
    """Get user credentials from .netrc or prompt for input.

    Used for EarthData, etc.
    """
    credentials = None
    errprefix = ""
    username, password = get_userpass(authenticator_url)
    while not credentials:
        if not username:
            username = utils.get_username()
            password = utils.get_password()

        cred_str = f"{username}:{password}"
        credentials = base64.b64encode(cred_str.encode("ascii")).decode("ascii")
        if url:
            try:
                req = Request(url)
                req.add_header("Authorization", f"Basic {credentials}")
                opener = build_opener(HTTPCookieProcessor())
                opener.open(req)
            except HTTPError:
                logger.error(f"{errprefix}Incorrect username or password")
                errprefix = ""
                credentials = None
                username = None
                password = None

    return credentials

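# get_userpass() and get_credentials() read credentials from ~/.netrc when it
# exists. A typical entry (values are placeholders) looks like:
#
#     machine urs.earthdata.nasa.gov
#         login my_username
#         password my_password
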
# =============================================================================
# XML / ISO Metadata Helper
# =============================================================================

class iso_xml:
    """Helper class for parsing ISO 19115 XML Metadata."""
    def __init__(self, url=None, xml=None, timeout=20, read_timeout=60):
        self.url = url
        self.xml_doc = None
        self.namespaces = {
            "gmd": "http://www.isotc211.org/2005/gmd",
            "gco": "http://www.isotc211.org/2005/gco",
            "gml": "http://www.opengis.net/gml",
            "gml32": "http://www.opengis.net/gml/3.2",
            "xlink": "http://www.w3.org/1999/xlink",
            "gmi": "http://www.isotc211.org/2005/gmi",
        }
        if self.url is not None:
            req = Fetch(self.url).fetch_req(timeout=timeout, read_timeout=read_timeout)
            if req and req.status_code == 200:
                self._parse(req.content)
        elif xml is not None:
            self._parse(xml)

    def _parse(self, content):
        try:
            # Use recover=True to handle slight XML errors
            parser = lxml.etree.XMLParser(recover=True)
            self.xml_doc = lxml.etree.fromstring(content, parser=parser)
        except Exception as e:
            logger.error(f"XML Parsing failed: {e}")
            self.xml_doc = None

    def _xpath_get(self, xpath_str):
        """Helper to safely get first text result of xpath."""
        if self.xml_doc is None:
            return None

        try:
            res = self.xml_doc.xpath(xpath_str, namespaces=self.namespaces)
            if res:
                if isinstance(res[0], str):
                    return str(res[0]).strip()
                if hasattr(res[0], "text"):
                    return str(res[0].text).strip()
            return None
        except Exception:
            return None

    def title(self):
        """Extract Title."""
        return self._xpath_get(
            ".//gmd:identificationInfo//gmd:citation//gmd:title/gco:CharacterString"
        )

    def abstract(self):
        """Extract Abstract."""
        return self._xpath_get(
            ".//gmd:identificationInfo//gmd:abstract/gco:CharacterString"
        )

    def date(self):
        """Extract Date."""
        d = self._xpath_get(".//gmd:date/gco:Date")
        if not d:
            d = self._xpath_get(".//gmd:date/gco:DateTime")
        return d

    def linkages(self):
        """Extract first valid download URL (specifically looking for Zips/Data)."""
        if self.xml_doc is None:
            return None

        try:
            urls = self.xml_doc.xpath(
                ".//gmd:distributionInfo//gmd:URL/text() | .//gmd:distributionInfo//gmd:linkage/gco:CharacterString/text()",
                namespaces=self.namespaces,
            )
            for u in urls:
                u = u.strip()
                # we want zip files (actual data) over metadata links
                if ".zip" in u.lower():
                    return u

            # return first URL if no zip found
            if urls:
                return urls[0].strip()
        except Exception:
            pass

        return None

    def polygon(self, geom=True):
        """Extract Bounding Box and return GeoJSON Polygon."""
        if self.xml_doc is None:
            return None

        out_poly = []
        try:
            # Find Bounding Box
            # bbox = self.xml_doc.xpath('.//gmd:EX_GeographicBoundingBox', namespaces=self.namespaces)
            bbox = self.xml_doc.find(".//{*}Polygon", namespaces=self.namespaces)
            if bbox is None:
                return None

            nodes = bbox.findall(".//{*}pos", namespaces=self.namespaces)
            for node in nodes:
                out_poly.append([float(x) for x in node.text.split()])

            ## Close polygon
            if out_poly and (
                out_poly[0][0] != out_poly[-1][0]
                or out_poly[0][1] != out_poly[-1][1]
            ):
                out_poly.append(out_poly[0])

            out_poly = [[lon, lat] for lat, lon in out_poly]
            if geom:
                if HAS_SHAPELY:
                    poly = Polygon(out_poly)
                    geojson_dict = mapping(poly)
                else:
                    geojson_dict = {"type": "Polygon", "coordinates": [out_poly]}
                return geojson_dict
            else:
                return out_poly
        except (IndexError, ValueError):
            logger.error("Could not parse polygon from xml")
            return None

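# Usage sketch for iso_xml (illustrative; the metadata URL is hypothetical):
#
#     meta = iso_xml("https://example.com/iso/record.xml")
#     if meta.xml_doc is not None:
#         print(meta.title(), meta.date())
#         print(meta.linkages())    # first .zip (or first) distribution URL
#         geom = meta.polygon()     # GeoJSON Polygon dict, or None
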
class HttpFile(io.IOBase):
    """A file-like object backed by an HTTP URL.

    Translates read() calls into HTTP Range requests to fetch only needed bytes.
    """
    def __init__(self, url, session=None, callback=None):
        self.url = url
        self.session = session or requests.Session()
        self.callback = callback
        self.offset = 0
        self.size = self._get_size()

    def _get_size(self):
        resp = self.session.head(self.url)
        if "Content-Length" not in resp.headers:
            return 0
        return int(resp.headers["Content-Length"])

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            self.offset = offset
        elif whence == io.SEEK_CUR:
            self.offset += offset
        elif whence == io.SEEK_END:
            self.offset = self.size + offset
        return self.offset

    def tell(self):
        return self.offset

    def read(self, size=-1):
        if size == -1:
            end = self.size - 1
        else:
            end = self.offset + size - 1

        if end >= self.size:
            end = self.size - 1

        if self.offset > end:
            return b""

        # Fetch ONLY the specific bytes requested
        headers = {"Range": f"bytes={self.offset}-{end}"}
        response = self.session.get(self.url, headers=headers, timeout=(10, 60))
        response.raise_for_status()
        data = response.content
        if self.callback:
            self.callback(len(data))

        self.offset += len(data)
        return data

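# Usage sketch for HttpFile (illustrative; assumes the server honors HTTP Range
# requests and that the URL is hypothetical):
#
#     hf = HttpFile("https://example.com/big_archive.zip")
#     hf.seek(-22, io.SEEK_END)    # e.g. a zip End-Of-Central-Directory record
#     tail = hf.read()             # only the last 22 bytes travel over the wire
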
# =============================================================================
# Fetch
# =============================================================================

class Fetch:
    """Fetch class to fetch ftp/http data files."""
    def __init__(
        self,
        url: str,
        callback=fetches_callback,
        headers: Dict = R_HEADERS,
        verify: bool = True,
        allow_redirects: bool = True,
    ):
        self.url = url
        self.callback = callback
        self.headers = headers
        self.verify = verify
        self.allow_redirects = allow_redirects
        self.silent = logger.getEffectiveLevel() > logging.INFO

    def fetch_req(
        self,
        method: str = "GET",
        params: Optional[Dict] = None,
        data: Optional[Any] = None,
        json: Optional[Dict] = None,
        tries: int = 5,
        # timeout: Optional[Union[float, Tuple]] = None,
        timeout: Optional[float] = 30,
        read_timeout: Optional[float] = 120,
    ) -> Optional[requests.Response]:
        """Fetch src_url and return the requests object (iterative retry)."""
        req = None
        current_timeout = timeout
        current_read_timeout = read_timeout
        for attempt in range(tries):
            try:
                # Calculate timeouts for this attempt
                tupled_timeout = (
                    current_timeout if current_timeout else None,
                    current_read_timeout if current_read_timeout else None,
                )
                req = requests.request(
                    method=method,
                    url=self.url,
                    params=params,
                    data=data,
                    json=json,
                    headers=self.headers,
                    # auth=self.auth,
                    timeout=tupled_timeout,
                    verify=self.verify,
                    allow_redirects=self.allow_redirects,
                    stream=True,  # Always stream to support large files
                )
                # Check status codes
                if req.status_code == 504:  # Gateway Timeout
                    time.sleep(2)
                    ## Increase timeouts next loop
                    if current_timeout:
                        current_timeout += 1
                    if current_read_timeout:
                        current_read_timeout += 10
                    continue
                elif req.status_code == 416:  # Range Not Satisfiable
                    # If range fails, try fetching whole file
                    if "Range" in self.headers:
                        del self.headers["Range"]
                    continue
                elif 200 <= req.status_code <= 299:
                    return req
                else:
                    logger.error(f"Request from {req.url} returned {req.status_code}")
                    return req
            except Exception as e:
                logger.debug(f"Attempt {attempt + 1}/{tries} failed: {e}")
                if current_timeout:
                    current_timeout *= 2
                if current_read_timeout:
                    current_read_timeout *= 2
                time.sleep(1)

        logger.error(f"Connection failed after {tries} attempts: {self.url}")
        raise ConnectionError("Maximum attempts at connecting have failed.")

    def fetch_html(self, timeout=2):
        """Fetch src_url and return it as an HTML object."""
        req = self.fetch_req(timeout=timeout)
        if req:
            return lh.document_fromstring(req.text)
        return None

    def fetch_xml(self, timeout=2, read_timeout=10):
        """Fetch src_url and return it as an XML object."""
        try:
            req = self.fetch_req(timeout=timeout, read_timeout=read_timeout)
            results = lxml.etree.fromstring(req.text.encode("utf-8"))
        except Exception:
            ## Fallback empty XML
            results = lxml.etree.fromstring(
                '<?xml version="1.0"?><!DOCTYPE _[<!ELEMENT _ EMPTY>]><_/>'.encode(
                    "utf-8"
                )
            )
        return results

    def fetch_file(
        self,
        dst_fn: str,
        method="GET",
        params=None,
        datatype=None,
        overwrite=False,
        timeout=30,
        read_timeout=120,
        tries=5,
        check_size=True,
        verbose=True,
    ) -> int:
        """Fetch src_url and save to dst_fn with resume support."""
        # check if input `url` is a file path. Either check if it exists and
        # move on or copy it to the destination directory.
        if self.url and self.url.startswith("file://"):
            src_path = self.url[7:]  # Strip 'file://'
            # Source == Destination
            # Just index/verify the file, not move it.
            if os.path.abspath(src_path) == os.path.abspath(dst_fn):
                if os.path.exists(src_path):
                    if verbose:
                        logger.debug(f"Verified local: {src_path}")
                    return 0
                else:
                    logger.error(f"Missing local file: {src_path}")
                    return -1
            # Copy from Network/Local -> Output Dir
            else:
                try:
                    import shutil

                    if not os.path.exists(os.path.dirname(dst_fn)):
                        os.makedirs(os.path.dirname(dst_fn))
                    shutil.copy2(src_path, dst_fn)
                    return 0
                except Exception as e:
                    logger.error(f"Local copy failed: {e}")
                    return -1

        # Regular file fetching here-on-out
        dst_dir = os.path.abspath(os.path.dirname(dst_fn))
        if not os.path.exists(dst_dir):
            try:
                os.makedirs(dst_dir)
            except OSError:
                pass

        part_fn = f"{dst_fn}.part"
        if not overwrite and os.path.exists(dst_fn):
            if not check_size:
                return 0  # Exists
            if os.path.getsize(dst_fn) > 0:
                return 0  # Exists

        for attempt in range(tries):
            resume_byte_pos = 0
            mode = "wb"
            # Resume if partial file exists
            if os.path.exists(part_fn):
                resume_byte_pos = os.path.getsize(part_fn)
                if resume_byte_pos > 0:
                    self.headers["Range"] = f"bytes={resume_byte_pos}-"
                    mode = "ab"

            try:
                with requests.request(
                    method=method,
                    url=self.url,
                    stream=True,
                    params=params,
                    # data=data,
                    # json=json,
                    headers=self.headers,
                    timeout=(timeout, read_timeout),
                    verify=self.verify,
                    allow_redirects=self.allow_redirects,
                ) as req:
                    # Finished/Cached by Server (304) or Pre-check
                    if req.status_code == 304:
                        return 0

                    # Get Expected Size
                    remote_size = int(req.headers.get("content-length", 0))
                    total_size = remote_size
                    # Adjust expectation if this is a partial response
                    if req.status_code == 206:
                        content_range = req.headers.get("Content-Range", "")
                        if "/" in content_range:
                            total_size = int(content_range.split("/")[-1])

                    # Check if already done (.part matches full size)
                    if check_size and total_size > 0 and resume_byte_pos == total_size:
                        ## We have the whole file in .part, just move it.
                        os.rename(part_fn, dst_fn)
                        return 0

                    # Error Codes
                    if req.status_code == 416:
                        # Range No Good: Local file is likely corrupt.
                        # Delete .part and retry from scratch (next loop iteration)
                        logger.debug(
                            f"Invalid Range for {os.path.basename(dst_fn)}. Restarting..."
                        )
                        if os.path.exists(part_fn):
                            os.remove(part_fn)
                        if "Range" in self.headers:
                            del self.headers["Range"]
                        continue
                    elif req.status_code in [401, 403]:
                        # Earthdata/CDSE hack from cudem.fetches.fetches: requests
                        # strips Authorization headers on cross-domain redirects.
                        # If the URL changed and we got a 401/403, explicitly
                        # re-request the new URL with headers.
                        if self.url != req.url:
                            logger.debug(
                                f"Auth dropped during redirect. Re-requesting: {req.url}"
                            )
                            return Fetch(
                                url=req.url,
                                callback=self.callback,
                                headers=self.headers,
                                verify=self.verify,
                                allow_redirects=self.allow_redirects,
                            ).fetch_file(
                                dst_fn=dst_fn,
                                params=params,
                                datatype=datatype,
                                overwrite=overwrite,
                                timeout=timeout,
                                read_timeout=read_timeout,
                                tries=tries,
                                check_size=check_size,
                                verbose=verbose,
                            )
                        raise UnboundLocalError("Authentication Failed")
                    elif req.status_code not in [200, 206]:
                        # Fatal error for this attempt
                        if attempt < tries - 1:
                            time.sleep(2)
                            continue
                        raise ConnectionError(f"Status {req.status_code}")

                    with open(part_fn, mode) as f:
                        # desc = utils.str_truncate_middle(self.url, n=60)
                        desc = utils.format_dataset_id(self.url)
                        show_bar = verbose and not self.silent
                        with tqdm(
                            desc=desc,
                            total=total_size,
                            initial=resume_byte_pos,
                            disable=not show_bar,
                            unit="B",
                            unit_scale=True,
                            unit_divisor=1024,
                            leave=False,
                        ) as pbar:
                            for chunk in req.iter_content(chunk_size=8192):
                                if STOP_EVENT.is_set():
                                    logger.warning("Download cancelled by user.")
                                    return -1
                                if chunk:
                                    f.write(chunk)
                                    pbar.update(len(chunk))

                    # If we got here without exception, check size, if wanted
                    if check_size and total_size > 0:
                        final_size = os.path.getsize(part_fn)
                        if final_size < total_size:
                            # If smaller, the connection was most likely cut.
                            raise IOError(
                                f"Incomplete download: {final_size}/{total_size} bytes"
                            )
                        elif final_size > total_size:
                            # If larger, it was likely decompressed on the fly (GZIP).
                            logger.debug(
                                f"File size ({final_size}) > Header ({total_size}). "
                                "Assuming transparent decompression."
                            )
                        else:
                            pass

                    os.rename(part_fn, dst_fn)
                    return 0
            except (
                requests.exceptions.RequestException,
                IOError,
                UnboundLocalError,
            ) as e:
                if attempt < tries - 1:
                    wait_time = (attempt + 1) * 2
                    logger.debug(f"Download failed: {e}. Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    logger.warning(f"Failed to download {self.url}: {e}")
                    return -1

        return -1

    def fetch_ftp_file(self, dst_fn, params=None, datatype=None, overwrite=False):
        """Fetch an ftp file via ftplib with a progress bar."""
        import ftplib

        status = 0
        logger.info(f"Fetching remote ftp file: {self.url}...")
        dest_dir = os.path.dirname(dst_fn)
        if dest_dir and not os.path.exists(dest_dir):
            try:
                os.makedirs(dest_dir)
            except OSError:
                pass

        try:
            parsed = urllib.parse.urlparse(self.url)
            host = parsed.hostname
            path = parsed.path
            username = parsed.username or "anonymous"
            password = parsed.password or "anonymous@"

            ftp = ftplib.FTP(host)
            ftp.login(user=username, passwd=password)
            ftp.voidcmd("TYPE I")
            try:
                total_size = ftp.size(path)
            except ftplib.error_perm:
                total_size = None

            with open(dst_fn, "wb") as local_file:
                with tqdm(
                    total=total_size,
                    unit="B",
                    unit_scale=True,
                    desc=os.path.basename(dst_fn),
                    leave=True,
                ) as pbar:

                    def callback(data):
                        local_file.write(data)
                        pbar.update(len(data))

                    ftp.retrbinary(f"RETR {path}", callback)

            ftp.quit()
            logger.info(f"Fetched remote ftp file: {os.path.basename(self.url)}.")
        except Exception as e:
            logger.error(f"FTP Error: {e}")
            status = -1
            if os.path.exists(dst_fn):
                try:
                    os.remove(dst_fn)
                except OSError:
                    pass

        return status

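# Usage sketch for Fetch (URLs are illustrative):
#
#     status = Fetch("https://example.com/data/tile.tif").fetch_file("tile.tif")
#     # status == 0 -> file landed at ./tile.tif (written via tile.tif.part)
#
#     resp = Fetch("https://example.com/api").fetch_req(
#         params={"q": "dem"}, timeout=10, read_timeout=30
#     )
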
def _fetch_worker(module, entry, verbose=True):
    """Helper wrapper to call fetch_entry on a module."""
    try:
        return module.fetch_entry(entry, check_size=True, verbose=verbose)
    except Exception as e:
        logger.error(f"Worker failed for {entry.get('url', 'unknown')}: {e}")
        return -1

def run_fetchez(modules: List["FetchModule"], threads: int = 3, global_hooks=None):
    """Run Fetchez in parallel with hooks.

    - mod.hooks: Run ONLY on entries belonging to 'mod'.
    - global_hooks: Run on ALL entries combined.
    """
    STOP_EVENT.clear()
    if global_hooks is None:
        global_hooks = []

    silent = logger.getEffectiveLevel() > logging.INFO

    # --- Module Pre-Hooks ---
    for mod in modules:
        mod_pre = [h for h in mod.hooks if h.stage == "manifest"]
        if not mod_pre:
            continue

        local_entries = [(mod, e) for e in mod.results]
        for hook in mod_pre:
            try:
                local_entries = hook.run(local_entries)
                if local_entries is None:
                    local_entries = []
                utils._log_hook_history(local_entries, hook)
            except Exception as e:
                logger.error(
                    f'Module "{mod.name}" manifest-hook "{hook.name}" failed: {e}'
                )

        # Update the mod.results
        mod.results = [e for m, e in local_entries]

    # all_entries = []
    # for mod in modules:
    #     for entry in mod.results:
    #         all_entries.append((mod, entry))
    all_entries = []
    for mod in modules:
        for entry in mod.results:
            if not isinstance(entry, dict):
                logger.warning(
                    f"Skipping malformed entry in module '{mod.name}': "
                    f"Expected dict, got {type(entry).__name__} -> {entry}"
                )
                continue
            all_entries.append((mod, entry))

    # --- Global Pre-Hooks ---
    global_pre = [h for h in global_hooks if h.stage == "manifest"]
    for hook in global_pre:
        try:
            result = hook.run(all_entries)
            if isinstance(result, list):
                all_entries = result
            utils._log_hook_history(all_entries, hook)
        except Exception as e:
            logger.error(f'Global manifest-hook "{hook.name}" failed: {e}')

    total_files = len(all_entries)
    if total_files == 0:
        logger.debug("No files to fetch.")
        return

    logger.debug(
        f"Starting parallel fetch: {total_files} files with {threads} threads."
    )

    final_results_with_owner = []
    active_hooks_full = []
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
            futures = {
                executor.submit(_fetch_worker, mod, entry, verbose=True): (mod, entry)
                for mod, entry in all_entries
            }
            with tqdm(
                total=total_files,
                unit="file",
                desc="Starting Pipeline...",
                position=0,
                leave=False,
                disable=silent,
            ) as pbar:
                for future in concurrent.futures.as_completed(futures):
                    mod, original_entry = futures[future]
                    file_name = os.path.basename(original_entry.get("dst_fn", "item"))
                    short_name = (
                        file_name[:30] + "..." if len(file_name) > 30 else file_name
                    )
                    pbar.set_description(f"[{mod.name}] {short_name}")
                    try:
                        status = future.result()
                        original_entry.update({"status": status})
                        if status != 0:
                            logger.error(
                                f"Failed to fetch: {os.path.basename(original_entry['dst_fn'])}"
                            )
                    except Exception as e:
                        logger.error(f"Worker exception: {e}")
                        original_entry.update({"status": -1})

                    # --- File Hooks ---
                    gf_hooks = [h for h in global_hooks if h.stage == "file"]
                    lf_hooks = [h for h in mod.hooks if h.stage == "file"]
                    # active_hooks = utils.merge_hooks(gf_hooks, lf_hooks)
                    active_hooks = utils.merge_hooks(lf_hooks, gf_hooks)
                    # active_hooks = lf_hooks + gf_hooks
                    active_hooks_full.append(active_hooks)

                    current_entries = [(mod, original_entry)]
                    for hook in active_hooks:
                        try:
                            # Hook runs on current entry list
                            current_entries = hook.run(current_entries)
                            if current_entries is None:
                                current_entries = []
                            utils._log_hook_history(current_entries, hook)
                        except Exception as e:
                            logger.error(f'File hook "{hook.name}" failed: {e}')

                    # --- STREAM ---
                    # If any hook set up a generator stream (e.g. SimpleStack,
                    # Filters), we must exhaust it here to trigger the processing.
                    processed_entries = []
                    for owner, item in current_entries:
                        stream = item.get("stream")
                        if stream and isinstance(
                            stream,
                            (collections.abc.Iterator, collections.abc.Generator),
                        ):
                            try:
                                logger.debug(
                                    f"Exhausting stream for {os.path.basename(item.get('dst_fn', ''))}..."
                                )
                                collections.deque(stream, maxlen=0)
                            except Exception as e:
                                logger.exception(
                                    f"Stream processing error in {os.path.basename(item.get('dst_fn', ''))}: {e}"
                                )
                        processed_entries.append((owner, item))

                    final_results_with_owner.extend(processed_entries)
                    pbar.update(1)
    except KeyboardInterrupt:
        STOP_EVENT.set()
        executor.shutdown(wait=False, cancel_futures=True)
        raise
    finally:
        # --- Teardown The Hook(s) ---
        logger.debug("Running teardown for all hooks...")
        all_possible_hooks = active_hooks_full
        for h in global_hooks:
            all_possible_hooks.append(h)
        for m in modules:
            for h in m.hooks:
                all_possible_hooks.append(h)

        for hook in all_possible_hooks:
            if hasattr(hook, "teardown"):
                try:
                    hook.teardown()
                except Exception as e:
                    logger.error(f"Teardown failed for hook '{hook.name}': {e}")

    # --- Post Hooks ---
    # Module-level Post-Hooks
    results_by_mod: Dict[Any, Any] = {m: [] for m in modules}
    for r_tuple in final_results_with_owner:
        owner_mod, entry = r_tuple
        if owner_mod in results_by_mod:
            results_by_mod[owner_mod].append((owner_mod, entry))

    for mod in modules:
        mod_post = [h for h in mod.hooks if h.stage == "collection"]
        if mod_post and results_by_mod[mod]:
            current_mod_entries = results_by_mod[mod]
            for hook in mod_post:
                try:
                    current_mod_entries = hook.run(current_mod_entries)
                    if current_mod_entries is None:
                        current_mod_entries = []
                    utils._log_hook_history(current_mod_entries, hook)
                except Exception as e:
                    logger.error(
                        f'Module "{mod.name}" collection-hook "{hook.name}" failed: {e}'
                    )
            results_by_mod[mod] = current_mod_entries

    # Re-flatten the lists after module-level post-hooks have modified them
    flat_results = []
    for mod_entries in results_by_mod.values():
        flat_results.extend(mod_entries)

    # Global-level Post-Hooks
    global_post = [h for h in global_hooks if h.stage == "collection"]
    for hook in global_post:
        try:
            flat_results = hook.run(flat_results)
            if flat_results is None:
                flat_results = []
            utils._log_hook_history(flat_results, hook)
        except Exception as e:
            logger.error(f'Global collection-hook "{hook.name}" failed: {e}')

    return flat_results

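# Usage sketch for run_fetchez (illustrative; assumes each module's run() has
# been called so that mod.results is populated, and that any hooks passed in
# expose a `stage` attribute and a `run(entries)` method):
#
#     mods = [HttpDataset(url="https://example.com/dem.tif", name="dem")]
#     for m in mods:
#         m.run()
#     results = run_fetchez(mods, threads=3, global_hooks=[])
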
# =============================================================================
# Fetch Module (Base & Default/Test Implementations)
#
# To create a sub-module, inherit from `FetchModule` and redefine `run`
# (and whatever else is needed). `run` should populate self.results, which
# is used for fetching. `self.results` should have at least `url`, `dst_fn`
# and `data_type` set for each fetch result, though any relevant info can
# fill the rest of it for whatever purpose...
# =============================================================================

class FetchModule:
    """Base class for all fetch modules."""
    def __init__(
        self,
        src_region=None,
        callback=fetches_callback,
        hook=None,
        outdir=None,
        name="fetches",
        min_year=None,
        max_year=None,
        weight=1.0,
        uncertainty=0.0,
        params={},
        **kwargs,
    ):
        self.region = src_region
        self.callback = callback
        self.outdir = outdir
        self.params = params
        self.status = 0
        self.results = []
        self.name = name
        self.min_year = utils.int_or(min_year)
        self.max_year = utils.int_or(max_year)
        self.headers = R_HEADERS.copy()
        if self.outdir is None:
            self._outdir = os.path.join(os.getcwd(), self.name)
        else:
            self._outdir = os.path.join(self.outdir, self.name)

        self.internal_hooks = []
        self.external_hooks = hook if hook else []
        self.weight = float(weight)
        self.uncertainty = float(uncertainty)

        # Commented out for the linter, but we'll be using this in the future.
        # presets = {}
        # Example structure:
        # presets = {
        #     'lidar-clean': [
        #         {'name': 'unzip'},
        #         {'name': 'filter', 'args': {'match': '.laz'}}
        #     ]
        # }

        # For dlim support, we can check these variables to do the proper
        # processing. Set these to their correct values in the sub-class.
        # Maybe with fetchez now, we can set these in `results` instead...
        self.data_format = None
        self.src_srs = None
        self.title = None
        self.source = None
        self.date = None
        self.data_type = None
        self.resolution = None
        self.hdatum = None
        self.vdatum = None
        self.url = None

        # Default to whole world if region is invalid/missing.
        # Set a generic region of the entire world in WGS84 if no region was
        # specified or if it's an invalid region... this will result in quite
        # a lot of downloading on global datasets, so be careful with this.
        if self.region is None or not spatial.region_valid_p(self.region):
            self.region = (-180, 180, -90, 90)

        self.silent = logger.getEffectiveLevel() > logging.INFO

    @property
    def hooks(self):
        """Combine internal and external hooks in the correct execution order."""
        return self.internal_hooks + self.external_hooks

    def add_hook(self, hook_obj):
        """Add a hook instance at runtime."""
        if hasattr(hook_obj, "run"):
            self.external_hooks.append(hook_obj)
        else:
            logger.warning(
                f"Hook {hook_obj} does not appear to be a valid FetchHook class."
            )

    def run(self):
        """Redefine `run` in a sub-module to populate `results` with urls."""
        raise NotImplementedError

    def fetch_entry(self, entry, check_size=True, retries=5, verbose=True):
        try:
            parsed_url = urllib.parse.urlparse(entry["url"])
            if parsed_url.scheme == "ftp":
                status = Fetch(url=entry["url"], headers=self.headers).fetch_ftp_file(
                    entry["dst_fn"]
                )
            else:
                status = Fetch(
                    url=entry["url"],
                    headers=self.headers,
                ).fetch_file(
                    entry["dst_fn"],
                    check_size=check_size,
                    tries=retries,
                    verbose=verbose,
                )
        except Exception:
            status = -1

        return status

    def fill_results(self, entry):
        """Fill self.results with the fetch module entry."""
        self.results.append(
            {"url": entry[0], "dst_fn": entry[1], "data_type": entry[2]}
        )

    def add_entry_to_results(self, url, dst_fn, data_type, **kwargs):
        """Add fetch entries to `results`.

        Any keyword args can be added to `results`, but we need `url`,
        `dst_fn` and `data_type`.
        """
        if utils.str_or(dst_fn) is not None:
            dst_fn = os.path.join(self._outdir, dst_fn)

        entry = {"url": url, "dst_fn": dst_fn, "data_type": data_type}
        entry.update(kwargs)
        self.results.append(entry)

# Simple Fetch Module to fetch a url.
# It will just add that url to `results`.
class HttpDataset(FetchModule):
    """Fetch an http file directly."""
    def __init__(self, url=None, **kwargs):
        super().__init__(**kwargs)
        self.url = url

    def run(self):
        if self.url:
            self.add_entry_to_results(self.url, os.path.basename(self.url), "https")

class Scratch(FetchModule):
    """Scratch module that just fills the results with its own arguments."""
    def __init__(self, url, path, datatype, **kwargs):
        super().__init__(**kwargs)
        self.url = url
        self.path = path
        self.datatype = datatype

    def run(self):
        self.add_entry_to_results(self.url, self.path, self.datatype)

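# Sketch of a minimal custom module following the pattern described above
# (the URL, module name and data_type are hypothetical):
#
#     class MyData(FetchModule):
#         def __init__(self, **kwargs):
#             super().__init__(name="mydata", **kwargs)
#
#         def run(self):
#             self.add_entry_to_results(
#                 "https://example.com/mydata/tiles.zip", "tiles.zip", "raster"
#             )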