"""
Hycom3d post-production
"""
import xarray as xr
import pandas as pd
import numpy as np
import xesmf
import os
import shutil
import re
import logging
import multiprocessing as mp
from ...times import convert_to_julian_day
from ...interp import Regridder
FILTER_map = {
    "mean": "1",
    "demerliac": "2",
    "godin": "3",
}
LOGGER = logging.getLogger(__name__)
def preproc_time_splitting(ds):
    """Drop the last time step before splitting the dataset"""
    return ds.isel({"time": slice(None, -1)})
def postproc_time_splitting(ds):
    """Reduce the bathymetry variable to a single time step after splitting"""
    for vname in ds:
        if vname == "bathymetry" and "time" in ds[vname].dims:
            ds[vname] = ds[vname].isel(time=0)
    return ds
def get_cmdargs_nc_time_splitting(ncins, fields, ncout="{field}_{date}.nc", freq="PT1H"):
    """Build the command arguments for splitting NetCDF files at a given frequency"""
    cmdargs_list = []
    for field in fields:
        for ncin in ncins:
            if ncin.startswith(field):
                if ("spinup" in ncin) or ("spnudge1" in ncin):
                    cmdargs = [ncin, freq, ncout.format(field=field, date="{date}"),
                               preproc_time_splitting, postproc_time_splitting]
                else:
                    cmdargs = [ncin, freq, ncout.format(field=field, date="{date}"),
                               None, postproc_time_splitting]
                cmdargs_list.append(cmdargs)
    return cmdargs_list
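
# Illustrative usage sketch (file names are hypothetical):
#     get_cmdargs_nc_time_splitting(
#         ncins=["ssh_spinup.nc", "ssh_forecast.nc"], fields=["ssh"], freq="PT1H")
#     # -> one [ncin, freq, "ssh_{date}.nc", preproc, postproc] entry per matching file,
#     #    with the preproc step applied only to spinup/spnudge1 files.
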
def write_verticalgrid(depths, filename="verticalgrid.txt"):
    """Write the text file containing the depths for the vertical interpolation"""
    with open(filename, "w") as f:
        f.write(f"{len(depths)}\n")
        for d in depths:
            f.write(f"{d}\n")
    return filename
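
# Illustrative usage sketch (depth values are arbitrary):
#     write_verticalgrid([0.0, 5.0, 10.0, 20.0, 50.0])
#     # writes "verticalgrid.txt": the number of levels first, then one depth per line
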
def get_fields_from_cfg(cfg_var, dim=None):
    """Get the hycom3d variable names with respect to the post-production names"""
    hycom3d_var = []
    areas = [area for area in cfg_var if area != "common"]
    srcs = []
    for area in areas:
        src = cfg_var[area]["src"]
        if src not in srcs:
            srcs.append(src)
    for src in srcs:
        hycom3d_var.append(f"h_3D_{src}")
        if (dim == "2D") or (dim is None):
            if "none" not in cfg_var[area]["variables"]["2D"]:
                for var in ("ssh", "sst", "sss", "u", "v"):
                    if var in cfg_var[area]["variables"]["2D"]:
                        hycom3d_var.append(f"{var}_2D_{src}")
        if (dim == "3D") or (dim is None):
            if "none" not in cfg_var[area]["variables"]["3D"]:
                for var in cfg_var[area]["variables"]["3D"]:
                    hycom3d_var.append(f"{var}_3D_{src}")
    return hycom3d_var
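
# Illustrative sketch of the expected configuration layout (area name and values
# are hypothetical):
#     cfg_var = {
#         "common": {...},
#         "manga": {
#             "src": "hycom3d",
#             "variables": {"2D": ["ssh", "sst"], "3D": ["temp", "saln"]},
#         },
#     }
#     get_fields_from_cfg(cfg_var, dim="2D")
#     # -> ["h_3D_hycom3d", "ssh_2D_hycom3d", "sst_2D_hycom3d"]
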
def get_postprod_term(specs, filtering=None):
    """Get the list of post-production terms (in hours) from the configuration"""
    bounds_term = specs["common"]["bounds_term"]
    start = bounds_term["min"]
    end = bounds_term["max"]
    term_postprod = np.arange(start, end + 24, 24)
    if filtering in ("demerliac", "godin"):
        return term_postprod[1:-2]
    elif filtering == "mean":
        return term_postprod[:-1]
    else:
        return term_postprod
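
# Illustrative sketch (bounds are hypothetical): with bounds_term min=0 and max=96,
# the raw terms are the daily steps [0, 24, 48, 72, 96] hours.
#     specs = {"common": {"bounds_term": {"min": 0, "max": 96}}}
#     get_postprod_term(specs)                     # -> array([ 0, 24, 48, 72, 96])
#     get_postprod_term(specs, filtering="mean")   # -> array([ 0, 24, 48, 72])
#     get_postprod_term(specs, filtering="godin")  # -> array([24, 48])
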
def get_filter_clargs(ncins, specs, ncout_patt):
    """Get opts for the filtering executable"""
    terms_postprod = get_postprod_term(specs)
    opts_list = []
    srcs = []
    areas = [area for area in specs if area != "common"]
    for area in areas:
        src = specs[area]["src"]
        if src not in srcs:
            srcs.append(src)
    for src in srcs:
        for dim in ["3D", "2D"]:
            for filter in specs[area]["filters"][dim]:
                if filter != "none":
                    # Copy the configuration list so that appending "h" does not
                    # modify the specs in place
                    fields = list(specs[area]["variables"][dim])
                    if dim == "3D":
                        fields.append("h")
                    elif dim == "2D":
                        if filter not in specs[area]["filters"]["3D"]:
                            fields.append("h")
                    for term_postprod in get_postprod_term(specs, filtering=filter):
                        term_before = term_postprod - 24
                        term_after = term_postprod + 24
                        prec_mid = "04d" if term_postprod >= 0 else "05d"
                        prec_for = "04d" if term_after >= 0 else "05d"
                        prec_back = "04d" if term_before >= 0 else "05d"
                        for field in fields:
                            if field == "h":
                                filling = "3D"
                            else:
                                filling = dim
                            if field != "none":
                                opts = {
                                    "filter": FILTER_map[filter],
                                    "ncout": f"{field}_{filling}_{src}_{term_postprod:{prec_mid}}_{filter}.nc",
                                    "ncins": {
                                        "ncin_for": f"{field}_{filling}_{src}_{term_after:{prec_for}}.nc",
                                        "ncin_mid": f"{field}_{filling}_{src}_{term_postprod:{prec_mid}}.nc",
                                        "ncin_back": f"{field}_{filling}_{src}_{term_before:{prec_back}}.nc",
                                    },
                                }
                                if filter == "mean":
                                    opts["ncins"].pop("ncin_back")
                                ncin_filter = list(opts["ncins"].values())
                                if set(ncin_filter).issubset(ncins):
                                    opts_list.append(opts)
    return opts_list
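
# Illustrative sketch of one returned entry (names are hypothetical):
#     {"filter": "3",
#      "ncout": "temp_3D_hycom3d_0024_godin.nc",
#      "ncins": {"ncin_for": "temp_3D_hycom3d_0048.nc",
#                "ncin_mid": "temp_3D_hycom3d_0024.nc",
#                "ncin_back": "temp_3D_hycom3d_0000.nc"}}
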
def get_interpolation_clargs(ncins, offset, specs, config, verticalgrid):
    """Get command opts for the vertical interpolation executable"""
    opts_list = []
    srcs = []
    areas = [area for area in specs if area != "common"]
    for area in areas:
        src = specs[area]["src"]
        if src not in srcs:
            srcs.append(src)
    for src in srcs:
        for dim in ["3D", "2D"]:
            for filter in specs[area]["filters"][dim]:
                fields = specs[area]["variables"][dim]
                terms_postprod = get_postprod_term(specs, filtering=filter)
                for term_postprod in terms_postprod:
                    prec = "04d" if term_postprod >= 0 else "05d"
                    for field in fields:
                        if field != "none":
                            opts = {
                                "offset": offset,
                                "ncins": {
                                    "ncin": f"{field}_{dim}_{src}_{term_postprod:{prec}}_{filter}.nc",
                                    "h": f"h_3D_{src}_{term_postprod:{prec}}_{filter}.nc",
                                },
                                "ncout": f"{field}_{dim}_{src}_{term_postprod:{prec}}_{filter}_interp.nc",
                                "config": os.path.basename(config),
                                "zgrid": verticalgrid,
                            }
                            if dim == "2D":
                                opts.pop("zgrid")
                            if set(opts["ncins"].values()).issubset(ncins):
                                opts_list.append(opts)
                            else:
                                LOGGER.warning(
                                    f"The following NetCDF files are missing:\n{opts['ncins']}"
                                )
    return opts_list
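
# Illustrative sketch of one returned entry (names and offset are hypothetical):
#     {"offset": 0,
#      "ncins": {"ncin": "temp_3D_hycom3d_0024_godin.nc",
#                "h": "h_3D_hycom3d_0024_godin.nc"},
#      "ncout": "temp_3D_hycom3d_0024_godin_interp.nc",
#      "config": "config.cfg",
#      "zgrid": "verticalgrid.txt"}
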
def get_tempconversion_clargs(ncins, specs):
    """Get command line opts for the conversion of temperature"""
    opts_list = []
    srcs = []
    areas = [area for area in specs if area != "common"]
    for area in areas:
        src = specs[area]["src"]
        if src not in srcs:
            srcs.append(src)
    for src in srcs:
        for dim in ["3D", "2D"]:
            for filter in specs[area]["filters"][dim]:
                fields = specs[area]["variables"][dim]
                if "temp" in fields:
                    temp = "temp"
                    saln = "saln"
                elif "sst" in fields:
                    temp = "sst"
                    saln = "sss"
                else:
                    # No temperature/salinity pair to convert for this dimension
                    continue
                terms_postprod = get_postprod_term(specs, filtering=filter)
                for term_postprod in terms_postprod:
                    prec = "04d" if term_postprod >= 0 else "05d"
                    opts = {
                        "ncins": {
                            "nctemp": f"{temp}_{dim}_{src}_{term_postprod:{prec}}_{filter}.nc",
                            "ncsaln": f"{saln}_{dim}_{src}_{term_postprod:{prec}}_{filter}.nc",
                        },
                        "ncout": f"tempis_{dim}_{src}_{term_postprod:{prec}}_{filter}.nc",
                    }
                    if set(opts["ncins"].values()).issubset(ncins):
                        opts_list.append(opts)
                    else:
                        LOGGER.warning(
                            f"The following NetCDF files do not exist:\n{opts['ncins']}"
                        )
    return opts_list
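
# Illustrative sketch of one returned entry (names are hypothetical):
#     {"ncins": {"nctemp": "temp_3D_hycom3d_0024_godin.nc",
#                "ncsaln": "saln_3D_hycom3d_0024_godin.nc"},
#      "ncout": "tempis_3D_hycom3d_0024_godin.nc"}
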
def get_concat_clargs(ncins, rundate, postprod, vapp, vconf, specs):
    """Build the command args for the concatenation of daily files"""
    cmdargs_list = []
    srcs = []
    areas = [area for area in specs if area != "common"]
for area in areas:
src = specs[area]["src"]
outputs = specs[area]["output"]
for output in outputs:
for dim in ["3D", "2D"]:
for filter in specs[area]["filters"][dim]:
fieldsin = specs[area]["variables"][dim]
                    if "none" not in fieldsin:
if output == "all":
fields = fieldsin
else:
if (output == "hydro") and (dim == "2D"):
fs = ["sst", "sss"]
elif (output == "hydro") and (dim == "3D"):
fs = ["temp", "saln"]
elif output == "dyn":
fs = ["u", "v"]
if dim == "2D":
fs.append("ssh")
fields = [var for var in fs if var in fieldsin]
terms_postprod = get_postprod_term(specs, filtering=filter)
for term_postprod in terms_postprod:
prec = "04d" if term_postprod >= 0 else "05d"
cmdargs = {}
cmdargs["ncout"] = f"{output}_{dim}_{src}_{term_postprod:{prec}}_{filter}.nc"
cmdargs["ncins"] = []
for field in fields:
if (dim=="3D") and (field=="temp") and (
"datashom" in postprod
):
cmdargs["ncins"].append(
f"tempis_3D_{src}_{term_postprod:{prec}}_{filter}.nc"
)
else:
cmdargs["ncins"].append(
f"{field}_{dim}_{src}_{term_postprod:{prec}}_{filter}.nc"
)
if filter=="none":
field_type = "instantaneous"
else:
field_type = filter
                            date = pd.to_datetime(
                                rundate
                            ) + pd.to_timedelta(int(term_postprod), "h")
                            if "forecast" in postprod:
                                if date.strftime("%Y%m%d") == (
                                    pd.to_datetime(rundate) - pd.to_timedelta(1, "D")
                                ).strftime("%Y%m%d"):
                                    forecast = "analysis"
                                    forecast_range = f"1-days {forecast}"
                                    forecast_type = forecast
                                else:
                                    forecast = "forecast"
                                    forecast_range = f"5-days {forecast}"
                                    forecast_type = forecast
elif "spnudge" in postprod:
day0 = convert_to_julian_day(
np.datetime64(date)
)
day1 = convert_to_julian_day(
np.datetime64(rundate)-np.timedelta64(7, 'D')
)
if day0<day1:
forecast = "best analysis"
else:
forecast = "analysis"
forecast_range = f"7-days {forecast}"
forecast_type = f"{forecast}"
elif "hindcast" in postprod:
forecast = "best analysis"
forecast_range = f"7-days {forecast}"
forecast_type = f"{forecast}"
if filter == "mean":
glob_attrs_title = f"{vapp} - {dim} parameters - daily mean data"
elif (filter == "godin") or (filter == "demerliac"):
glob_attrs_title = f"{vapp} - {dim} parameters - {filter} filtered data"
else:
glob_attrs_title = f"{vapp} - {dim} parameters"
cmdargs["global_attrs"] = {
"title": glob_attrs_title,
"source": f"Sloop-{vapp} {vconf}",
"bulletin_date": pd.to_datetime(
rundate
).strftime("%Y-%m-%d %H:%M:%S"),
"field_date": date.strftime(
"%Y%m%d"
) if ("datashom" in postprod) else pd.to_datetime(
date
).strftime("%Y-%m-%d %H:%M:%S"),
"field_type": field_type,
"forecast_range": forecast_range,
"forecast_type": forecast_type,
}
if (set(cmdargs["ncins"]).issubset(ncins)
) and (len(cmdargs["ncins"])>0):
cmdargs_list.append([cmdargs])
else:
LOGGER.warning(
f"The following input NetCDF files do not exist: {cmdargs['ncins']}"
)
return cmdargs_list
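
# Illustrative sketch of one returned entry (values are hypothetical):
#     [{"ncout": "hydro_3D_hycom3d_0024_godin.nc",
#       "ncins": ["temp_3D_hycom3d_0024_godin.nc", "saln_3D_hycom3d_0024_godin.nc"],
#       "global_attrs": {"title": "...", "field_type": "godin", ...}}]
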
def concat_field_ncfiles(args):
    """Merge per-field NetCDF files into a single output file with global attributes"""
    list_ds = []
    for ncfile in args["ncins"]:
        ds = xr.open_dataset(ncfile)
        list_ds.append(ds)
    ds = xr.merge(list_ds)
    ds.encoding.update(list_ds[0].encoding)
    ds.attrs.update(list_ds[0].attrs)
    ds.attrs.update(args["global_attrs"])
    ds.to_netcdf(args["ncout"])
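
# Illustrative usage sketch (file names are hypothetical):
#     concat_field_ncfiles({
#         "ncins": ["temp_3D_hycom3d_0024_godin.nc", "saln_3D_hycom3d_0024_godin.nc"],
#         "ncout": "hydro_3D_hycom3d_0024_godin.nc",
#         "global_attrs": {"title": "demo"},
#     })
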
def regrid_soap(ds):
    """Regrid a dataset onto a grid with a constant latitude step"""
dsin = ds.assign_coords({"X":ds.lon[0,:],
"Y":ds.lat[:,0]})
dsin = dsin.rename({"lon":"lon_2d", "lat":"lat_2d",
"X":"lon", "Y":"lat"})
dlat = np.mean(dsin.lat.diff("lat"))
lat = np.arange(dsin.lat.min(), dsin.lat.max(), dlat,
type(ds.lat.values[0,0]))
soap_grid = xr.DataArray(dims=["lon", "lat"],
name="grid",
coords=dict(lat=lat,
lon=dsin.lon),
)
regridder = xesmf.Regridder(dsin, soap_grid,
reuse_weights=True,
method="bilinear")
dsout = []
for dain in dsin:
if not "time_bnds" in dain:
daout = regridder(dsin[dain])
else:
daout = dsin[dain]
daout.attrs.update(dsin[dain].attrs)
daout.encoding.update(dsin[dain].encoding)
dsout.append(daout)
dsout = xr.merge(dsout)
dsout = dsout.rename({"lon":"X", "lat":"Y"})
lon, lat = np.meshgrid(dsout.X, dsout.Y)
da_lon = xr.DataArray(lon,
dims=["Y", "X"],
name="lon",
)
da_lat = xr.DataArray(lat,
dims=["Y", "X"],
name="lat",
)
dsout = xr.merge([dsout, da_lon, da_lat])
for coord in dsout.coords:
dsout[coord].attrs.update(ds[coord].attrs)
dsout.encoding.update(ds.encoding)
dsout.attrs.update(ds.attrs)
return dsout
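
# Illustrative usage sketch (file name is hypothetical); the input dataset is
# expected to carry 2D "lon"/"lat" coordinates on its ("Y", "X") grid dimensions:
#     ds = xr.open_dataset("hydro_2D_hycom3d_0024_godin.nc")
#     ds_regular = regrid_soap(ds)
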
def get_vtx_geometries(cmdargs):
    """Get the post-production geometries."""
geometries = dict()
for cmdarg in cmdargs:
name = cmdarg[0]["name"]
geometries[name] = dict(
kind = "lonlat",
info = "Shom Hycom3d postproduction area",
area = name,
)
return geometries
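
# Illustrative usage sketch (area name is hypothetical); each cmdargs entry is a
# list whose first element is a dict carrying a "name" key:
#     get_vtx_geometries([[{"name": "manga"}]])
#     # -> {"manga": {"kind": "lonlat",
#     #               "info": "Shom Hycom3d postproduction area",
#     #               "area": "manga"}}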