Source code for pyncei.bot

"""Tools to access data from NOAA's Climate Data Online Web Services v2 API"""

from copy import copy
import csv
from datetime import datetime
import glob
import logging
import os
import random
import re
import time
import urllib.parse
import warnings

import pandas as pd
import requests
import requests_cache

try:
    import geopandas as gpd
except ImportError:
    pass


logger = logging.getLogger(__name__)


[docs] class NCEIBot: """Contains functions to request data from the NCEI web services Attributes: wait (float): time in seconds between requests. NCEI allows a maximum of five queries per second. validate_params (bool): whether to validate query parameters before making a GET request. Defaults to False. max_retries (int): number of times to retry requests that fail because of temporary connectivity or server lapses. Retries use an exponential backoff. Defaults to 12. The get functions described below use a common set of keyword arguments. The sortorder, limit, offset, and max arguments can be used in any get function; other keywords vary by endpoint. Most values appear to be case-sensitive. Query validation, if enabled, should capture most but not all case errors. Args: datasetid (str or list): the id or name of a NCEI dataset. Multiple values allowed for most functions. Examples: GHCND; PRECIP_HLY; Weather Radar (Level III). datacategoryid (str or list): the id or name of a NCEI data category. Data categories are broader than data types. Multiple values allowed. Examples: TEMP, WXTYPE, Degree Days. datatypeid (str or list): the id or name of a data type. Multiple values allowed. Examples: TMIN; SNOW; Long-term averages of fall growing degree days with base 70F. locationid (str or list): the id or name of a location. Multiple values allowed. If a name is given, the script will try to map it to an id. Examples: Maryland; FIPS:24; ZIP:20003; London, UK. stationid (str or list): the id of name of a station in the NCEI database. Multiple values allowed. Examples: COOP:010957. startdate (str or datetime): the earliest date available enddate (str or datetime): the latest date available sortfield (str): field by which to sort the query results. Available sort fields vary by endpoint. sortorder (str): specifies whether sort is ascending or descending. Must be 'asc' or 'desc'. limit (int): number of records to return per query offset (int): index of the first record to return max (int): maximum number of records to return. Not part of the API. """
[docs] def __init__(self, token, wait=0.2, cache_name=None, **cache_kwargs): """Initializes NCEIBot object Args: token (str): NCEI token wait (float or int): time in seconds to wait between requests cache_name (str): path to cache cache_kwargs: any keyword argument accepted by requests_cache.CachedSession """ self.validate_params = False self.max_retries = 12 # Queries are capped at five per second, so enforce that with # a minimum wait time of 0.2 seconds if wait < 0.2: self.wait = 0.2 else: self.wait = wait if cache_kwargs and not cache_name: raise Exception("Must specify cache_name if cache_kwargs are provided") # Cache queries using requests_cache if cache_name: self._cache = True self._session = requests_cache.CachedSession(cache_name, **cache_kwargs) else: self._cache = False self._session = requests.Session() # Lazy load __version__ to prevent circular import error from . import __version__ self._session.headers.update( {"token": token, "User-Agent": f"pyncei v{__version__}"} ) self._validators = { "datacategoryid": self._check_name, "datasetid": self._check_name, "datatypeid": self._check_name, "enddate": self._check_date, "extent": self._check_extent, "limit": self._check_limit, "locationid": self._check_name, "locationcategoryid": self._check_name, "max": self._check_positive_integer, "offset": self._check_positive_integer, "stationid": self._check_name, "startdate": self._check_date, "sortfield": self._check_sortfield, "sortorder": self._check_sortorder, "units": self._check_units, } # List of fields that can occur more than once in a given query. # This list may need to be adjusted depending on the endpoint; # for example, the data endpoint allows only one dataset to be passed. self._allow_multiple = [ "datacategoryid", "datasetid", "datatypeid", "locationid", "locationcategoryid", "stationid", ] # List of endpoints self._endpoints = [ "datacategories", "datasets", "datatypes", "locations", "locationcategories", "stations", ] # Create name lookups to help users map to ids needed for querying self._lookups = {} self._filepath = os.path.join(os.path.dirname(__file__), "files") try: os.makedirs(self._filepath) except OSError: for fp in glob.iglob(os.path.join(self._filepath, "*.csv")): fn = os.path.splitext(os.path.basename(fp))[0] self._lookups[fn] = {} with open(fp, encoding="utf-8-sig", newline="") as f: rows = csv.reader(f, delimiter=",", quotechar='"') try: next(rows) except StopIteration: pass else: for row in rows: for item in row: self._lookups[fn][item.lower()] = tuple(row)
[docs] def get_data(self, **kwargs): """Retrieves historical climate data matching the given parameters See :py:class:`~pyncei.bot.NCEIBot` for more details about each keyword argument. Args: datasetid (str): Required. Only one value allowed. startdate (str or datetime): Required. Returned stations will have data for the specified dataset/type from on or after this date. enddate (str or datetime): Required. Returned stations will have data for the specified dataset/type from on or before this date. datatypeid (str or list): Optional locationid (str or list): Optional stationid (str or list): Optional units (str): Optional. One of 'standard' or 'metric'. sortfield (str): Optional. If provided, must be one of 'datatype', 'date', or 'station'. sortorder (str): Optional limit (int): Optional offset (int): Optional max (int): Optional Returns: List of dicts containing historical weather data """ url = "http://www.ncdc.noaa.gov/cdo-web/api/v2/data" required = ["datasetid", "startdate", "enddate", "units"] optional = [ "datatypeid", "locationid", "stationid", "sortfield", "sortorder", "limit", "offset", "includemetadata", ] # Assign default unit. Returned values are nonsense without this. if not kwargs.get("units"): kwargs["units"] = "metric" self._allow_multiple.remove("datasetid") url, params = self._prepare_query(url, [], kwargs, required, optional) self._allow_multiple.append("datasetid") return self._get(url, params)
[docs] def get_datasets(self, datasetid=None, **kwargs): """Returns data from the NCEI dataset endpoint See :py:class:`~pyncei.bot.NCEIBot` for more details about each keyword argument. Args: datasetid (str): a single dataset to return information about. Optional. The kwargs are ignored if this is provided. datatypeid (str or list): Optional locationid (str or list): Optional stationid (str or list): Optional sortfield (str): Optional. If provided, must be one of 'id', 'name', 'mindate', 'maxdate', or 'datacoverage'. sortorder (str): Optional limit (int): Optional offset (int): Optional max (int): Optional Returns: List of dicts containing metadata for all matching datasets """ url = "http://www.ncdc.noaa.gov/cdo-web/api/v2/datasets" required = [] optional = [ "datatypeid", "locationid", "stationid", "startdate", "enddate", "sortfield", "sortorder", "limit", "offset", ] url, params = self._prepare_query(url, datasetid, kwargs, required, optional) return self._get(url, params)
[docs] def get_data_categories(self, datacategoryid=None, **kwargs): """Returns codes and labels for NCDI data categories See :py:class:`~pyncei.bot.NCEIBot` for more details about each keyword argument. Args: datacategoryid (str): a single data category to return information about. Optional. The kwargs are ignored if this is provided. datasetid (str or list): Optional locationid (str or list): Optional stationid (str or list): Optional startdate (str or datetime): Optional enddate (str or datetime): Optional sortfield (str): Optional. If provided, must be one of 'id', 'name', 'mindate', 'maxdate', or 'datacoverage'. sortorder (str): Optional limit (int): Optional offset (int): Optional max (int): Optional Returns: List of dicts containing metadata for all matching data categories """ url = "http://www.ncdc.noaa.gov/cdo-web/api/v2/datacategories" required = [] optional = [ "datasetid", "locationid", "stationid", "startdate", "enddate", "sortfield", "sortorder", "limit", "offset", ] url, params = self._prepare_query( url, datacategoryid, kwargs, required, optional ) return self._get(url, params)
[docs] def get_data_types(self, datatypeid=None, **kwargs): """Returns information about NCEI data categories See :py:class:`~pyncei.bot.NCEIBot` for more details about each keyword argument. Args: datatypeid (str): a single data type to return information about. Optional. The kwargs are ignored if this is provided. datasetid (str or list): Optional locationid (str or list): Optional stationid (str or list): Optional datacategoryid (str or list): Optional startdate (str or datetime): Optional enddate (str or datetime): Optional sortfield (str): Optional. If provided, must be one of 'id', 'name', 'mindate', 'maxdate', or 'datacoverage'. sortorder (str): Optional limit (int): Optional offset (int): Optional max (int): Optional Returns: List of dicts containing metadata for all matching data types """ url = "http://www.ncdc.noaa.gov/cdo-web/api/v2/datatypes" required = [] optional = [ "datasetid", "locationid", "stationid", "datacategoryid", "startdate", "enddate", "sortfield", "sortorder", "limit", "offset", ] url, params = self._prepare_query(url, datatypeid, kwargs, required, optional) return self._get(url, params)
[docs] def get_location_categories(self, locationcategoryid=None, **kwargs): """Returns information about NCEI location categories See :py:class:`~pyncei.bot.NCEIBot` for more details about each keyword argument. Args: locationcategoryid (str): a single location category to return information about. Optional. The kwargs are ignored if this is provided. datasetid (str or list): Optional sortfield (str): Optional. If provided, must be one of 'id' or 'name'. sortorder (str): Optional limit (int): Optional offset (int): Optional max (int): Optional Returns: List of dicts containing metadata about location categories """ url = "http://www.ncdc.noaa.gov/cdo-web/api/v2/locationcategories" required = [] optional = [ "datasetid", "startdate", "enddate", "sortfield", "sortorder", "limit", "offset", ] url, params = self._prepare_query( url, locationcategoryid, kwargs, required, optional ) return self._get(url, params)
[docs] def get_locations(self, locationid=None, **kwargs): """Returns metadata for locations matching the given parameters See :py:class:`~pyncei.bot.NCEIBot` for more details about each keyword argument. Args: locationid (str): a single location to return information about. Optional. The kwargs are ignored if this is provided. datasetid (str or list): Optional locationcategoryid (str or list): Optional datacategoryid (str or list): Optional sortfield (str): Optional. If provided, must be one of 'id', 'name', 'mindate', 'maxdate', or 'datacoverage'. sortorder (str): Optional limit (int): Optional offset (int): Optional max (int): Optional Returns: List of dicts containing metadata for all matching locations """ url = "http://www.ncdc.noaa.gov/cdo-web/api/v2/locations" required = [] optional = [ "datasetid", "locationcategoryid", "datacategoryid", "startdate", "enddate", "sortfield", "sortorder", "limit", "offset", ] url, params = self._prepare_query(url, locationid, kwargs, required, optional) return self._get(url, params)
[docs] def get_stations(self, stationid=None, **kwargs): """Returns metadata for stations matching the given parameters See :py:class:`~pyncei.bot.NCEIBot` for more details about each keyword argument. Args: stationid (str): a single station to return information about. Optional. The kwargs are ignored if this is provided. datasetid (str or list): Optional locationid (str or list): Optional datacategoryid (str or list): Optional datatypeid (str or list): Optional extent (str or iterable): comma-delimited bounding box of form 'min_lat, min_lng, max_lat, max_lng' or equivalent iterable. Optional. sortfield (str): Optional. If provided, must be one of 'id', 'name', 'mindate', 'maxdate', or 'datacoverage'. sortorder (str): Optional limit (int): Optional offset (int): Optional max (int): Optional Returns: List of dicts containing metadata for all matching stations """ url = "http://www.ncdc.noaa.gov/cdo-web/api/v2/stations" required = [] optional = [ "datasetid", "locationid", "datacategoryid", "datatypeid", "extent", "startdate", "enddate", "sortfield", "sortorder", "limit", "offset", ] url, params = self._prepare_query(url, stationid, kwargs, required, optional) return self._get(url, params)
[docs] def find_ids(self, term=None, endpoints=None): """Find key terms that match the search string for the given endpoints Args: term (str): the term to search for. If None, returns a list of all available terms for the specified endpoint(s). endpoints (str or list): name of one or more NCEI endpoints Returns: List of (endpoint, id, name) for matching key terms from the specified endpoint """ if endpoints is None: endpoints = sorted(self._lookups) if isinstance(endpoints, str): endpoints = [endpoints] ids = [] for endpoint in endpoints: try: lookup = self._lookups[endpoint.lower()] except KeyError: raise else: try: matches = [lookup[term.lower()]] except KeyError: matches = [v for k, v in lookup.items() if term.lower() in k] ids.extend(sorted({(endpoint, *m) for m in matches})) return ids
[docs] def refresh_lookups(self, keys=None): """Update the csv files used to populate the endpoint lookups Args: keys (list): list of endpoints to populate. If empty, everything but stations will be populated. Returns: None """ endpoints = { "datasets": self.get_datasets, "datacategories": self.get_data_categories, "datatypes": self.get_data_types, "locationcategories": self.get_location_categories, "locations": self.get_locations, "stations": self.get_stations, } if keys is None: keys = [k for k in endpoints if k != "stations"] elif not isinstance(keys, list): keys = [keys] for key in keys: try: response = endpoints[key]() except KeyError as exc: raise Exception(f"{key} is not a valid id") from exc else: fp = os.path.join(self._filepath, key + ".csv") with open(fp, "w", encoding="utf-8-sig", newline="") as f: writer = csv.writer(f, dialect="excel") writer.writerow(["id", "name"]) for result in response.values(): row = [result["id"], result["name"]] writer.writerow(row)
def _get_with_retry(self, url, params): """Retries a get request with an exponential backoff Args: url (str): NCDI webservice url params (dict): query parameters Returns: response to given request """ for i in range(self.max_retries): try: resp = self._session.get(url, params=self._encode_params(params)) # Retry if status code indicates a temporary problem if resp.status_code in (429, 503): raise requests.exceptions.ConnectionError( f"Request failed: {resp.url} (status_code={resp.status_code})" ) return resp except ( requests.exceptions.ConnectionError, requests.exceptions.Timeout, ) as err: # Add a random number of milliseconds to the wait time to prevent # multiple retries from synchronizing wait = 2**i + random.randint(1, 1000) / 1000 print( f"Retrying temporarily failed request in {wait}s" f" (url={url}, params={params}, error='{err}')" ) time.sleep(wait) raise Exception(f"Request failed (url={url}, params={params})") def _get(self, url, params): """Retrieves all matching records for a given url and parameter set Args: url (str): NCDI webservice url params (dict): query parameters Returns: List of dicts containing the requested data """ # Many of the NCDI webservies have two different endpoints: one for # a single, specific argument (for example, a station id), another # for a query string. Here, specific requests are given a trailing # backslash as a lazy way to tell the two types of reqeuests apart. if not url.endswith("/"): try: offset = params["offset"] except KeyError: params["offset"] = offset = 1 else: # Offsets 0 and 1 both return the same record. Specifying # an offset of 1 makes subsequent offsets (made by adding # the limit to the last offset) start at the right record. if not offset: params["offset"] = offset = 1 # Minimize number of queries required to retrieve data # by adjusting limit based on total number of records try: limit = params["limit"] except KeyError: params["limit"] = limit = 1000 try: total = params.pop("max") except KeyError: total = limit if limit < 1000 else 1e12 # any large number works else: if total < 1000: params["limit"] = limit = total else: params["limit"] = limit = 1000 else: total = limit = 1 logger.debug("Final parameter set:") if total > 0: logger.debug(f"total: {total}") for key in params: logger.debug(f"{key}: {params[key]}") response = NCEIResponse() while response.count() < total: logger.info("Requesting data") # NCEI does not like encoded colons, so encode the query string first resp = self._get_with_retry(url, params) if resp.status_code == 200: logger.info(f"Resolved {resp.url}") # Enforce a wait period between requests if self._cache and not resp.from_cache: logger.info("Caching request") time.sleep(self.wait) elif not self._cache: logger.info(f"Waiting {self.wait} seconds...") time.sleep(self.wait) else: logger.info("URL was retrieved from cache") response.append(resp) if response.total() < total: total = response.total() logger.info(f"{response.count():,}/{total:,} records retrieved") try: params["offset"] += limit except KeyError: params["offset"] = limit else: raise Exception( f"Failed to resolve {resp.url} ({resp.status_code}: {resp.text}" ) return response def _prepare_query(self, url, endpoint_id, kwargs, required, optional): """Validate query Args: url (str): url to NCEI endpoint endpoint_id (tuple): id from the endpoint kwargs (dict): keyed query parameters required (list): required fields for endpoint optional (list): optional fields for endpoint Returns: Tuple (url string, paramter dict) if query is valid """ logger.info(f"Preparing request to {url}") if endpoint_id: if kwargs: warnings.warn(f"Ignoring kwargs: {kwargs}") # Return URL for a specific endpoint return url + f"/{endpoint_id}/", {} if self.validate_params: # Extend optional with helper fields optional.extend(["max"]) # Confirm that all required fields are present missing = [key for key in required if not key in kwargs] if missing: raise Exception(f'Required parameters missing: {", ".join(missing)}') # Check that all fields in kwargs are valid invalid = [key for key in kwargs if not key in required + optional] if invalid: raise Exception(f'Invalid parameters found: {", ".join(invalid)}') # Clean up kwargs kwargs = self._check_kwargs(kwargs, url.split("/").pop()) else: # Try to map names to ids even if validation is disabled ids = { k: v for k, v in kwargs.items() if self._validators[k] in (self._check_name, self._check_extent) } kwargs.update(self._check_kwargs(ids, url.split("/").pop())) # Query string endpoint return url, kwargs def _check_kwargs(self, kwargs, endpoint): """Validates values given for query parameters Args: kwargs (dict): query parameters endpoint (str): name of valid NCEI endpoint Returns: Dict containing cleaned up values for kwargs """ errors = [] # Check kwargs against validation functions for key in kwargs.keys(): vals = kwargs[key] # Extent can be an iterable, so treat lists in this key as one value if isinstance(vals, (list, tuple)) and key == "extent": vals = [vals] if not isinstance(vals, (list, tuple)): vals = [vals] validated = [] for val in vals: try: value, status = self._validators[key](val, key, endpoint) except KeyError: # Catches bad parameter names. In practice, this should # never occur because bad params should be weeded out # beforehand. errors.append(f"{key} is not a valid parameter") else: if status is False: errors.append(f"{key}: {value} is invalid") else: validated.append(value) logger.info(f"{key}: {value} is valid") if not errors: # Catch multiple values passed to key that only accepts one if not key in self._allow_multiple: if len(validated) > 1: errors.append(f"{key} only accepts one value") else: validated = validated[0] # Map helper fields to corresponding query fields try: self._endpoints.index(re.sub(r"id$", "", key)) except ValueError: kwargs[key] = validated if errors: s = "" if len(errors) == 1 else "s" raise Exception(f'Parameter error{s}: {"; ".join(errors)}') return kwargs def _check_name(self, value, key, endpoint): """Map name to id for a given key, if possible Args: value (str): an identifer or name key (str): name of field being checked endpoint (str): name of current NCEI endpoint Returns: Tuple (id, True) if name is valid, or tuple (error message, False) if not. """ endpoint = [e for e in self._endpoints if e.startswith(key.rstrip("deis"))][0] try: ids = self.find_ids(value, endpoint) if len(ids) == 1: return ids[0][1], True except KeyError: # Allow original value through if no lookup is configured warnings.warn(f"No lookup list found for {endpoint}") return value, True except AttributeError: pass return f"Failed to map '{value}' to an id", False @staticmethod def _check_date(date, key, endpoint): """Validate and formate date Args: date (str or dateime.datetime): date or equivalent key (str): name of field being checked endpoint (str): name of current NCEI endpoint Returns: Tuple (date string, True) if date is valid, or tuple (error message, False) if not. """ try: return date.strftime("%Y-%m-%d"), True except AttributeError: try: datetime.strptime(date, "%Y-%m-%d") except (TypeError, ValueError): pass else: return date, True return "Must be a datetime object or string formatted as %Y-%m-%d", False @staticmethod def _check_extent(extent, key, endpoint): """Validate extent query parameter Args: extent (str or iterable): comma-delimited bounding box of form 'min_lat, min_lng, max_lat, max_lng' or equivalent iterable key (str): name of field being checked endpoint (str): name of current NCEI endpoint Returns: Tuple (extent string, True) if extent is valid, or tuple (error message, False) if not. """ if isinstance(extent, str): extent = [s.strip() for s in extent.split(",")] min_lat, min_lng, max_lat, max_lng = [float(c) for c in extent] if min_lat < max_lat and min_lng < max_lng: return ",".join([str(s) for s in extent]), True return 'Must be string/iterable of "min_lat, min_lng, max_lat, max_lng"', False @staticmethod def _check_sortfield(value, key, endpoint): """Validate sortfield query parameter Args: value (str): name of sort field. Sort fields vary by endpoint. key (str): name of field being checked endpoint (str): name of current NCEI endpoint Returns: Tuple (sort field, True) if sort field is valid, or tuple (error message, False) if not. """ fields = { "data": ["datatype", "date", "station"], "datasets": ["id", "name", "mindate", "maxdate", "datacoverage"], "datacategories": ["id", "name"], "locationcategories": ["id", "name"], "locations": ["id", "name", "mindate", "maxdate", "datacoverage"], "stations": ["id", "name", "mindate", "maxdate", "datacoverage"], } try: value = value.lower() except AttributeError: pass else: if value in fields[endpoint]: return value, True return f'Must be one of the following: {", ".join(fields[endpoint])}', False @staticmethod def _check_sortorder(value, key, endpoint): """Validate sort order Args: value (str): 'asc' or 'desc' key (str): name of field being checked endpoint (str): name of current NCEI endpoint Returns: Tuple (validated string, True) if order is valid, or tuple (error message, False) if not. """ valid = ["asc", "desc"] try: value = value.lower() except AttributeError: pass else: if value in valid: return value, True return f'Must be one of the following: {", ".join(valid)}', False @staticmethod def _check_units(value, key, endpoint): """Validate units Args: value (str): 'standard' or 'metric' key (str): name of field being checked endpoint (str): name of current NCEI endpoint Returns: Tuple (validated string, True) if order is valid, or tuple (error message, False) if not. """ valid = ["standard", "metric"] try: value = value.lower() except AttributeError: pass else: if value in valid: return value, True return f'Must be one of the following: {", ".join(valid)}', False @staticmethod def _check_limit(value, key, endpoint): """Validate limit Args: value (str or int): integer to validate key (str): name of field being checked endpoint (str): name of current NCEI endpoint Returns: Tuple (validated integer, True) if limit is valid, or tuple (error message, False) if not. """ try: value = int(value) except (TypeError, ValueError): pass else: if 0 < value <= 1000: return value, True return "Must be an integer between 1 and 1000, inclusive", False @staticmethod def _check_positive_integer(value, key, endpoint): """Validate positive integer Args: value (str or int): integer to validate key (str): name of field being checked endpoint (str): name of current NCEI endpoint Returns: Tuple (validated integer, True) if number is valid, or tuple (error message, False) if not. """ try: value = int(value) except (TypeError, ValueError): pass else: if value >= 0: return value, True return "Must be an integer greater than or equal to 0", False @staticmethod def _encode_params(params, safe=":,"): param_list = [] for key, vals in params.items(): for val in vals if isinstance(vals, (list, tuple)) else [vals]: param_list.append((key, val)) return urllib.parse.urlencode(param_list, safe=safe)
[docs] class NCEIResponse(list): """Wraps results of one or more calls to the NCEI API Extends list. Each response is stored as an entry in the list. """ #: list used to order the keys in the NCEI data key_order = [ "id", "uid", "name", "station", "latitude", "longitude", "elevation", "elevationUnit", "datacoverage", "date", "mindate", "maxdate", "datatype", "attributes", "value", "url", "retrieved", ] #: dict mapping NCEI fields to date formats date_formats = { "date": "%Y-%m-%dT%H:%M:%S", "maxdate": "%Y-%m-%d", "mindate": "%Y-%m-%d", "retrieved": "%Y-%m-%dT%H:%M:%S", } def __str__(self): return ( f"<{self.__class__.__name__} responses={len(self)}" f" count={self.count()} total={self.total()}>" ) def __repr__(self): return str(self) def __bool__(self): for resp in self: if resp.json(): return True return False
[docs] def values(self): """Gets the results from all responses Returns: generator of dicts """ for resp in self: metadata = { "url": resp.url, "retrieved": datetime.strptime( resp.headers["Date"], "%a, %d %b %Y %H:%M:%S %Z" ).isoformat(), } for val in self._get_results(resp): if val: val.update(metadata) keys = set(val.keys()) if keys - set(self.key_order): raise KeyError( f"Found unordered keys: {keys - set(self.key_order)}" ) yield {k: val[k] for k in self.key_order if k in keys}
[docs] def first(self): """Gets the first result from the compiled responses Returns: dict """ for val in self.values(): return val
[docs] def count(self): """Counts the number of results that have been returned Returns: number of records returned as int """ return sum([len(self._get_results(r)) for r in self])
[docs] def total(self): """Counts the total number of results available for all URLs Returns: total number of records matching the responses as int """ urls = {} for resp in self: # Group by url with pagination parameters removed url = re.sub(r"\b(offset|limit|max)=\d+\b", "&", resp.url).strip("&") try: urls.setdefault(url, int(resp.json()["metadata"]["resultset"]["count"])) except KeyError: urls.setdefault(url, 1) return sum(urls.values())
[docs] def to_csv(self, path): """Writes data to a CSV Args: path (str): path to csv """ with open(path, "w", encoding="utf-8", newline="") as f: writer = csv.writer(f, dialect="excel") keys = None for row in self.values(): row = row.copy() if not keys: keys = row.keys() writer.writerow(keys) writer.writerow([row[k] for k in keys])
[docs] def to_dataframe(self): """Writes data to a dataframe Returns: pandas.DataFrame or geopandas.GeoDataFrame if geopandas is installed and the responses include coordinates """ df = pd.DataFrame(self.values()) # Convert datetime columns to datetime objects for key, date_format in self.date_formats.items(): if key in df.columns: df[key] = pd.to_datetime(df[key], format=date_format) # Convert DataFrame with coordinates to GeoDataFrame if geopandas installed. # Uses NAD83 as the CRS. This appears to be NOAA's preferred CRS but it's # not explicitly defined in the webservice documentation that I could find. if "latitude" in df.columns and "longitude" in df.columns: try: df = gpd.GeoDataFrame( df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="NAD83", ) except NameError: # geopandas is optional pass return df
@staticmethod def _get_results(resp): resp_json = resp.json() try: return resp_json["results"] except KeyError: return [resp_json]