#!/bin/env python3
helpText = """
import.py v1.2.0 - 9/19/2017
This program extracts pathnames from the NWDP CWMS database and writes them
to a SQLite database (data/hydro.db) for public consumption.
It expects the pathnames to be located in pathnames.list
"""
import sys
sys.path.append("../webexec")
sys.path.append("/home/usace/g0cwpa26/rwcds/dd/common/web_service/webexec")
import atexit, argparse
import datetime
import hydro_lib
import json
import os
import random
import requests
import time
import wslib

# Maps Pisces (SQLite sitecatalog) column names -> CWMS metadata field names.
pisces_cwms_map = {
    "siteid": "LOCATION_ID",
    "description": "PUBLIC_NAME",
    "latitude": "LATITUDE",
    "longitude": "LONGITUDE",
    "elevation": "ELEVATION",
    "active_flag": "ACTIVE_FLAG",
    "horizontal_datum": "HORIZONTAL_DATUM",
    "vertical_datum": "VERTICAL_DATUM",
    "responsibility": "OFFICE_ID"
}

# Runtime configuration. lookback/lookback1 are overwritten from the
# --lookback argument below; lookback2 is the deep-history window used
# when a table is rebuilt.
conf = {
    "lookback": 7,
    "lookback1": 7,
    "lookforward": 180,
    "lookback2": 30000,
    "chance": 150,
    "defaultUSCS": hydro_lib.defaultUnits,
    "sql_path": "../data/newdata.sql",
    "histpaths": "./histpaths.list",
    "pathnames_path": "../www/pathnames.txt",
    "pathnames_list": "./pathnames.list"
}
paths = []


#-----------------------------------------------
# Methods to get metadata using metadata service
#-----------------------------------------------
def storeCwmsMeta(d):
    """Store one CWMS metadata record into the sitecatalog table.

    *d* is a dict keyed by CWMS column names; fields are translated to
    Pisces column names via pisces_cwms_map. The SQL is also appended to
    the export file via logSQL.
    """
    r = hydro_lib.rec(
        [], table="sitecatalog", keys=hydro_lib.schemas["sitecatalog"])
    for key in pisces_cwms_map:
        r[key] = d[pisces_cwms_map[key]]
    logSQL(r.store(hydro_lib.cur))
    hydro_lib.dbconn.commit()


def getAllCwmsStations():
    """Fetch metadata for all CWMS stations from the local metadata web
    service and store every record in the site catalog."""
    url = 'http://127.0.0.1/apps/metadata/webexec/meta.py?function=getOffice&id=ALL'
    r = requests.get(url)
    sites = json.loads(r.text)
    for key in sorted(sites.keys()):
        d = sites[key]
        if args.debug and d["LOCATION_ID"] == "BON":
            print(d)
        storeCwmsMeta(d)


#-------------------------------------
# Methods to extract public timeseries
#-------------------------------------
def logSQL(s):
    """Append SQL statement *s* to the export file (module global sf) and
    return it unchanged so calls can be nested inside print()."""
    sf.write(s + "\n")
    return s


def tsFromCwms(pathname, lag=0, u="default"):
    """Read a time series for *pathname* from the CWMS web service.

    The window covers conf["lookback"] days back from now; *lag* shifts the
    whole window further into the past by that many days. Forecast-style
    paths (FCST / BPA-RAW / -PUB) also extend conf["lookforward"] days into
    the future. *u* selects the units. Returns a hydro_lib.timeSeries.
    """
    end = datetime.datetime.now()
    start = end - datetime.timedelta(days=int(conf["lookback"]))
    if lag != 0:
        end = end - datetime.timedelta(days=lag)
        start = start - datetime.timedelta(days=lag)
        print(" Adding lag of %d days" % (lag))
    if "FCST" in pathname or "BPA-RAW" in pathname or "-PUB" in pathname:
        end += datetime.timedelta(days=int(conf["lookforward"]))
    d = ds.readTS(pathname, start, end, u)
    # BUG FIX: original indexed d[-1] unconditionally and crashed with an
    # IndexError whenever the service returned no values.
    if d:
        print(" Read %d values, last = %s (@%s)" %
              (len(d), d[-1][1], str(d[-1][0])))
    else:
        print(" Read 0 values")
    return hydro_lib.timeSeries(d)


# Takes a CWMS pathname and finds default units
def getDefaultUnits(tsid):
    """Return the default US-customary units for the parameter embedded in
    *tsid* (the second dot-separated token), or "" for the database default."""
    tsid = tsid.lower()
    tokens = tsid.split(".")
    # Check to see if full Parameter is in default units and return
    param = tokens[1]
    if param in conf["defaultUSCS"]:
        return conf["defaultUSCS"][param]
    # Check to see if parameter is in default units and return
    param = tokens[1].split("-")[0]
    if param in conf["defaultUSCS"]:
        return conf["defaultUSCS"][param]
    # Default to database default
    return ""


###############################################################################
# reads a temporary list of historical paths to update, if it exists, then
# removes it. Overrides chance historical update for just those time series.
def getHistPaths():
    """Return the stripped lines of conf['histpaths'] and delete the file,
    so the override applies only once. Returns [] when the file is absent
    (the normal case - the exception is printed best-effort)."""
    output = []
    try:
        with open(conf['histpaths']) as fh:
            for line in fh:
                output.append(line.strip())
        os.remove(conf['histpaths'])
    except Exception as e:
        print(e)
    return output


# this function reads the pathnames.list file.
# it expects a format of pathname \t units \t lag [days] \t private
def getPublicPaths():
    """Parse conf["pathnames_list"] into {pathname: {UNITS, LAG, PRIVATE}}.

    Each non-comment line is tab separated:
        pathname \t units \t lag [days] \t private
    Missing trailing fields default to units="" (database default via
    getDefaultUnits), lag="0" and private="".
    """
    with open(conf["pathnames_list"], "r") as fh:
        txt = fh.read().split("\n")
    output = {}
    count = 0
    for line in txt:
        # Skip blank lines and comments up front; the original relied on
        # line[0] raising IndexError for blanks and logged them as errors.
        if not line or line[0] == "#":
            count += 1
            continue
        try:
            tokens = line.split("\t")
            # Pad optional trailing columns so the 4-way unpack succeeds.
            if len(tokens) == 1:
                tokens.append("")
            if len(tokens) == 2:
                tokens.append("0")
            if len(tokens) == 3:
                # BUG FIX: lines without an explicit "private" column used
                # to raise on the unpack below and were skipped as errors.
                tokens.append("")
            (pathname, units, lag, private) = tokens
            pathname = pathname.strip()
            if units == "" or units == "Default":
                units = getDefaultUnits(pathname)
            output[pathname] = {"UNITS": units, "LAG": int(lag),
                                "PRIVATE": private}
        except Exception:
            print("Error on line %d" % (count,))
        count += 1
    return output


def cullOldPaths():
    """Delete seriescatalog rows (and drop their data tables) for pathnames
    that are no longer present in pathnames.list."""
    print("Culling old paths")
    paths = getPublicPaths()
    # BUG FIX: was `paths == []`, but getPublicPaths returns a dict, so the
    # empty-list guard could never fire and an empty/unreadable pathnames
    # file would have culled every series.
    if not paths:
        return
    r = hydro_lib.rec(
        [], table="seriescatalog", keys=hydro_lib.schemas["seriescatalog"])
    p = r.get_many(hydro_lib.cur, "name", "%")
    for key in p:
        if key not in paths:
            print(" %s not found - trying to remove..." % (key), end=" ")
            try:
                print(logSQL(r.delete(hydro_lib.cur, "name", key)))
                sql = "DROP TABLE " + p[key]["tablename"]
                hydro_lib.cur.execute(sql)
                print(logSQL(sql))
                hydro_lib.dbconn.commit()
                print(logSQL("commit"))
            except Exception:
                print("ERROR")


def shortTables():
    """One-off maintenance (--shorttables): truncate every timeseries table
    name to 13 characters and update seriescatalog to match."""
    print("shortening table names")
    r = hydro_lib.rec(
        [], table="seriescatalog", keys=hydro_lib.schemas["seriescatalog"])
    p = r.get_many(hydro_lib.cur, "name", "%")
    for key in p:
        oldt = p[key]["tablename"]
        newt = oldt[:13]
        sql = "ALTER TABLE %s RENAME TO %s;" % (oldt, newt)
        try:
            hydro_lib.cur.execute(sql)
            logSQL(sql)
        except Exception:
            print("error:" + sql)
        p[key]["tablename"] = newt
        logSQL(p[key].store(hydro_lib.cur))
    hydro_lib.dbconn.commit()


def filtered(keylist):
    """Return sorted keys from *keylist* whose name contains the --filter
    keyword (case-insensitive); all keys sorted when no filter is given."""
    # Renamed local: the original shadowed the builtin `filter`.
    keyword = args.filter.upper()
    if keyword == "":
        return sorted(keylist)
    return [key for key in sorted(keylist) if keyword in key.upper()]


def getTSextents(tsid):
    """Return "(firstyear-lastyear)" for the stored series *tsid*, or
    "(Empty)" when the table has no datetimes."""
    # Renamed locals: the original shadowed the builtins `max` and `min`.
    newest = hydro_lib.max_datetime(tsid)
    oldest = hydro_lib.min_datetime(tsid)
    try:
        return "(" + str(oldest.year) + "-" + str(newest.year) + ")"
    except Exception:
        return "(Empty)"


def storePublicPaths():
    """Read the public pathname list, pull each series from CWMS, write its
    data and catalog entry to the SQLite database, and emit pathnames.txt."""
    paths = getPublicPaths()
    print("Storing %d public paths" % (len(paths)))
    hist_paths = getHistPaths()
    print("Including {:d} historical paths".format(len(hist_paths)))
    pathnames = []
    replace_flag = False
    for key in filtered(paths.keys()):
        tsid = key
        parts = tsid.split(".")
        print("%s (%s):" % (tsid, paths[key]["UNITS"]))
        r = hydro_lib.rec(
            [], table="seriescatalog",
            keys=hydro_lib.schemas["seriescatalog"])
        r = r.get(hydro_lib.cur, "name", tsid)
        r["name"] = tsid
        r["tablename"] = hydro_lib.makeTablename(tsid)
        r["siteid"] = parts[0]
        r["enabled"] = 1
        r["units"] = paths[key]["UNITS"]
        r["parameter"] = parts[1]
        r["timeinterval"] = parts[2]
        r["timezone"] = "UTC"
        # Decide whether to rebuild the whole table with the deep-history
        # lookback: a 1-in-`chance` random draw under --historic, an
        # explicit histpaths.list entry, or --replace.
        if random.randint(0, conf["chance"]) == 1 and args.historic:
            replace_flag = True
            conf["lookback"] = conf["lookback2"]
        elif key in hist_paths:
            replace_flag = True
            conf["lookback"] = conf["lookback2"]
        else:
            replace_flag = False
            conf["lookback"] = conf["lookback1"]
        if args.replace:
            print("Replacing table")
            replace_flag = True
        try:
            ts = tsFromCwms(r["name"], lag=paths[key]["LAG"], u=r["units"])
        except Exception as e:
            print(" Error getting %s from CWMS" % (r["name"]))
            print(e)
            ts = hydro_lib.timeSeries()
        if len(ts.data) > 0:
            newdata = hydro_lib.writeTS(
                r["tablename"], ts, replace_table=replace_flag)
            logSQL(newdata)
            print(" Wrote %d days: %s" % (conf["lookback"],
                                          hydro_lib.status))
            pathnames.append(tsid + "\t" + r["units"] + "\n")
        else:
            print(" NO DATA in %d-day lookback" % (conf["lookback"]))
        r["notes"] = getTSextents(tsid)
        if paths[key]["PRIVATE"] == "True":
            r["notes"] += " PRIVATE"
        print(r["notes"])
        logSQL(r.store(hydro_lib.cur))
        hydro_lib.dbconn.commit()
    # Write pathnames file for downstream consumers.
    with open(conf["pathnames_path"], "w") as f:
        for pathname in pathnames:
            f.write(pathname)


###############################################################################
p = argparse.ArgumentParser(description=helpText)
p.add_argument(
    '-l', '--lookback', help='set lookback in days', type=int, default=2)
p.add_argument(
    '-f', '--filter',
    help='Keyword filter, useful for extracting specific data.', default="")
p.add_argument(
    '-sh', '--historic',
    help='Store historic record for a random subset of paths.',
    action="store_true")
p.add_argument(
    '-st', '--shorttables',
    help='rename timeseries tables in database to short names.',
    action="store_true")
p.add_argument(
    '-r', '--replace', help='Force replace on all timeseries tables.',
    action="store_true")
p.add_argument(
    '-d', '--debug', help='Debug mode, verbose output', action="store_true")
args = p.parse_args()
conf["lookback"] = args.lookback
conf["lookback1"] = args.lookback

# All timestamps are handled in UTC.
os.environ["TZ"] = "UTC"
time.tzset()

ds = wslib.ddWebService()
config = wslib.ddWebServiceConfig()
config.loadConfig("../config/config.json")
ds.updateConfig(config)
ds.connect()
if ds.status != "OK":
    raise Exception(ds.status)
atexit.register(ds.disconnect)

# file where SQL is written to push to external sites
sf = open(conf["sql_path"], "w")
# When we upgrade to python 3.3+ use the following to log SQL:
# hydro_lib.dbconn.set_trace_callback(logSQL)
print("hydrolib:" + hydro_lib.status)
print("Connection:" + ds.configuration["dbname"])
print("Looking back %d days." % (conf["lookback"]))
if args.shorttables:
    shortTables()
    sys.exit(0)

getAllCwmsStations()
cullOldPaths()
storePublicPaths()

# Close differences file
sf.close()