#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import fnmatch
import socket
import os
import importlib
import pathlib
import logging
import re
import subprocess
from .__init__ import SloopError
from .log import LoggingLevelContext
from configparser import ConfigParser
#: Database for guessing HPC target from hostname
HOSTNAME_TO_HPC_TARGET = {
"datarmor": ["*.ice.ifremer.fr"],
"aquarium": ["aqlogin*"],
"meteofrance": ["*hpc.meteo.fr"],
}
RE_CYCLE_MATCH = re.compile(
r"^(?P<scheme>[^:]*):(?P<cfgname>[^@]*)(?:@(?P<location>.+))?$"
).match
[docs]def load_config_from_dict(conf, sec):
"""Load environment variables from dict"""
env_dict = conf[sec]
try:
for var in env_dict.keys():
if var.startswith("env_"):
varname = var.split("env_")[-1]
if isinstance(env_dict[var], list):
value = " ".join(comp[var])
else:
value = env_dict[var]
os.environ[varname.upper()] = value
except SloopEnvError as e:
logger.error(e.msg)
[docs]def get_section_from_config(conf, section):
"""Load section in config.ini file"""
config = ConfigParser(default_section=None)
config.read(conf)
try:
sec = config[section]
except Exception:
raise SloopEnvError(f"Section {section} not in {conf} file")
return dict(sec.items())
[docs]def get_field_from_config(conf, section, key):
"""Load config from a section in config.ini file"""
config = ConfigParser()
config.read(conf)
try:
field = config[section][key]
except Exception:
raise SloopEnvError(f"Section {section} or {key} not in {conf} file")
return field
[docs]def load_env_config(conf, sec):
"""Load config from a section in config.ini file"""
config = ConfigParser()
config.read(conf)
# start filling env variables
if sec in config.sections():
comp = config[sec]
for var in comp.keys():
if var.startswith("env_"):
varname = var.split("env_")[-1]
if isinstance(comp[var], list):
value = " ".join(comp[var])
else:
value = comp[var]
os.environ[varname.upper()] = value
else:
raise SloopEnvError(f"Section {sec} not in config file")
[docs]def stripout_conda_env(env_dict=None):
"""Remove conda env from PATH, LD_LIBRARY_PATH and C_INCLUDE_PATH"""
if env_dict is None:
env_dict = os.environ
for name, subdir in (
("PATH", "bin"),
("LD_LIBRARY_PATH", "lib"),
("LIBRARY_PATH", "lib"),
("INCLUDE_PATH", "include"),
("C_INCLUDE_PATH", "include"),
):
if name in env_dict:
paths = env_dict[name].split(":")
path = os.path.join(sys.prefix, subdir)
if path in paths:
paths.remove(path)
env_dict[name] = ":".join(paths)
[docs]class ContextEnv(object):
"""Context for changing env vars"""
[docs] def __init__(self, stripout_conda=False, **envs):
self.stripout_conda = stripout_conda
self.envs = envs
def __enter__(self):
self.old_env = os.environ.copy()
if self.stripout_conda:
stripout_conda_env()
os.environ.update(**self.envs)
return self
def __exit__(self, exc_type, exc_value, traceback):
os.environ = self.old_env
[docs]def guess_hpc_target():
"""Get the current HPC target name
It first look for the :envvar:`HPC_TARGET` env variable, then use the
:data:`HOSTNAME_TO_HPC_TARGET` variable to guess it from hostname.
If not a well known HPC target, "generic" is returned.
The :envvar:`HPC_TARGET` is then set to this value.
"""
if "HPC_TARGET" in os.environ:
return os.environ["HPC_TARGET"]
hostname = socket.getfqdn()
for hpc_target, patterns in list(HOSTNAME_TO_HPC_TARGET.items()):
for pattern in patterns:
if fnmatch.fnmatch(hostname, pattern):
os.environ["HPC_TARGET"] = hpc_target
return hpc_target
os.environ["HPC_TARGET"] = "generic"
return "generic"
[docs]def is_os_cmd_avail(cmd):
"""Check if a command is available
Parameters
----------
cmd: str
Command can be executed in a terminal
Return
------
Boolean
"""
return subprocess.run(["which", cmd], stdout=subprocess.DEVNULL).returncode == 0
[docs]class SloopEnvError(SloopError):
def __init__(self, msg):
self.msg = msg
[docs]def check_env_vars(env_vars):
"""Check that the given list of env variables are set
Parameters
----------
env_vars: str, list
List of environment variables
Return
------
SloopEnvError: if a variable is not set
"""
if isinstance(env_vars, str):
env_vars = [env_vars]
for env_var in env_vars:
if env_var not in os.environ:
raise SloopEnvError(
f"Environment variable not set: {env_var}. "
+ "The following variales must be set: "
+ " ".join(env_vars)
)
[docs]def check_vortex_dir(vortex_dir):
"""Check that the vortex directory is valid
Return
------
SloopEnvError
"""
for relpaths in [
("",),
("bin", "mkjob.py"),
("src", "common"),
("site", "footprints"),
]:
if not os.path.exists(os.path.join(vortex_dir, *relpaths)):
raise SloopEnvError(f"Invalid vortex dir: {vortex_dir}")
[docs]def check_vapp(vapp):
"""Check that vapp is valid
vapp must be a module importable from :mod:`sloop.models`.
Return
------
SloopEnvError
"""
vapp = str(vapp)
try:
with LoggingLevelContext(logging.WARNING):
vapp_module = importlib.import_module("." + vapp, "sloop.models")
except ImportError:
raise SloopEnvError("Invalid vapp: " + vapp)
return vapp_module
[docs]def check_exp_dir(exp_dir, errors="raise"):
"""Check that an experiment directory is valid
This checks parents and children.
Return
------
dict
See also
--------
check_vapp
"""
assert errors in ("raise", "ignore")
assert "MTOOLDIR" in os.environ, "MTOOLDIR has to be set"
# The exp dir itself
specs = {}
if not os.path.exists(exp_dir):
if errors == "raise":
raise SloopEnvError(f"Experiment directory not found: {exp_dir}")
else:
return specs
else:
exp_dir = pathlib.Path(os.path.abspath(exp_dir))
specs["exp_dir"] = exp_dir
# xpid/vapp/vconf
vconf = exp_dir.name
vapp_dir = exp_dir.parent
vapp = vapp_dir.name
vapp_module = check_vapp(vapp)
xpid = vapp_dir.parent.name
user = os.path.basename(os.environ["HOME"]) # getpass.getuser()
location = user if "@" not in xpid else xpid.split("@")[1]
specs.update(
vapp=vapp,
vconf=vconf,
xpid=xpid,
vapp_module=vapp_module,
location=location,
exp_dir=str(exp_dir),
)
# Links
for tool in ["vortex", "sloop"]:
if not (exp_dir / tool).is_symlink():
if errors == "raise":
raise SloopEnvError(f"Link to tool not found: {tool}")
else:
specs[tool] = "no local link found!"
else:
specs[tool] = os.path.abspath(str(exp_dir / tool))
# Conf
# - dir
conf_dir = exp_dir / "conf"
if not conf_dir.exists():
if errors == "raise":
raise SloopEnvError(f"Conf dir not found: {conf_dir}")
else:
specs["conf_dir"] = "not found!"
else:
specs["conf_dir"] = str(conf_dir)
# - ini file
ini_file = conf_dir / f"{vapp}_{vconf}.ini"
if not ini_file.exists():
if errors == "raise":
raise SloopEnvError(f"Ini file not found: {ini_file}")
else:
specs["ini_file"] = "not found !"
else:
specs["ini_file"] = str(ini_file)
# uenv file
with ContextEnv(VORTEX_QUIET="1"):
from vortex.util.config import GenericConfigParser
cfg = GenericConfigParser(specs["ini_file"])
if cfg.has_option(None, "cycle"):
m = RE_CYCLE_MATCH(cfg.defaults()["cycle"])
if m:
if m.group("scheme").startswith("u"):
from vortex import ticket
t = ticket()
location = m.group("location") or location
specs["uget_hack_file"] = os.path.join(
t.glove.configrc,
"hack",
"uget",
location, # xpid location or user?
"env",
m.group("cfgname"))
# Tasks
# - dir
tasks_dir = exp_dir / "tasks"
if not tasks_dir.exists():
if errors == "raise":
raise SloopEnvError(f"Tasks dir not found: {tasks_dir}")
else:
specs["tasks_dir"] = "not found!"
else:
specs["tasks_dir"] = str(tasks_dir)
# - init file
init_task = tasks_dir / "__init__.py"
if not init_task.exists():
if errors == "raise":
raise SloopEnvError(f"Tasks init file not found: {init_task}")
else:
specs["tasks_init_file"] = "not found!"
else:
specs["tasks_init_file"] = str(init_task)
# - task files
taskfiles = list(tasks_dir.glob("*.py"))
specs["task_files"] = [str(t) for t in taskfiles if not t.name == "__init__.py"]
return specs
[docs]def show_exp_info(exp_dir):
"""Print some info about an experiment
Parameters
----------
exp_dir: str
A valid experiment directory
See also
--------
check_exp_dir
"""
specs = check_exp_dir(exp_dir, errors="ignore")
for label, content in specs.items():
label = label.replace("_", " ")
if isinstance(content, list):
print("- " + label + ":")
for value in content:
print(" - " + value)
else:
if hasattr(content, "__file__"):
content = content.__file__
print("-", label + ":", content)