# Source code for sloop.models.hycom3d.postprod

"""
Hycom3d post-production
"""
import xarray as xr
import pandas as pd
import numpy as np
import xesmf
import os, shutil, re, logging
import multiprocessing as mp

from ...times import convert_to_julian_day
from ...interp import Regridder


# Mapping from the filter name used in the post-production configuration
# to the numeric flag passed on the command line of the filtering executable.
FILTER_map = {
    "mean":"1",
    "demerliac":"2",
    "godin":"3"
    }

# Module-level logger, named after this module per logging convention.
LOGGER = logging.getLogger(__name__)


def preproc_time_splitting(ds):
    """Drop the last time step of *ds* before time splitting."""
    trim_last = {"time": slice(None, -1)}
    return ds.isel(trim_last)
def postproc_time_splitting(ds):
    """Collapse the time dimension of the bathymetry variable, if present.

    All other variables are left untouched; *ds* is modified in place and
    returned.
    """
    for name in list(ds):
        if name != "bathymetry":
            continue
        # Bathymetry is static: keep only the first time step.
        if "time" in ds[name].dims:
            ds[name] = ds[name].isel(time=0)
    return ds
def get_cmdargs_nc_time_splitting(ncins, fields, ncout="{field}_{date}.nc", freq="PT1H"):
    """Split a netcdf file into several netcdf files at a specific frequency

    Builds, for every input file whose name starts with one of *fields*, the
    argument list ``[ncin, freq, ncout_pattern, preproc, postproc]`` consumed
    by the time-splitting task.
    """
    cmdargs_list = []
    for field in fields:
        for ncin in ncins:
            if not ncin.startswith(field):
                continue
            # Spinup/spnudge1 outputs carry an extra trailing time step
            # that must be dropped before splitting.
            if ("spinup" in ncin) or ("spnudge1" in ncin):
                preproc = preproc_time_splitting
            else:
                preproc = None
            cmdargs_list.append([
                ncin,
                freq,
                ncout.format(field=field, date="{date}"),
                preproc,
                postproc_time_splitting,
            ])
    return cmdargs_list
def write_verticalgrid(depths, filename="verticalgrid.txt"):
    """Write the text file containing the depths for vertical interpolation.

    The file holds the number of levels on the first line, then one depth
    per line.

    Parameters
    ----------
    depths: sequence of float
        Depth levels to write.
    filename: str
        Path of the text file to create.

    Return
    ------
    str
        The *filename* that was written.
    """
    # Context manager guarantees the file is closed even on write errors
    # (the original left the handle open on failure).
    with open(filename, "w") as f:
        f.write(f"{len(depths)}\n")
        for depth in depths:
            f.write(f"{depth}\n")
    return filename
def get_fields_from_cfg(cfg_var, dim=None):
    """Get the hycom3d variable names with respect to post-prod name.

    Parameters
    ----------
    cfg_var: dict
        Post-production configuration with one "common" entry plus one
        entry per area, each holding "src" and "variables" keys.
    dim: str, optional
        Restrict the result to "2D" or "3D" variables; both when None.

    Return
    ------
    list of str
        Hycom3d variable names formatted as ``<var>_<dim>_<src>``.
    """
    hycom3d_var = []
    # BUGFIX: dict.keys() returns a view without .remove() on Python 3;
    # materialize a list before dropping the "common" entry.
    areas = list(cfg_var.keys())
    areas.remove("common")
    srcs = []
    for area in areas:
        src = cfg_var[area]["src"]
        if src not in srcs:
            srcs.append(src)
    # NOTE(review): below, `area` is the last area seen in the loop above,
    # so with several areas only the last one's variables are inspected --
    # preserved as-is; confirm against callers.
    for src in srcs:
        # The layer-thickness field is always required.
        hycom3d_var.append(f"h_3D_{src}")
        if (dim == "2D") or (dim is None):
            if "none" not in cfg_var[area]["variables"]["2D"]:
                # Fixed ordering matches the original explicit checks.
                for var in ("ssh", "sst", "sss", "u", "v"):
                    if var in cfg_var[area]["variables"]["2D"]:
                        hycom3d_var.append(f"{var}_2D_{src}")
        if (dim == "3D") or (dim is None):
            if "none" not in cfg_var[area]["variables"]["3D"]:
                for var in cfg_var[area]["variables"]["3D"]:
                    hycom3d_var.append(f"{var}_3D_{src}")
    return hycom3d_var
def get_postprod_term(specs, filtering=None):
    """Get the list of post-production dates from configuration.

    Terms are hourly offsets sampled every 24 h between the configured
    bounds (inclusive). Boundary terms that lack the neighbouring days
    required by the centered filters are dropped.
    """
    bounds = specs["common"]["bounds_term"]
    terms = np.arange(bounds["min"], bounds["max"] + 24, 24)
    if filtering in ("demerliac", "godin"):
        # Centered filters need one day before and two days after.
        return terms[1:-2]
    if filtering == "mean":
        return terms[:-1]
    return terms
def get_filter_clargs(ncins, specs, ncout_patt):
    """Get opts for filtering executable.

    For every source, dimension, filter and post-production term, build the
    option dict (filter code, input file triplet, output file name) for the
    filtering executable, keeping only those whose inputs are all in *ncins*.

    Parameters
    ----------
    ncins: collection of str
        Available input NetCDF file names.
    specs: dict
        Post-production configuration ("common" entry plus per-area entries).
    ncout_patt:
        Unused; kept for interface compatibility.

    Return
    ------
    list of dict
    """
    opts_list = []
    srcs = []
    # BUGFIX: dict.keys() has no .remove() on Python 3; copy to a list first.
    areas = list(specs.keys())
    areas.remove("common")
    for area in areas:
        src = specs[area]["src"]
        if src not in srcs:
            srcs.append(src)
    # NOTE(review): `area` below is the last area of the loop above --
    # only that area's filters/variables are used; preserved as-is.
    for src in srcs:
        for filt in specs[area]["filters"]["3D"] and ["3D", "2D"] or ["3D", "2D"]:
            pass
        for dim in ["3D", "2D"]:
            for filt in specs[area]["filters"][dim]:
                if filt == "none":
                    continue
                # BUGFIX: copy the configured list -- the original appended
                # "h" to the specs dict itself, mutating the configuration
                # and duplicating "h" on every filter iteration.
                fields = list(specs[area]["variables"][dim])
                # The layer thickness "h" must be filtered too (once).
                if dim == "3D":
                    fields.append("h")
                elif dim == "2D":
                    if filt not in specs[area]["filters"]["3D"]:
                        fields.append("h")
                for term_postprod in get_postprod_term(specs, filtering=filt):
                    term_before = term_postprod - 24
                    term_after = term_postprod + 24
                    for field in fields:
                        # "h" is always a 3D field, whatever the loop dim.
                        filling = "3D" if field == "h" else dim
                        if field == "none":
                            continue
                        # Negative terms need one extra digit for the sign.
                        prec = "04d" if term_postprod >= 0 else "05d"
                        prec_for = "04d" if term_after >= 0 else "05d"
                        prec_back = "04d" if term_before >= 0 else "05d"
                        opts = {
                            "filter": FILTER_map[filt],
                            "ncout": f"{field}_{filling}_{src}_{term_postprod:{prec}}_{filt}.nc",
                            "ncins": {
                                "ncin_for": f"{field}_{filling}_{src}_{term_after:{prec_for}}.nc",
                                "ncin_mid": f"{field}_{filling}_{src}_{term_postprod:{prec}}.nc",
                                "ncin_back": f"{field}_{filling}_{src}_{term_before:{prec_back}}.nc",
                            },
                        }
                        if filt == "mean":
                            # The daily mean only uses the current and next day.
                            opts["ncins"].pop("ncin_back")
                        # Keep the opts only when every required input exists.
                        ncin_filter = list(opts["ncins"].values())
                        if set(ncin_filter).issubset(ncins):
                            opts_list.append(opts)
    return opts_list
def get_interpolation_clargs(ncins, offset, specs, config, verticalgrid):
    """Get command opts for vertical interpolation executable.

    Parameters
    ----------
    ncins: collection of str
        Available input NetCDF file names.
    offset:
        Offset forwarded verbatim to the executable options.
    specs: dict
        Post-production configuration ("common" entry plus per-area entries).
    config: str
        Path of the interpolation config file (only its basename is kept).
    verticalgrid: str
        Path of the vertical grid text file (dropped for 2D fields).

    Return
    ------
    list of dict
    """
    opts_list = []
    srcs = []
    # BUGFIX: dict.keys() has no .remove() on Python 3; copy to a list first.
    areas = list(specs.keys())
    areas.remove("common")
    for area in areas:
        src = specs[area]["src"]
        if src not in srcs:
            srcs.append(src)
    # NOTE(review): `area` below is the last area of the loop above;
    # preserved as-is -- confirm for multi-area configurations.
    for src in srcs:
        for dim in ["3D", "2D"]:
            for filt in specs[area]["filters"][dim]:
                fields = specs[area]["variables"][dim]
                for term_postprod in get_postprod_term(specs, filtering=filt):
                    # Negative terms need one extra digit for the sign.
                    prec = "04d" if term_postprod >= 0 else "05d"
                    for field in fields:
                        if field == "none":
                            continue
                        opts = {
                            "offset": offset,
                            "ncins": {
                                "ncin": f"{field}_{dim}_{src}_{term_postprod:{prec}}_{filt}.nc",
                                "h": f"h_3D_{src}_{term_postprod:{prec}}_{filt}.nc",
                            },
                            "ncout": f"{field}_{dim}_{src}_{term_postprod:{prec}}_{filt}_interp.nc",
                            "config": os.path.basename(config),
                            "zgrid": verticalgrid,
                        }
                        if dim == "2D":
                            # No vertical interpolation for surface fields.
                            opts.pop("zgrid")
                        if set(opts["ncins"].values()).issubset(ncins):
                            opts_list.append(opts)
                        else:
                            LOGGER.warning(f"The following NetCDF files are missing: \n {opts['ncins']}")
    return opts_list
def get_tempconversion_clargs(ncins, specs):
    """Get command line opts for the conversion of temperature.

    Builds, for every source/dimension/filter/term, the pair of
    temperature and salinity input files plus the ``tempis`` output file
    for the in-situ temperature conversion executable.

    Parameters
    ----------
    ncins: collection of str
        Available input NetCDF file names.
    specs: dict
        Post-production configuration ("common" entry plus per-area entries).

    Return
    ------
    list of dict
    """
    opts_list = []
    srcs = []
    # BUGFIX: dict.keys() has no .remove() on Python 3; copy to a list first.
    areas = list(specs.keys())
    areas.remove("common")
    for area in areas:
        src = specs[area]["src"]
        if src not in srcs:
            srcs.append(src)
    # NOTE(review): `area` below is the last area of the loop above;
    # preserved as-is.
    for src in srcs:
        for dim in ["3D", "2D"]:
            for filt in specs[area]["filters"][dim]:
                fields = specs[area]["variables"][dim]
                # 3D fields use temp/saln, surface fields sst/sss.
                if "temp" in fields:
                    temp = "temp"
                    saln = "saln"
                elif "sst" in fields:
                    temp = "sst"
                    saln = "sss"
                # NOTE(review): when neither is present, temp/saln keep the
                # values from a previous iteration (or are unbound on the
                # first one) -- original behavior preserved; confirm intent.
                for term_postprod in get_postprod_term(specs, filtering=filt):
                    # Negative terms need one extra digit for the sign.
                    prec = "04d" if term_postprod >= 0 else "05d"
                    opts = {
                        "ncins": {
                            "nctemp": f"{temp}_{dim}_{src}_{term_postprod:{prec}}_{filt}.nc",
                            "ncsaln": f"{saln}_{dim}_{src}_{term_postprod:{prec}}_{filt}.nc",
                        },
                        "ncout": f"tempis_{dim}_{src}_{term_postprod:{prec}}_{filt}.nc",
                    }
                    if set(opts["ncins"].values()).issubset(ncins):
                        opts_list.append(opts)
                    else:
                        LOGGER.warning(f"The following NetCDF files do not exist: \n {opts['ncins']}")
    return opts_list
def get_concat_clargs(ncins, rundate, postprod, vapp, vconf, specs):
    """Concatenation of daily files args.

    For every area/output/dimension/filter/term, build the list of per-field
    input files to merge, the merged output file name, and the global NetCDF
    attributes (bulletin date, field date/type, forecast range/type).

    Parameters
    ----------
    ncins: collection of str
        Available input NetCDF file names.
    rundate: str
        Run date, parseable by :func:`pandas.to_datetime`.
    postprod: str
        Post-production flavour; inspected for "forecast", "spnudge",
        "hindcast" and "datashom" substrings.
    vapp, vconf: str
        Application name and configuration, used in global attributes.
    specs: dict
        Post-production configuration ("common" entry plus per-area entries).

    Return
    ------
    list of list of dict
    """
    cmdargs_list = []
    # BUGFIX: dict.keys() has no .remove() on Python 3; copy to a list first.
    areas = list(specs.keys())
    areas.remove("common")
    for area in areas:
        src = specs[area]["src"]
        outputs = specs[area]["output"]
        for output in outputs:
            for dim in ["3D", "2D"]:
                for filt in specs[area]["filters"][dim]:
                    fieldsin = specs[area]["variables"][dim]
                    if fieldsin == "none":
                        continue
                    # Select the fields belonging to this output product.
                    if output == "all":
                        fields = fieldsin
                    else:
                        if (output == "hydro") and (dim == "2D"):
                            fs = ["sst", "sss"]
                        elif (output == "hydro") and (dim == "3D"):
                            fs = ["temp", "saln"]
                        elif output == "dyn":
                            fs = ["u", "v"]
                            if dim == "2D":
                                fs.append("ssh")
                        # NOTE(review): `fs` is unbound for any other output
                        # value -- original behavior preserved.
                        fields = [var for var in fs if var in fieldsin]
                    for term_postprod in get_postprod_term(specs, filtering=filt):
                        # Negative terms need one extra digit for the sign.
                        prec = "04d" if term_postprod >= 0 else "05d"
                        cmdargs = {}
                        cmdargs["ncout"] = f"{output}_{dim}_{src}_{term_postprod:{prec}}_{filt}.nc"
                        cmdargs["ncins"] = []
                        for field in fields:
                            # Datashom products ship in-situ temperature.
                            if (dim == "3D") and (field == "temp") and (
                                    "datashom" in postprod):
                                cmdargs["ncins"].append(
                                    f"tempis_3D_{src}_{term_postprod:{prec}}_{filt}.nc"
                                )
                            else:
                                cmdargs["ncins"].append(
                                    f"{field}_{dim}_{src}_{term_postprod:{prec}}_{filt}.nc"
                                )
                        field_type = "instantaneous" if filt == "none" else filt
                        date = pd.to_datetime(
                            rundate
                        ) + pd.to_timedelta(int(term_postprod), "h")
                        if "forecast" in postprod:
                            # NOTE(review): Timestamp compared against a
                            # "%Y%m%d" string -- relies on pandas coercion;
                            # preserved as-is, verify it matches as intended.
                            if date == (
                                pd.to_datetime(rundate) - pd.to_timedelta(1, "D")
                            ).strftime("%Y%m%d"):
                                forecast = "analysis"
                                forecast_range = f"1-days {forecast}"
                                forecast_type = f"{forecast}"
                            else:
                                forecast = "forecast"
                                forecast_range = f"5-days {forecast}"
                                forecast_type = f"{forecast}"
                        elif "spnudge" in postprod:
                            day0 = convert_to_julian_day(np.datetime64(date))
                            day1 = convert_to_julian_day(
                                np.datetime64(rundate) - np.timedelta64(7, 'D')
                            )
                            forecast = "best analysis" if day0 < day1 else "analysis"
                            forecast_range = f"7-days {forecast}"
                            forecast_type = f"{forecast}"
                        elif "hindcast" in postprod:
                            forecast = "best analysis"
                            forecast_range = f"7-days {forecast}"
                            forecast_type = f"{forecast}"
                        # NOTE(review): forecast_range/type are unbound when
                        # postprod matches none of the above -- preserved.
                        if filt == "mean":
                            glob_attrs_title = f"{vapp} - {dim} parameters - daily mean data"
                        elif (filt == "godin") or (filt == "demerliac"):
                            glob_attrs_title = f"{vapp} - {dim} parameters - {filt} filtered data"
                        else:
                            glob_attrs_title = f"{vapp} - {dim} parameters"
                        cmdargs["global_attrs"] = {
                            "title": glob_attrs_title,
                            "source": f"Sloop-{vapp} {vconf}",
                            "bulletin_date": pd.to_datetime(
                                rundate
                            ).strftime("%Y-%m-%d %H:%M:%S"),
                            "field_date": date.strftime(
                                "%Y%m%d"
                            ) if ("datashom" in postprod) else pd.to_datetime(
                                date
                            ).strftime("%Y-%m-%d %H:%M:%S"),
                            "field_type": field_type,
                            "forecast_range": forecast_range,
                            "forecast_type": forecast_type,
                        }
                        if (set(cmdargs["ncins"]).issubset(ncins)
                                ) and (len(cmdargs["ncins"]) > 0):
                            cmdargs_list.append([cmdargs])
                        else:
                            LOGGER.warning(
                                f"The following input NetCDF files do not exist: {cmdargs['ncins']}"
                            )
    return cmdargs_list
def concat_field_ncfiles(args):
    """Merge the per-field NetCDF files of *args* into one output file.

    The merged dataset inherits the encoding and attributes of the first
    input, then the global attributes from ``args["global_attrs"]``, and is
    written to ``args["ncout"]``.
    """
    datasets = [xr.open_dataset(path) for path in args["ncins"]]
    merged = xr.merge(datasets)
    merged.encoding.update(datasets[0].encoding)
    merged.attrs.update(datasets[0].attrs)
    merged.attrs.update(args["global_attrs"])
    merged.to_netcdf(args["ncout"])
def get_extract_cmdargs(ncins, specs, postprod):
    """Area extractions args.

    For every area/output/dimension/filter/term, build the argument dict for
    :func:`extract_metazone`, keeping only inputs present in *ncins*.

    Parameters
    ----------
    ncins: collection of str
        Available input NetCDF file names.
    specs: dict
        Post-production configuration ("common" entry plus per-area entries).
    postprod: str
        Post-production flavour, forwarded to the extraction step.

    Return
    ------
    list of list of dict
    """
    cmdargs_list = []
    # BUGFIX: dict.keys() has no .remove() on Python 3; copy to a list first.
    areas = list(specs.keys())
    areas.remove("common")
    for area in areas:
        src = specs[area]["src"]
        outputs = specs[area]["output"]
        for output in outputs:
            for dim in ["3D", "2D"]:
                for filt in specs[area]["filters"][dim]:
                    for term_postprod in get_postprod_term(specs, filtering=filt):
                        # Negative terms need one extra digit for the sign.
                        prec = "04d" if term_postprod >= 0 else "05d"
                        cmdargs = {
                            "name": area,
                            "ncin": f"{output}_{dim}_{src}_{term_postprod:{prec}}_{filt}.nc",
                            "specs": specs[area],
                            "postprod": postprod,
                        }
                        if cmdargs["ncin"] in ncins:
                            cmdargs_list.append([cmdargs])
    return cmdargs_list
def regrid_soap(ds):
    """Regrid dataset with constant latitude step

    Bilinearly regrids every data variable of *ds* (except time bounds)
    onto a grid that keeps the source longitudes but uses latitudes with a
    constant step, then rebuilds 2-d lon/lat arrays on the result.

    Assumes ``ds.lon``/``ds.lat`` are 2-d arrays on a rectilinear grid
    (row/column slices fully describe the axes) -- TODO confirm upstream.
    """
    # Promote 1-d slices of the 2-d lon/lat arrays to axis coordinates.
    dsin = ds.assign_coords({"X":ds.lon[0,:], "Y":ds.lat[:,0]})
    dsin = dsin.rename({"lon":"lon_2d", "lat":"lat_2d", "X":"lon", "Y":"lat"})
    # Target latitudes: mean source step, same dtype as the source values.
    dlat = np.mean(dsin.lat.diff("lat"))
    lat = np.arange(dsin.lat.min(), dsin.lat.max(), dlat, type(ds.lat.values[0,0]))
    soap_grid = xr.DataArray(dims=["lon", "lat"],
                             name="grid",
                             coords=dict(lat=lat, lon=dsin.lon),
                             )
    regridder = xesmf.Regridder(dsin, soap_grid, reuse_weights=True, method="bilinear")
    dsout = []
    # `dain` is a variable NAME; names containing "time_bnds" are copied
    # through unregridded, everything else is regridded.
    for dain in dsin:
        if not "time_bnds" in dain:
            daout = regridder(dsin[dain])
        else:
            daout = dsin[dain]
        # Regridding drops attrs/encoding; restore them from the source.
        daout.attrs.update(dsin[dain].attrs)
        daout.encoding.update(dsin[dain].encoding)
        dsout.append(daout)
    dsout = xr.merge(dsout)
    dsout = dsout.rename({"lon":"X", "lat":"Y"})
    # Rebuild the 2-d lon/lat arrays expected by downstream consumers.
    lon, lat = np.meshgrid(dsout.X, dsout.Y)
    da_lon = xr.DataArray(lon,
                          dims=["Y", "X"],
                          name="lon",
                          )
    da_lat = xr.DataArray(lat,
                          dims=["Y", "X"],
                          name="lat",
                          )
    dsout = xr.merge([dsout, da_lon, da_lat])
    # Carry over coordinate attributes and dataset metadata from the input.
    for coord in dsout.coords:
        dsout[coord].attrs.update(ds[coord].attrs)
    dsout.encoding.update(ds.encoding)
    dsout.attrs.update(ds.attrs)
    return dsout
def extract_metazone(args):
    """Metazone extractions.

    Extracts the area ``args["name"]`` from the file ``args["ncin"]``,
    working inside a dedicated sub-directory, and moves the resulting
    NetCDF file back to the current directory.

    Parameters
    ----------
    args: dict
        With keys "name" (area name), "ncin" (input file, named
        ``<var>_<dim>_<src>_<term>_<filter>.nc``), "specs" (area specs with
        "bounds_lon"/"bounds_lat"; 9999.9 means "no bound") and "postprod".
    """
    cwd = os.getcwd()
    worker_dir = args["name"] + "_" + args["ncin"][:-3]
    os.makedirs(worker_dir)
    shutil.copy(args["ncin"], worker_dir + "/")
    os.chdir(worker_dir)
    # ROBUSTNESS: always restore the working directory, even on failure
    # (important when run through multiprocessing workers).
    try:
        ncin_re = "(?P<var>.*)_(?P<dim>.*)_(?P<src>.*)_(?P<term>.*)_(?P<filter>.*).nc"
        matches = re.search(ncin_re, args["ncin"], re.DOTALL)
        dsin = xr.open_dataset(args["ncin"])
        ds = dsin.assign_coords({"X": dsin.lon[0, :], "Y": dsin.lat[:, 0]})
        if "soap" in args["postprod"]:
            ds = regrid_soap(ds)
        if "mean" in args["ncin"]:
            # Daily means are stamped at the start of the day.
            ds["time"] = ds["time"].dt.floor("1d")
            for var in ["u", "v"]:
                # BUGFIX: was `("u" or "v") in ds`, which only ever tested
                # "u" and could KeyError on a missing "v".
                if var in ds:
                    ds[var].attrs["units"] = "m s-1"
        else:
            for var in ["u", "v"]:
                # BUGFIX: same `("u" or "v")` fix as above.
                if var in ds:
                    ds[var].attrs["units"] = "m/s"
        encoding = {}
        for coord in ["depth", "time", "lon", "lat"]:
            if coord in ds:
                encoding.update({coord: {"_FillValue": None}})
                ds[coord].attrs.update(dsin[coord].attrs)
        # Convert times to fractional days since the 1950-01-01 epoch.
        ds["time"] = [(time - pd.to_datetime("1950-01-01")) / pd.to_timedelta("1d")
                      for time in ds["time"].values]
        ds["time"].attrs.update(dsin["time"].attrs)
        ds["time"].attrs.update({"units": "days since 1950-01-01 00:00:00"})
        if "time_bnds" in ds:
            ds["time_bnds"] = ds["time_bnds"].astype(np.double)
            encoding.update({"time_bnds": {"_FillValue": None}})
        # 9999.9 is the sentinel for "no geographical sub-setting".
        if not ((args["specs"]["bounds_lon"]["min"] == 9999.9)
                or (args["specs"]["bounds_lon"]["max"] == 9999.9)
                or (args["specs"]["bounds_lat"]["min"] == 9999.9)
                or (args["specs"]["bounds_lat"]["max"] == 9999.9)):
            dso = ds.sel(X=slice(args["specs"]["bounds_lon"]["min"],
                                 args["specs"]["bounds_lon"]["max"]),
                         Y=slice(args["specs"]["bounds_lat"]["min"],
                                 args["specs"]["bounds_lat"]["max"]))
        else:
            dso = ds
        dso = dso.drop(labels=['X', 'Y'])
        dso.attrs.update({
            "title": dso.attrs["title"] + " zone {}".format(args["name"]),
            "geospatial_lat_min": float(dso.lat.min().round(decimals=2)),
            "geospatial_lat_max": float(dso.lat.max().round(decimals=2)),
            "geospatial_lon_min": float(dso.lon.min().round(decimals=2)),
            "geospatial_lon_max": float(dso.lon.max().round(decimals=2)),
        })
        ncout = f"{matches['var']}_{matches['dim']}_{matches['term']}_{matches['filter']}_{args['name']}.nc"
        dso.to_netcdf(ncout, encoding=encoding)
        shutil.move(ncout, "../")
    finally:
        os.chdir(cwd)
def get_vtx_geometries(cmdargs):
    """Get the post-production geometries.

    Builds one lon/lat geometry description per extraction area found in
    *cmdargs* (as produced by :func:`get_extract_cmdargs`).
    """
    geometries = {}
    for cmdarg in cmdargs:
        area_name = cmdarg[0]["name"]
        geometries[area_name] = {
            "kind": "lonlat",
            "info": "Shom Hycom3d postproduction area",
            "area": area_name,
        }
    return geometries