"""
Rivers processing
"""
import tarfile
import re
import logging
from ...times import convert_to_julian_day
from ...interp import interp_time
from .__init__ import (HYCOM3D_RIVERS_CFG,
HYCOM3D_RIVERS_INI,
HYCOM3D_RIVERS_FLOW_CLIM,
HYCOM3D_RIVERS_HYDRO_CLIM)
from ...io import read_inicfg_files
import numpy as np
from ...log import LoggingLevelContext
with LoggingLevelContext(logging.WARNING):
import xarray as xr
import pandas as pd
LOGGER = logging.getLogger(__name__)
[docs]class Rivers(object):
""" Rivers prepocessings """
[docs] def __init__(
self, cfgfile=HYCOM3D_RIVERS_CFG.format(rank=0)):
self.rivers = read_inicfg_files(
fcfg=cfgfile, fini=HYCOM3D_RIVERS_INI
)
self.platforms = dict()
for river in self.rivers.keys():
self.rivers[river].update({"dataset": None})
if not self.rivers[river]["clim"]:
LOGGER.info(f"{river}: real time data used")
for platform in self.rivers[river]["platform"]["Id"]:
self.platforms[platform] = {"path": [], "dataset": None}
[docs] def write_flowrate(self, tarname, time, nc_out):
self.rivers, self.platforms = self.platforms_nc2datasets(tarname)
for river in self.rivers.keys():
LOGGER.info(river)
if self.rivers[river]["clim"]:
cond = False
else:
platforms = self.rivers[river]["platform"]["Id"]
LOGGER.info(platforms)
dss = [type(self.platforms[p]["dataset"]) for p in platforms]
LOGGER.info(dss)
if type(None) not in dss:
cond = True
else:
cond = False
if cond:
LOGGER.info("Real time data")
for platform in self.rivers[river]["platform"]["Id"]:
LOGGER.info(f"{platform} time interpolation")
ds = interp_time(
self.platforms[platform]["dataset"].load(), time,
keep_flag=True, persistency="mean_25h"
)
ds.update(
{
"RVFL": ds["RVFL"]
* self.rivers[river]["platform"][platform]["debcoef"]
}
)
ds.update(
{
"flag": ds["flag"]
/ len(self.rivers[river]["platform"]["Id"])
}
)
if self.rivers[river]["dataset"]:
for var in ["RVFL", "flag"]:
self.rivers[river]["dataset"][var] += ds[var]
else:
self.rivers[river]["dataset"] = ds
else:
LOGGER.info("Climatology data")
LOGGER.info(f"Read {HYCOM3D_RIVERS_FLOW_CLIM} ascii file")
f = open(HYCOM3D_RIVERS_FLOW_CLIM, mode="r", encoding="ascii")
while True:
line = f.readline()
if river in line:
LOGGER.debug(line)
break
rvfl = np.tile(np.float_(f.readline().split()), 3)
f.close()
LOGGER.info("Get climatology time")
time_fr = [pd.to_datetime(t) for t in self.get_time_clim(time, len(rvfl))]
LOGGER.debug(time_fr)
da = xr.DataArray(rvfl,
name="RVFL",
coords={
"time":time_fr},
dims='time').to_dataset()
LOGGER.info("Time interpolation")
ds = interp_time(
da, time, keep_flag=True
)
self.rivers[river]["dataset"] = ds
for river in self.rivers.keys():
LOGGER.info("{river}: save in NetCDF file")
self.rivers[river]["dataset"][["RVFL", "flag"]].to_netcdf(
nc_out.format(**locals())
)
[docs] def write_tempsaln(self, nc_in, nc_out):
const = 2.0 * np.pi / 365.0
for river in self.rivers.keys():
LOGGER.info(f"{river}:")
ds_river = xr.open_dataset(nc_in.format(**locals()))
if not self.rivers[river]["clim"]:
LOGGER.info("Real time data")
for var in ["temperature", "salinity"]:
tmax = xr.DataArray(
pd.to_datetime(
str(ds_river.time.values[0])[:4]
+ self.rivers[river][var]["datemax"]
),
name="tmax",
)
DeltaT = (ds_river["time"]-tmax)/pd.to_timedelta(1, "D")
DeltaT = DeltaT*const
VAR = self.rivers[river][var]["avg"]+self.rivers[river][var][
"amp"
] * np.cos(DeltaT)
xa = xr.DataArray(
VAR, coords=[ds_river.time], dims=["time"], name=var
)
ds_river = xr.merge([ds_river, xa])
else:
LOGGER.info("Climatology data")
LOGGER.info(f"Open {HYCOM3D_RIVERS_HYDRO_CLIM} ascii file")
f = open(HYCOM3D_RIVERS_HYDRO_CLIM, mode="r", encoding="ascii")
while True:
line = f.readline()
if river in line:
LOGGER.debug(line)
break
temp = np.tile(np.float_(line.split()[2:]), 3)
saln = np.tile(np.float_(f.readline().split()[2:]), 3)
f.close()
time_fr = [pd.to_datetime(t) for t in self.get_time_clim(ds_river.time.values, len(temp))]
da_temp = xr.DataArray(temp,
name="temperature",
coords={
"time":time_fr},
dims='time').to_dataset()
da_saln = xr.DataArray(saln,
name="salinity",
coords={
"time":time_fr},
dims='time').to_dataset()
LOGGER.debug("Time interpolation")
ds = interp_time(
xr.merge([da_temp, da_saln]), ds_river.time.values, keep_flag=True
)
ds_river = xr.merge([ds_river, ds])
LOGGER.info(f"{river} : save in NetCDF file")
ds_river.to_netcdf(nc_out.format(**locals()))
[docs] def write_rfiles(self, nc_in="{river}.flx.ts.nc", r_out="{river}.r"):
header_pattern = (
"Debits horaires du fleuve {river} \n"
"origine : Donnees CMEMS \n\n"
'"Date" '
'"Debit" "temp." "sal." "flag" \n'
'"jour depuis 1/1/1950" "m3/s" "DegC" '
'"1e-3" "1=data, 2=interp, 3=persistance" \n'
"--------------------------------------------"
"--------------------------------------- \n"
)
line_pattern = (
" {julianday} {debit} {temp} {saln} {flag}\n"
)
valformat = "{:.10f}"
for river in self.rivers.keys():
ds_river = xr.open_dataset(nc_in.format(**locals()))
with open(r_out.format(**locals()), "w") as f:
f.writelines(header_pattern.format(**locals()))
for itime in range(len(ds_river.time)):
dsi = ds_river.isel({"time": itime})
julianday = valformat.format(
convert_to_julian_day(dsi.time.values)
)
debit = valformat.format(dsi.RVFL.values)
temp = valformat.format(dsi.temperature.values)
saln = valformat.format(dsi.salinity.values)
flag = int(dsi.flag.values)
f.write(line_pattern.format(**locals()))
[docs] def write_ncfiles(self, nc_in="{river}.flx.ts.nc", nc_out="{river}.nc"):
for river in self.rivers.keys():
ds_river = xr.open_dataset(nc_in.format(**locals()))
ds_river.to_netcdf(nc_out.format(**locals()))
[docs] def get_time_clim(self, time, ntime):
time_fr = ["{}0115".format(
(pd.to_datetime(time[0])-pd.DateOffset(years=1)).strftime("%Y"))]
itime = 0
while itime<(ntime-1):
t = "{}15".format(
(pd.to_datetime(time_fr[itime])+pd.DateOffset(months=1)).strftime("%Y%m"))
time_fr.append(t)
itime += 1
return time_fr