"""
Atmospheric Forcing preprocessings
"""
import logging
import numpy as np
from ...log import LoggingLevelContext
import pandas as pd
import glob
with LoggingLevelContext(logging.WARNING):
import xarray as xr
from xoa.filter import erode_mask
from ...times import convert_to_julian_day
from ...interp import interp_time, Regridder
from ...phys import (
windstress,
radiativeflux,
celsius2kelvin,
watervapormixingratio,
)
from ...filters import erode_mask_vec
from .__init__ import CROCO_PARAM
from ...io import read_config
LOGGER = logging.getLogger(__name__)


def decumul_flux(ds_cumul, model):
    """De-accumulate atmospheric fluxes"""
    LOGGER.info("Decumul flux files")
    # read the meteo model config file
    mod = read_config(model)
    # get the data frequency (accumulation period)
    step = mod["freq"]
    # convert the accumulation period to seconds
    diff_time = pd.Timedelta(step).total_seconds()
    # de-accumulate: divide by the period length to get mean rates
    ds_decumul = ds_cumul / diff_time
    # shift time to the centre of the accumulation period the rates represent
    ds_decumul["time"] = ds_decumul.time - 0.5 * pd.Timedelta(step)
    # then interpolate the rates back onto the original time axis
    ds_decumul = ds_decumul.interp(
        time=ds_cumul.time, kwargs={"fill_value": "extrapolate"}
    )
    return ds_decumul
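
# Worked example of the de-accumulation above (hypothetical numbers, assuming a
# 3-hourly accumulation step in the model config): an accumulated downward
# shortwave flux of 3.24e7 J/m2 over step="3h" (10800 s) becomes
# 3.24e7 / 10800 = 3000 W/m2, a mean rate valid at the centre of the window,
# which is why the time axis is shifted back by 0.5 * step before
# re-interpolation onto the original time stamps.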


def concat_met_files(ds_inst, ds_cumul):
    """Combine meteo files in one Dataset"""
    LOGGER.info("Concat inst and flux files")
    ds_atmfrc = xr.merge(
        [ds_cumul, ds_inst], combine_attrs="override", compat="no_conflicts"
    )
    return ds_atmfrc
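
# Design note (inferred from the call above): the instantaneous and flux files
# presumably carry distinct variables on the same grid and time axis, so
# xr.merge combines them side by side; compat="no_conflicts" additionally lets
# a variable present in both inputs merge as long as its non-null values agree.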


def rename_vars(ds):
    """Rename meteo variables to match the Croco model"""
    LOGGER.info("Rename atmfrc variables as expected by Croco")
    for var in list(ds.keys()):
        # if the variable is already a hyperparam => no rename, just add attrs
        if var in CROCO_PARAM:
            ds[var].attrs = CROCO_PARAM[var]["attrs"]
        else:
            # otherwise browse every hyperparam's list of accepted names
            for hyparam in CROCO_PARAM:
                if var in CROCO_PARAM[hyparam]["name"]:
                    ds = ds.rename({var: hyparam})
                    ds[hyparam].attrs = CROCO_PARAM[hyparam]["attrs"]
                    break
    # remove variables that Croco does not use
    for dsi in list(ds.keys()):
        if dsi not in CROCO_PARAM:
            ds = ds.drop_vars(dsi)
    return ds
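
# For reference, rename_vars assumes each CROCO_PARAM entry provides a "name"
# list of accepted aliases plus the "attrs" to attach, along the lines of this
# hypothetical entry (the real table lives in this subpackage's __init__):
#
#     CROCO_PARAM = {
#         "tair": {
#             "name": ["t2m", "air_temperature"],
#             "attrs": {"long_name": "air temperature", "units": "Celsius"},
#         },
#     }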


def build_meteo_list(date1, date2, model):
    """Build the list of meteo files for the given dates"""
    LOGGER.info("Get list of meteo files")
    # retrieve the params of the meteo model
    mod = read_config(model)
    rac = mod["prefix"]
    data_dir = mod["data_dir"]
    suff_inst = mod["suff_inst"]
    suff_flux = mod["suff_flux"]
    # build the list of dates
    dates_list = pd.date_range(start=date1, end=date2, freq=mod["freq"])
    # build the lists of file names
    inst_list = [
        f"{rac}{date.strftime('%Y%m%dT%HZ')}{suff_inst}" for date in dates_list
    ]
    flux_list = [
        f"{rac}{date.strftime('%Y%m%dT%HZ')}{suff_flux}" for date in dates_list
    ]
    # finally resolve each name to a full path, keeping the first match
    # (recursive=True is needed for "**" to match any directory depth)
    inst_list = [
        glob.glob(f"{data_dir}/**/{file}", recursive=True)[0] for file in inst_list
    ]
    flux_list = [
        glob.glob(f"{data_dir}/**/{file}", recursive=True)[0] for file in flux_list
    ]
    return inst_list, flux_list
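
# Example of the naming scheme above (hypothetical config values): with
# prefix="arome_", suff_inst="_inst.nc" and freq="1H", the date
# 2023-06-01 03:00 yields "arome_20230601T03Z_inst.nc", looked up anywhere
# under data_dir. Note that glob.glob(...)[0] raises IndexError if a file is
# missing, so every date in the range must be covered by the archive.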


def read_atm_data(inst_list, cumul_list, lon_bnds, lat_bnds):
    """Read the lists of meteo files and reduce them to the area of interest"""
    from functools import partial

    def _preprocess(x, lon_bnds, lat_bnds):
        return x.sel(longitude=slice(*lon_bnds), latitude=slice(*lat_bnds))

    # crop each file to the area of interest before concatenation
    partial_func = partial(_preprocess, lon_bnds=lon_bnds, lat_bnds=lat_bnds)
    ds_inst = xr.open_mfdataset(
        inst_list, concat_dim="time", combine="nested", preprocess=partial_func
    )
    ds_cumul = xr.open_mfdataset(
        cumul_list, concat_dim="time", combine="nested", preprocess=partial_func
    )
    # drop the singleton "height" dimension carried by the meteo files
    ds_inst = ds_inst.squeeze(dim="height")
    ds_cumul = ds_cumul.squeeze(dim="height")
    return ds_inst, ds_cumul
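
# Typical chaining of the helpers above (a sketch, not part of the pipeline;
# "arome", the dates and the lon/lat bounds are placeholder values):
#
#     inst_list, flux_list = build_meteo_list("2023-06-01", "2023-06-02", "arome")
#     ds_inst, ds_cumul = read_atm_data(inst_list, flux_list, (-6.0, 0.0), (43.0, 48.0))
#     ds_cumul = decumul_flux(ds_cumul, "arome")
#     ds_atmfrc = rename_vars(concat_met_files(ds_inst, ds_cumul))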