# Source code for sloop.io

# -*- coding: utf-8 -*-
"""
Input/output
"""
import os
import glob
import tarfile
import logging
import subprocess
import configobj
from argparse import (
    ArgumentDefaultsHelpFormatter,
    RawDescriptionHelpFormatter,
)


from appdirs import AppDirs
from configobj import ConfigObj

import xarray as xr
from xoa.coords import get_time
from xoa.cfgm import ConfigManager

from .__init__ import SloopError
from .env import SloopEnvError

from .__init__ import SLOOP_DATA_DIR

LOGGER = logging.getLogger(__name__)


class CacheManager(object):
    """Manage the on-disk cache of sloop session directories."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def _get_cache_dir_(self):
        """Return the root directory that holds the session caches.

        Prefer ``$MTOOLDIR/cache/sloop/sessions``; when ``MTOOLDIR`` is
        not set, fall back to the per-user cache dir from :mod:`appdirs`.
        """
        try:
            return os.path.join(
                os.environ['MTOOLDIR'], 'cache', 'sloop', 'sessions')
        except KeyError as err:
            # Bugfix: a missing MTOOLDIR raises KeyError, not the
            # SloopEnvError originally caught, so the appdirs fallback
            # (previously in an unreachable `else`) was never used and
            # the method silently returned None.
            self.logger.info(err)
            return AppDirs(
                os.path.join("sloop", "sessions"), "$LOGNAME"
            ).user_cache_dir

    def get_session_dir(self, session):
        """Return the directory of `session`, creating it if needed.

        Parameters
        ----------
        session: str
            Either an existing directory path (returned as is) or a
            session name taken relative to the cache dir.
        """
        if os.path.isdir(session):
            return session
        session_dir = os.path.join(self._get_cache_dir_(), session)
        self.logger.debug(session_dir)
        if not os.path.isdir(session_dir):
            os.makedirs(session_dir)
        return session_dir

    def get_files(self, session, file_format):
        """List the cached files of `session` with extension `file_format`."""
        return glob.glob(
            os.path.join(self._get_cache_dir_(), session, "*." + file_format))

    def _get_session_list_(self, session):
        """Glob cached session dirs whose name starts with `session`.

        A falsy `session` matches every cached session.
        """
        pattern = session + "*" if session else "*"
        return glob.glob(os.path.join(self._get_cache_dir_(), pattern))

    def find_session(self, session=None):
        """Find a cached session name, asking the user if ambiguous.

        Return
        ------
        str or None
            The session name, or None when nothing matches.
        """
        sess_list = self._get_session_list_(session)
        if len(sess_list) > 1:
            print("Choose one session:" + "\n")
            for sess in sess_list:
                print(sess.split('/')[-1] + "\n")
            return input()
        if len(sess_list) == 1:
            return sess_list[0].split('/')[-1]
        return None

    def clear(self, session=None):
        """Interactively remove cached session dirs matching `session`."""
        sessions = self._get_session_list_(session)
        if not sessions:
            self.logger.info("The sloop cache is empty")
            return
        for session_dir in sessions:
            print(session_dir + "\n")
        # Same prompt is reused until the user answers yes or no.
        prompt = ("Do you really want to erase the session"
                  "s caches listed hereabove ?(yes/no)")
        cond = input(prompt)
        while cond not in ('yes', 'no'):
            cond = input(prompt)
        if cond == "yes":
            for session_dir in sessions:
                subprocess.run(
                    ["rm", "-rf", session_dir],
                    capture_output=True,
                    check=True,
                )
def tarpat(outfile, directory, patterns=None):
    """Create a .tar file given a list of file patterns

    Parameters
    ----------
    outfile: str
        File name of the output .tar file
    directory: str
        Files are considered relative to this directory
    patterns: str, list of str
        Single glob pattern or list of glob patterns relative to
        `directory`. Defaults to ``["*"]`` (everything).

    Return
    ------
    list
        List of files that were added to the archive as absolute paths
    """
    # A None sentinel replaces the previous mutable default ['*'],
    # avoiding the shared-default pitfall while staying call-compatible.
    if patterns is None:
        patterns = ["*"]
    elif isinstance(patterns, str):
        patterns = [patterns]
    effective = []
    with tarfile.open(outfile, "w") as tar:
        for pattern in patterns:
            for path in glob.glob(os.path.join(directory, pattern)):
                # Store entries relative to `directory` so the archive
                # does not embed absolute paths.
                tar.add(path, os.path.relpath(path, directory))
                effective.append(os.path.abspath(path))
    return effective
def xr_open_concat(ncfiles):
    """Helper to concatenate several netcdf files to a single dataset

    This basically a call to :func:`xarray.open_dataset` and to
    :func:`xarray.concat`.

    Parameters
    ----------
    ncfiles: str, list
        Single or list of netcdf files. In the case of a string
        (single file), no concatenation a perform.

    Return
    ------
    xarray.Dataset
    """
    # Single file: open it directly, nothing to concatenate
    if isinstance(ncfiles, str):
        return xr.open_dataset(ncfiles)

    datasets = [xr.open_dataset(ncfile) for ncfile in ncfiles]
    dsout = xr.concat(
        datasets,
        "time",
        compat="no_conflicts",
        coords="different",
        data_vars="all",
    )
    dsout = dsout.sortby(dsout["time"])

    # Copy per-variable and per-coordinate encodings/attrs from the
    # first input dataset onto the result (presumably because concat
    # does not carry them over — TODO confirm against xarray docs).
    first = datasets[0]
    for name in list(dsout.keys()) + list(dsout.coords):
        dsout[name].encoding.update(first[name].encoding)
        dsout[name].attrs.update(first[name].attrs)
    return dsout
def nc_get_time(ncfile):
    """Simply return the time axis found in a netcdf file

    Parameters
    ----------
    ncfile

    Return
    ------
    date
    """
    # Open, extract the time coordinate, and close the dataset again
    with xr.open_dataset(ncfile) as ds:
        time = get_time(ds)
    # Truncate to day resolution
    return time.dt.floor("D")
class SloopArgumentHelpFormatter(
    RawDescriptionHelpFormatter, ArgumentDefaultsHelpFormatter
):
    """Argparse help formatter keeping raw description formatting while
    also showing argument default values."""
def read_inicfg_files(fini=None, fcfg=None):
    """Read and validate a configobj-like config file"""
    config = ConfigManager(fini).load(fcfg)
    # An empty/falsy result means the config could not be loaded
    if not config:
        raise SloopError("PLEASE, CHECK [fcfg]")
    return config
def write_fini(dico, fname):
    """Write specification file from dict"""
    cfg = ConfigObj(dico)
    cfg.filename = fname
    cfg.write()
def concat_ascii_files(fin, fout):
    """Concatenation of ascii files

    Parameters
    ----------
    fin: str, list of str
        Input file(s), written to `fout` in order.
    fout: str
        Output file, overwritten if it already exists.
    """
    # Lazy %-style args instead of an f-string for logging
    LOGGER.info("concatenate ascii files : %s + %s", fin, fout)
    # Accept any non-string sequence of paths, not only a list
    if isinstance(fin, str):
        fin = [fin]
    with open(fout, 'w') as outfile:
        for infile in fin:
            with open(infile) as inf:
                outfile.write(inf.read())
# Per-process cache of parsed data-model configurations
_data_config = {}


def read_config(model):
    """Return the configuration associated with the data model `model`.

    The parsed config is cached in the module-level ``_data_config``
    dict, so each config file is read at most once per process.

    Parameters
    ----------
    model: str
        Data model name; the file ``config_<model>.cfg`` is looked up
        in ``SLOOP_DATA_DIR``.

    Return
    ------
    dict
        The ``config`` section of the file.

    Raises
    ------
    SloopError
        When the config file cannot be loaded or lacks a ``config``
        section.
    """
    if model not in _data_config:
        cfgfile = os.path.join(SLOOP_DATA_DIR, 'config_%s.cfg' % model)
        try:
            _data_config[model] = configobj.ConfigObj(
                cfgfile, unrepr=True, interpolation=True)['config']
        except Exception as err:
            # Use the package exception (consistent with
            # read_inicfg_files) and chain the cause, instead of the
            # original bare except / generic Exception that hid it.
            raise SloopError(
                'Error occured when loading config file %s' % (cfgfile)
            ) from err
    return _data_config[model]