# Source code for sloop.models.hycom3d.rivers

"""
Rivers processing
"""
import tarfile
import re
import logging

from ...times import convert_to_julian_day
from ...interp import interp_time
from .__init__ import (HYCOM3D_RIVERS_CFG, 
                       HYCOM3D_RIVERS_INI,
                       HYCOM3D_RIVERS_FLOW_CLIM,
                       HYCOM3D_RIVERS_HYDRO_CLIM)
from ...io import read_inicfg_files

import numpy as np
from ...log import LoggingLevelContext

with LoggingLevelContext(logging.WARNING):
    import xarray as xr
    import pandas as pd


LOGGER = logging.getLogger(__name__)

class Rivers(object):

    """River forcing pre-processing for HYCOM3D.

    Reads the rivers configuration, extracts real-time CMEMS platform
    files or climatology tables, and writes the per-river forcing files.
    """
[docs] def __init__( self, cfgfile=HYCOM3D_RIVERS_CFG.format(rank=0)): self.rivers = read_inicfg_files( fcfg=cfgfile, fini=HYCOM3D_RIVERS_INI ) self.platforms = dict() for river in self.rivers.keys(): self.rivers[river].update({"dataset": None}) if not self.rivers[river]["clim"]: LOGGER.info(f"{river}: real time data used") for platform in self.rivers[river]["platform"]["Id"]: self.platforms[platform] = {"path": [], "dataset": None}
[docs] def platforms_selection(self, tarname): nc_pattern = r"GL_TS_RF_(?P<platform>.*)_(?P<date>.*).nc" tmp_tar = tarfile.open(tarname, mode="r:gz") for member in tmp_tar.getnames(): if any(platform in member for platform in self.platforms.keys()): matches = re.search(nc_pattern, member, re.DOTALL) tmp_tar.extract(member) LOGGER.info(f"{matches['platform']}: {member}") self.platforms[matches["platform"]]["path"].append(member) tmp_tar.close() return self.platforms
[docs] def platforms_nc2dataset_preprocess(self, ds): ds["LONGITUDE"] = ds.LONGITUDE.swap_dims({"LONGITUDE": "TIME"}) ds["LATITUDE"] = ds.LATITUDE.swap_dims({"LATITUDE": "TIME"}) ds = ds['RVFL'].squeeze(dim="DEPTH", drop=True) ds = ds.rename({"TIME": "time", "LONGITUDE": "lon", "LATITUDE": "lat"}) ds = ds.reset_coords(["lon", "lat"]) return ds
[docs] def platforms_nc2datasets(self, tarname): LOGGER.info("NetCDF to dataset for: ") self.platforms = self.platforms_selection(tarname) for platform in self.platforms.keys(): LOGGER.info(platform) ncins = self.platforms[platform]["path"][:] if len(ncins) > 0 : dataset = xr.open_mfdataset( ncins, preprocess=self.platforms_nc2dataset_preprocess, concat_dim=["time"], combine="by_coords", ) self.platforms[platform]["dataset"] = dataset else: self.platforms[platform]["dataset"] = None return self.rivers, self.platforms
[docs] def write_flowrate(self, tarname, time, nc_out): self.rivers, self.platforms = self.platforms_nc2datasets(tarname) for river in self.rivers.keys(): LOGGER.info(river) if self.rivers[river]["clim"]: cond = False else: platforms = self.rivers[river]["platform"]["Id"] LOGGER.info(platforms) dss = [type(self.platforms[p]["dataset"]) for p in platforms] LOGGER.info(dss) if type(None) not in dss: cond = True else: cond = False if cond: LOGGER.info("Real time data") for platform in self.rivers[river]["platform"]["Id"]: LOGGER.info(f"{platform} time interpolation") ds = interp_time( self.platforms[platform]["dataset"].load(), time, keep_flag=True, persistency="mean_25h" ) ds.update( { "RVFL": ds["RVFL"] * self.rivers[river]["platform"][platform]["debcoef"] } ) ds.update( { "flag": ds["flag"] / len(self.rivers[river]["platform"]["Id"]) } ) if self.rivers[river]["dataset"]: for var in ["RVFL", "flag"]: self.rivers[river]["dataset"][var] += ds[var] else: self.rivers[river]["dataset"] = ds else: LOGGER.info("Climatology data") LOGGER.info(f"Read {HYCOM3D_RIVERS_FLOW_CLIM} ascii file") f = open(HYCOM3D_RIVERS_FLOW_CLIM, mode="r", encoding="ascii") while True: line = f.readline() if river in line: LOGGER.debug(line) break rvfl = np.tile(np.float_(f.readline().split()), 3) f.close() LOGGER.info("Get climatology time") time_fr = [pd.to_datetime(t) for t in self.get_time_clim(time, len(rvfl))] LOGGER.debug(time_fr) da = xr.DataArray(rvfl, name="RVFL", coords={ "time":time_fr}, dims='time').to_dataset() LOGGER.info("Time interpolation") ds = interp_time( da, time, keep_flag=True ) self.rivers[river]["dataset"] = ds for river in self.rivers.keys(): LOGGER.info("{river}: save in NetCDF file") self.rivers[river]["dataset"][["RVFL", "flag"]].to_netcdf( nc_out.format(**locals()) )
[docs] def write_tempsaln(self, nc_in, nc_out): const = 2.0 * np.pi / 365.0 for river in self.rivers.keys(): LOGGER.info(f"{river}:") ds_river = xr.open_dataset(nc_in.format(**locals())) if not self.rivers[river]["clim"]: LOGGER.info("Real time data") for var in ["temperature", "salinity"]: tmax = xr.DataArray( pd.to_datetime( str(ds_river.time.values[0])[:4] + self.rivers[river][var]["datemax"] ), name="tmax", ) DeltaT = (ds_river["time"]-tmax)/pd.to_timedelta(1, "D") DeltaT = DeltaT*const VAR = self.rivers[river][var]["avg"]+self.rivers[river][var][ "amp" ] * np.cos(DeltaT) xa = xr.DataArray( VAR, coords=[ds_river.time], dims=["time"], name=var ) ds_river = xr.merge([ds_river, xa]) else: LOGGER.info("Climatology data") LOGGER.info(f"Open {HYCOM3D_RIVERS_HYDRO_CLIM} ascii file") f = open(HYCOM3D_RIVERS_HYDRO_CLIM, mode="r", encoding="ascii") while True: line = f.readline() if river in line: LOGGER.debug(line) break temp = np.tile(np.float_(line.split()[2:]), 3) saln = np.tile(np.float_(f.readline().split()[2:]), 3) f.close() time_fr = [pd.to_datetime(t) for t in self.get_time_clim(ds_river.time.values, len(temp))] da_temp = xr.DataArray(temp, name="temperature", coords={ "time":time_fr}, dims='time').to_dataset() da_saln = xr.DataArray(saln, name="salinity", coords={ "time":time_fr}, dims='time').to_dataset() LOGGER.debug("Time interpolation") ds = interp_time( xr.merge([da_temp, da_saln]), ds_river.time.values, keep_flag=True ) ds_river = xr.merge([ds_river, ds]) LOGGER.info(f"{river} : save in NetCDF file") ds_river.to_netcdf(nc_out.format(**locals()))
[docs] def write_rfiles(self, nc_in="{river}.flx.ts.nc", r_out="{river}.r"): header_pattern = ( "Debits horaires du fleuve {river} \n" "origine : Donnees CMEMS \n\n" '"Date" ' '"Debit" "temp." "sal." "flag" \n' '"jour depuis 1/1/1950" "m3/s" "DegC" ' '"1e-3" "1=data, 2=interp, 3=persistance" \n' "--------------------------------------------" "--------------------------------------- \n" ) line_pattern = ( " {julianday} {debit} {temp} {saln} {flag}\n" ) valformat = "{:.10f}" for river in self.rivers.keys(): ds_river = xr.open_dataset(nc_in.format(**locals())) with open(r_out.format(**locals()), "w") as f: f.writelines(header_pattern.format(**locals())) for itime in range(len(ds_river.time)): dsi = ds_river.isel({"time": itime}) julianday = valformat.format( convert_to_julian_day(dsi.time.values) ) debit = valformat.format(dsi.RVFL.values) temp = valformat.format(dsi.temperature.values) saln = valformat.format(dsi.salinity.values) flag = int(dsi.flag.values) f.write(line_pattern.format(**locals()))
[docs] def write_ncfiles(self, nc_in="{river}.flx.ts.nc", nc_out="{river}.nc"): for river in self.rivers.keys(): ds_river = xr.open_dataset(nc_in.format(**locals())) ds_river.to_netcdf(nc_out.format(**locals()))
[docs] def get_time_clim(self, time, ntime): time_fr = ["{}0115".format( (pd.to_datetime(time[0])-pd.DateOffset(years=1)).strftime("%Y"))] itime = 0 while itime<(ntime-1): t = "{}15".format( (pd.to_datetime(time_fr[itime])+pd.DateOffset(months=1)).strftime("%Y%m")) time_fr.append(t) itime += 1 return time_fr