Source code for hydrofunctions.station


This module contains the Station and NWIS classes, which are used for
organizing and managing data for data collection sites.

import re

import json
import warnings
from . import hydrofunctions as hf

import numpy as np

[docs]class Station(object): """A class for organizing stream gauge data for a single request.""" station_dict = {} def __init__(self, site=None): Station.station_dict[site] = self = site
# One option is to make it so that you can pass in a get_data function # during the creation of an instance.
[docs]class NWIS(Station): """A class for working with data from the USGS NWIS service. Args: site (str or list of strings): a valid site is '01585200' or ['01585200', '01646502']. Default is `None`. If site is not specified, you will need to select sites using stateCd or countyCd. service (str): can either be 'iv' or 'dv' for instantaneous or daily data. * 'dv'(default): daily values. Mean value for an entire day. * 'iv': instantaneous value measured at this time. Also known\ as 'Real-time data'. Can be measured as often as every\ five minutes by the USGS. 15 minutes is more typical. start_date (str): should take on the form 'yyyy-mm-dd' end_date (str): should take on the form 'yyyy-mm-dd' stateCd (str): a valid two-letter state postal abbreviation, such as 'MD'. Default is None. Selects all stations in this state. Because this type of site selection returns a large number of sites, you should limit the amount of data requested for each site. countyCd (str or list of strings): a valid county FIPS code. Default is None. Requests all stations within the county or list of counties. See for an explanation of FIPS codes. bBox (str, list, or tuple): a set of coordinates that defines a bounding box. * Coordinates are in decimal degrees. * Longitude values are negative (west of the prime meridian). * Latitude values are positive (north of the equator). * comma-delimited, no spaces, if provided as a string. * The order of the boundaries should be: "West,South,East,North" * Example: "-83.000000,36.500000,-81.000000,38.500000" parameterCd (str or list of strings): NWIS parameter code. Usually a five digit code. Default is 'all'. A valid code can also be given as a list: parameterCd=['00060','00065'] This will request data for this parameter. * if value is 'all', or no value is submitted, then NWIS will \ return every parameter collected at this site. (default option) * stage: '00065' * discharge: '00060' * not all sites collect all parameters! * See for full list period (str): NWIS period code. Default is None. * Format is "PxxD", where xx is the number of days before \ today, with a maximum of 999 days accepted. * Either use start_date or period, but not both. interpolate (bool): Fill missing values through interpolation. Default False. file (str): A filename for acting as a cache for the data request. Accepts file extensions of '*.json.gz' (default) and '*.parquet'. If this parameter is included, the NWIS object will first attempt to read its data from the file. If the file does not exist, it will use the other parameters to obtain the data and will then save to the provided filename. Zipped JSON files will save the original WaterML JSON provided by the NWIS. Parquet files will save the dataframe and the metadata for the NWIS object. verbose (bool): Print output for actions such as making data requests. Default is True. """ def __init__( self, site=None, service="dv", start_date=None, end_date=None, stateCd=None, countyCd=None, bBox=None, parameterCd="all", period=None, interpolate=False, file=None, verbose=True, ): self.ok = False if file: if len(file.split(".")) == 1: file = file + ".json.gz" try: self.ok = True if verbose: print("Reading data from", file) except OSError as err: # File does not exist yet, we'll make it later. pass if not self.ok: self.response = hf.get_nwis( site, service, start_date, end_date, stateCd=stateCd, countyCd=countyCd, bBox=bBox, parameterCd=parameterCd, period=period, verbose=verbose, ) try: self.json = self.response.json() self._dataframe, self.meta = hf.extract_nwis_df( self.json, interpolate=interpolate ) self.ok = self.response.ok if file is not None: if verbose: print("Saving data to", file) except json.JSONDecodeError as err: self.ok = False print(f"JSON decoding error. URL: {self.response.url}") raise err # Can I get rid of this, and only keep metadata in the meta dict? if self.ok: = site self.service = service self.start_date = start_date self.end_date = end_date self.start = self._dataframe.index.min() self.end = self._dataframe.index.max() def __repr__(self): repr_string = "" for site_id in sorted(self.meta.keys()): repr_string += site_id + ": " + self.meta[site_id]["siteName"] + "\n" for param in sorted(self.meta[site_id]["timeSeries"].keys()): repr_string += ( " " + param + ": " + self.meta[site_id]["timeSeries"][param]["variableFreq"] + " " + self.meta[site_id]["timeSeries"][param]["variableDescription"] + " " + self.meta[site_id]["timeSeries"][param]["methodDescription"] + "\n" ) repr_string += "Start: " + str(self.start) + "\n" + "End: " + str(self.end) return repr_string
[docs] def df(self, *args): """ Return a subset of columns from the dataframe. Args: '': If no args are provided, the entire dataframe will be returned. str 'all': the entire dataframe will be returned. str 'data': all of the parameters will be returned, with no flags. str 'flags': Only the _qualifier flags will be returned. Unless the \ flags arg is provided, only data columns will be returned. Visit \ to see a \ more complete listing of possible codes. str 'discharge' or 'q': discharge columns ('00060') will be returned. str 'stage': Gauge height columns ('00065') will be returned. str any five digit number: any matching parameter columns will be returned. '00065' returns stage, for example. str any eight to twelve digit number: any matching stations will be returned. """ all_cols = self._dataframe.columns != "" # all true no_cols = ~all_cols # all false data_cols = self._dataframe.columns.str.contains( r"[0-9]$" ) # Data columns end in a number. flag_cols = self._dataframe.columns.str.contains("_qualifiers") Q_cols = self._dataframe.columns.str.contains( ":00060:" ) # This includes data & flags stage_cols = self._dataframe.columns.str.contains(":00065:") param_re = r"^\d{5}$" # parameters are a five-digit number. station_re = r"\d{8,12}$" # station ID's are between 8 and 12 digits. sites = no_cols params = no_cols meta = no_cols if len(args) == 0: # If no args are given, return every column. sites = all_cols params = all_cols meta = all_cols else: for item in args: if item == "all": sites = all_cols params = all_cols meta = all_cols break # If one param is 'all', ignore the other params and deliver everything. elif item == "discharge": if not np.any( ["00060" in x for x in list(self._dataframe.columns)] ): raise ValueError( "The parameter '{param}' is not contained in this dataset.".format( param=item ) ) params = Q_cols | params elif item == "q": if not np.any( ["00060" in x for x in list(self._dataframe.columns)] ): raise ValueError( "The parameter '{param}' is not contained in this dataset.".format( param=item ) ) params = Q_cols | params elif item == "stage": if not np.any( ["00065" in x for x in list(self._dataframe.columns)] ): raise ValueError( "The parameter '{param}' is not contained in this dataset.".format( param=item ) ) params = stage_cols | params elif item == "data": meta = data_cols | meta elif item == "flags": meta = flag_cols | meta elif, item): param_arg = ":" + item + ":" params = self._dataframe.columns.str.contains(param_arg) | params if not params.any(): raise ValueError( "The parameter '{param}' is not contained in this dataset.".format( param=item ) ) elif, item): station_arg = ":" + item + ":" sites = self._dataframe.columns.str.contains(station_arg) | sites if not sites.any(): raise ValueError( "The site '{site}' is not in this dataset.".format( site=item ) ) else: raise ValueError( "The argument '{item}' is not recognized.".format(item=item) ) if not sites.any(): # If no sites are selected, select them all. sites = all_cols if not params.any(): # If no params are selected, select them all. params = all_cols if ( not meta.any() ): # If neither flags nor data are selected, select data columns. meta = data_cols selection = sites & params & meta requested_df = self._dataframe.loc[:, selection] return requested_df
[docs] def get_data(self): """ .. deprecated:: version 0.2.0 No longer needed. NWIS object will request data upon creation. """ warnings.warn( "It is no longer necessary to call .get_data() to request data.", FutureWarning, ) return self
[docs] def save(self, file): """ Save the dataframe and metadata to a parquet file. Args: file (str): the filename to save to. """ extension = file.split(".")[-1] if extension == "parquet": hf.save_parquet(file, self._dataframe, self.meta) elif extension == "gz": try: hf.save_json_gzip(file, self.json) except AttributeError as err: print( "Hydrofunctions can only save NWIS objects using gzip if the NWIS" " object still has its original WaterML JSON. You might be able " "to fix this problem if you call NWIS using the 'file' parameter " "so that the JSON is saved immediately after the request is made." ) raise err else: raise OSError( f"The file type extension '.{extension}' in the file name {file} is " "not recognized by HydroFunctions. Try *.gz or *.parquet instead." ) return self
[docs] def read(self, file): """ Read from a zipped WaterML file '.json.gz' or from a parquet file. Args: file (str): the filename to read from. """ extension = file.split(".")[-1] if extension == "parquet": self._dataframe, self.meta = hf.read_parquet(file) elif extension == "gz": self.json = hf.read_json_gzip(file) self._dataframe, self.meta = hf.extract_nwis_df(self.json) else: raise OSError( f"The file type extension '.{extension}' in the file name {file} is not recognized by HydroFunctions." ) return self