# -*- coding: utf-8 -*-
"""
Input/output
"""
import os
import glob
import tarfile
import logging
import subprocess
import configobj
from argparse import (
ArgumentDefaultsHelpFormatter,
RawDescriptionHelpFormatter,
)
from appdirs import AppDirs
from configobj import ConfigObj
import xarray as xr
from xoa.coords import get_time
from xoa.cfgm import ConfigManager
from .__init__ import SloopError
from .env import SloopEnvError
from .__init__ import SLOOP_DATA_DIR
# Module-level logger named after this module; used by the free functions
# below (e.g. ``concat_ascii_files``).
LOGGER = logging.getLogger(__name__)
class CacheManager(object):
    """Manage the on-disk cache directories of sloop sessions.

    Every session is stored as an entry of a single cache root directory
    (see :meth:`_get_cache_dir_`), named after the session.
    """

    def __init__(self):
        """Attach a logger named after this module to the instance."""
        self.logger = logging.getLogger(__name__)

    def _get_cache_dir_(self):
        """Return the root directory under which session caches are stored.

        ``$MTOOLDIR/cache/sloop/sessions`` is used when the ``MTOOLDIR``
        environment variable is available. Otherwise the user cache
        directory reported by ``AppDirs`` is returned.

        Return
        ------
        str
            Path of the cache root. The directory is not created here.
        """
        try:
            mtooldir = os.environ['MTOOLDIR']
        except (KeyError, SloopEnvError) as err:
            # A missing environment variable raises KeyError. It used to
            # escape the ``except SloopEnvError`` clause, and the fallback
            # lived in an unreachable ``else`` branch.
            self.logger.info(
                "MTOOLDIR is not available (%s): "
                "falling back to the user cache directory", err)
            # NOTE(review): "$LOGNAME" is passed verbatim as the second
            # AppDirs argument (no environment expansion happens here).
            # Kept as is so that the resulting path does not change.
            return AppDirs(os.path.join("sloop", "sessions"),
                           "$LOGNAME").user_cache_dir
        return os.path.join(mtooldir, 'cache', 'sloop', 'sessions')

    def get_session_dir(self, session):
        """Return the directory of a session, creating it when needed.

        Parameters
        ----------
        session: str
            Either the path of an existing directory, which is returned
            unchanged, or a session name, which is resolved relative to
            the cache root.

        Return
        ------
        str
            Path of the session directory.
        """
        if os.path.isdir(session):
            return session
        session_dir = os.path.join(self._get_cache_dir_(), session)
        self.logger.debug(session_dir)
        # exist_ok avoids the check-then-create race of isdir() + makedirs()
        os.makedirs(session_dir, exist_ok=True)
        return session_dir

    def get_files(self, session, file_format):
        """List the cached files of a session that have a given extension.

        Parameters
        ----------
        session: str
            Session name, relative to the cache root.
        file_format: str
            File extension, without the leading dot.

        Return
        ------
        list of str
            Paths matching ``<cache root>/<session>/*.<file_format>``.
        """
        pattern = os.path.join(
            self._get_cache_dir_(), session, "*." + file_format)
        return glob.glob(pattern)

    def _get_session_list_(self, session):
        """List cache entries whose name starts with `session`.

        Parameters
        ----------
        session: str, None
            Name prefix. A false value (``None``, ``""``) selects every
            entry of the cache root.

        Return
        ------
        list of str
            Paths of the matching cache entries.
        """
        pattern = session + "*" if session else "*"
        return glob.glob(os.path.join(self._get_cache_dir_(), pattern))

    def find_session(self, session=None):
        """Resolve a session name from an optional name prefix.

        When several sessions match, their names are printed and the user
        is asked to type one. The typed answer is returned as is.

        Parameters
        ----------
        session: str, None
            Name prefix of the session to look for.

        Return
        ------
        str, None
            The session name, or ``None`` when nothing matches.
        """
        sess_list = self._get_session_list_(session)
        if not sess_list:
            return None
        if len(sess_list) == 1:
            # basename() instead of split('/') so that the platform path
            # separator is honoured
            return os.path.basename(sess_list[0])
        print("Choose one session:" + "\n")
        for sess in sess_list:
            print(os.path.basename(sess) + "\n")
        return input()

    def clear(self, session=None):
        """Interactively erase session caches.

        The matching cache entries are printed, then the user is asked
        for a ``yes``/``no`` confirmation until one of the two is typed.
        Entries are removed with ``rm -rf`` only on ``yes``.

        Parameters
        ----------
        session: str, None
            Name prefix of the sessions to erase. A false value selects
            every session.
        """
        targets = self._get_session_list_(session)
        if not targets:
            self.logger.info("The sloop cache is empty")
            return
        for dr in targets:
            print(dr + "\n")
        prompt = ("Do you really want to erase the session"
                  "s caches listed hereabove ?(yes/no)")
        answer = input(prompt)
        while answer not in ("yes", "no"):
            answer = input(prompt)
        if answer == "yes":
            for dr in targets:
                subprocess.run(
                    ["rm", "-rf", dr], capture_output=True, check=True,
                )
def tarpat(outfile, directory, patterns=('*',)):
    """Create a .tar file given a list of file patterns

    Parameters
    ----------
    outfile: str
        File name of the output .tar file
    directory: str
        Files are considered relative to this directory
    patterns: str or sequence of str
        Glob pattern(s) relative to `directory`

    Return
    ------
    list
        List of files that were added to the archive as absolute paths.
        A path matched by several patterns is added and listed only once,
        and the archive itself is never listed.
    """
    if isinstance(patterns, str):
        patterns = [patterns]
    archive = os.path.abspath(outfile)
    effective = []
    seen = set()
    with tarfile.open(outfile, "w") as tar:
        for pattern in patterns:
            # sorted() makes the member order independent of the
            # arbitrary order in which glob lists a directory
            for path in sorted(glob.glob(os.path.join(directory, pattern))):
                abspath = os.path.abspath(path)
                # tarfile silently refuses to add the archive to itself,
                # so it must not be reported as added either
                if abspath == archive or abspath in seen:
                    continue
                seen.add(abspath)
                tar.add(path, os.path.relpath(path, directory))
                effective.append(abspath)
    return effective
def xr_open_concat(ncfiles):
    """Helper to concatenate several netcdf files into a single dataset

    This is basically a call to :func:`xarray.open_dataset` and
    to :func:`xarray.concat`.

    Parameters
    ----------
    ncfiles: str, path-like, list
        Single or list of netcdf files.
        In the case of a single file (string or path-like object),
        no concatenation is performed.

    Return
    ------
    xarray.Dataset
        For several files: the datasets concatenated along ``"time"`` and
        sorted by time, with the encoding and attributes of the first
        file re-applied to every data variable and coordinate.
    """
    # A path-like object is a single file too: iterating over it would fail
    if isinstance(ncfiles, (str, os.PathLike)):
        return xr.open_dataset(ncfiles)

    dss = [xr.open_dataset(ncfile) for ncfile in ncfiles]
    dsout = xr.concat(
        dss, "time", compat="no_conflicts", coords="different",
        data_vars="all"
    )
    dsout = dsout.sortby(dsout["time"])

    # Re-apply the encoding and attributes of the first file to the result
    first = dss[0]
    for vname in dsout.data_vars:
        dsout[vname].encoding.update(first[vname].encoding)
        dsout[vname].attrs.update(first[vname].attrs)
    for coord in dsout.coords:
        dsout[coord].encoding.update(first[coord].encoding)
        dsout[coord].attrs.update(first[coord].attrs)
    return dsout
def nc_get_time(ncfile):
    """Return the time axis found in a netcdf file, floored to the day

    Parameters
    ----------
    ncfile
        Netcdf file, as accepted by :func:`xarray.open_dataset`

    Return
    ------
    date
        The time coordinate returned by ``get_time``, with every value
        floored to the day (``.dt.floor("D")``)
    """
    # The context manager closes the file even when get_time() fails, and
    # the flooring is done while the file is still open.
    with xr.open_dataset(ncfile) as ds:
        return get_time(ds).dt.floor("D")
def read_inicfg_files(fini=None, fcfg=None):
    """Read and validate a configobj-like config file

    Parameters
    ----------
    fini
        Specification passed to ``ConfigManager``
    fcfg
        Configuration passed to ``ConfigManager.load``

    Return
    ------
    The object returned by ``ConfigManager(fini).load(fcfg)``

    Raises
    ------
    SloopError
        When the loaded configuration is empty (false)
    """
    results = ConfigManager(fini).load(fcfg)
    if not results:
        # Name the offending file instead of the literal "[fcfg]" placeholder
        raise SloopError(
            f"PLEASE, CHECK [{fcfg}]"
        )
    return results
def write_fini(dico, fname):
    """Dump a dictionary to a configobj specification file

    Parameters
    ----------
    dico: dict
        Content used to build the ``ConfigObj`` instance
    fname: str
        Name of the file that is written
    """
    spec = ConfigObj(dico)
    # ConfigObj.write() without argument writes to the ``filename`` attribute
    spec.filename = fname
    spec.write()
def concat_ascii_files(fin, fout):
    """Concatenate one or several ascii files into a single file

    Parameters
    ----------
    fin: str, list or tuple of str
        Input file(s), written to `fout` in the given order.
        Anything that is not a list or a tuple is taken as a single file.
    fout: str
        Output file. It is opened in write mode, so any existing content
        is overwritten.
    """
    LOGGER.info("concatenate ascii files : %s + %s", fin, fout)
    # Tuples used to be wrapped as a single "file", making open() fail
    sources = list(fin) if isinstance(fin, (list, tuple)) else [fin]
    with open(fout, 'w') as outfile:
        for infile in sources:
            with open(infile) as inf:
                # Stream line by line rather than loading each file at once
                outfile.writelines(inf)
# Cache of the already loaded configurations, keyed by model name
_data_config = {}


def read_config(model):
    """Get the configuration associated with the `model` data type

    The file ``config_<model>.cfg`` of ``SLOOP_DATA_DIR`` is loaded on the
    first call for a given model. Later calls return the cached result.

    Parameters
    ----------
    model: str
        Name of the data type

    Return
    ------
    dict-like
        The ``config`` section of the configuration file

    Raises
    ------
    Exception
        When the file cannot be loaded or has no ``config`` section
    """
    if model not in _data_config:
        cfgfile = os.path.join(SLOOP_DATA_DIR, 'config_%s.cfg' % (model))
        try:
            _data_config[model] = configobj.ConfigObj(
                cfgfile, unrepr=True, interpolation=True)['config']
        except Exception as err:
            # Do not swallow KeyboardInterrupt/SystemExit, and keep the
            # original error as the cause for diagnostics
            raise Exception(
                'Error occured when loading config file %s' % (cfgfile)
            ) from err
    return _data_config[model]