# -*- coding: utf-8 -*-
"""
Job management
"""
import logging
import subprocess
from enum import Enum
import datetime
from .env import is_os_cmd_avail
from .io import CacheManager
import json
import secrets
import configparser
ALLOWED_JOB_MANAGER_TYPES = ["basic", "slurm", "pbspro"]
logger = logging.getLogger(__name__)
cm = CacheManager()
[docs]class Job(object):
"""Single job"""
overview_format = dict(
name="65",
jobid="8",
session="60",
queue="10",
realqueue="10",
time="5",
status="10",
submission_date="20",
)
[docs] def __init__(
self,
manager,
name,
args,
queue=None,
jobid=None,
session=None,
submission_date=None,
):
self.manager = manager
self.name = name
self.queue = queue
self.jobid = jobid
self.args = args
self.status = JobStatus.UNKOWN
self.realqueue = None
self.time = None
self.memory=None
self.session = session
self.submission_date = submission_date
def __str__(self):
return "Job(name={}, status={}, jobid={}, session={})".format(
self.name, self.status.name, self.jobid, self.session
)
def __repr__(self):
return "<{}>".format(self)
[docs] def get_status(self):
return self.manager.get_status(self) # or JobStatus.FINISHED
[docs] def is_running(self):
return self.get_status() is JobStatus.RUNNING
[docs] def update_status(self, status=None):
if status is None:
status = self.get_status()
if status is None:
return
if isinstance(status, JobStatus):
self.status = status
else: # dict
self.jobid = status["jobid"]
self.realqueue = status["queue"]
self.status = status["status"]
self.time = status["time"]
[docs] def get_overview(self, update=True):
if update:
self.update_status()
name = self.name
jobid = self.jobid
queue = self.queue
realqueue = self.realqueue
status = self.status.name
session = self.session
submission_date = self.submission_date
if self.time is not None:
hours = self.time.seconds // 3600
minutes = (self.time.seconds - hours * 3600) // 60
time = f"{hours:02}h{minutes:02}"
else:
time = "--h--"
fmt = " ".join(
[
("{" + key + f"!s:{ff}" + "}")
for key, ff in self.overview_format.items()
]
)
return fmt.format(**locals())
[docs]class JobStatus(Enum):
UNKOWN = -1
FINISHED = 0
PENDING = 1
RUNNING = 2
INQUEUE = 3
EXITING = 4
COMPLETING = 5
FAILED = 6
[docs]class BasicJobManager(object):
"""Basic job manager"""
commands = {
"submit": {
"command": "python",
"options": {
"script": "{}",
},
},
}
status_names = {
"F": JobStatus.FINISHED,
}
[docs] def __init__(self):
self.jobs = []
self.session = None
logger.debug("Started job manager: " + self.__class__.__name__)
[docs] @staticmethod
def from_type(jmtype):
jmtype = jmtype.lower()
assert jmtype in ALLOWED_JOB_MANAGER_TYPES
cls_name = jmtype.title() + "JobManager"
from . import job
return getattr(job, cls_name)()
@classmethod
def _get_command_args_(cls, command, **kwargs):
args = []
if "command" in cls.commands[command]:
args.append(cls.commands[command]["command"])
if "options" in cls.commands[command]:
for oname, ovalue in kwargs.items():
if oname in cls.commands[command]["options"].keys():
if not type(ovalue) == type(None):
fmt = cls.commands[command]["options"][oname]
if isinstance(ovalue, list):
ovalue = [val for val in ovalue if val]
if not isinstance(
cls.commands[command]["options"][oname], tuple
):
sep = ","
else:
fmt, sep = fmt
for val in ovalue:
args.append(fmt.format(val))
else:
fmt = fmt.format(ovalue).split(" ")
args += fmt
return args
@classmethod
def _get_opts_(cls, command, opts_in):
sep = ","
opts = {}
if cls.commands[command]['options']:
for context in cls.commands[command]['options'].keys():
try:
value = opts_in[context]
if sep in value:
value = value.split(sep)
except:
continue
else:
opts[context] = value
return opts
def _parse_submit_res_(self, res, jobargs, session):
if res.stderr:
logger.debug(
"Job submit stderr: "
+ res.stderr.decode("utf-8", errors="ignore")
)
if res.stdout:
logger.debug(
"Job submit stdout: "
+ res.stdout.decode("utf-8", errors="ignore")
)
job = Job(
manager=self,
name=jobargs["name"],
queue=jobargs["queue"],
args=res.args,
jobid=None,
session=session,
submission_date=str(datetime.datetime.now())[:-7],
)
return job
def _get_session_id_(self):
return secrets.token_hex(8)
[docs] def to_json(self, job):
session_dir = cm.get_session_dir(self.session)
jobdict = self.to_dict(job)
with open(
session_dir + "/" + jobdict["name"] + ".json", "w"
) as outfile:
json.dump(jobdict, outfile, indent=4)
[docs] def from_json(self, json_files):
name_jobs = [job.name for job in self.jobs]
for json_file in json_files:
with open(json_file) as jsonf:
content = json.load(jsonf)
job = Job(
manager=self,
name=content["name"],
args=content["args"],
jobid=content["jobid"],
queue=content["queue"],
session=content["session"],
submission_date=content["submission_date"],
)
if job.name not in name_jobs:
self.jobs.append(job)
return self.jobs
[docs] def to_dict(self, job):
dict_job = dict()
for (key, value) in job.__dict__.items():
if isinstance(value, str):
if value != "":
dict_job[key] = value
elif key == 'manager':
dict_job[key] = value.__class__.__name__
elif key == "status":
dict_job[key] = str(value.name)
elif key == "time":
if value is not None:
hours = value.seconds // 3600
minutes = (value.seconds - hours * 3600) // 60
dict_job[key] = f"{hours:02}h{minutes:02}"
else:
dict_job[key] = "--h--"
else:
dict_job[key] = value
return dict_job
def _parse_status_res_(self, res):
if res.stderr:
logger.debug(
"Job status stderr: "
+ res.stderr.decode("utf-8", errors="ignore")
)
if res.stdout:
logger.debug(
"Job status stdout: "
+ res.stdout.decode("utf-8", errors="ignore")
)
return res.stdout.decode("utf-8", errors="ignore")
[docs] def get_jobids(self, name=None, queue=None):
if isinstance(name, Job):
assert name in self.jobs
jobs = [name]
else:
jobs = self.get_jobs(name=name, queue=queue)
jobids = [job.jobid for job in jobs if job.jobid is not None]
return jobs, jobids
[docs] def update_status(self, name=None, queue=None, jobids=None):
if self.__class__.__name__ != "BasicJobManager" :
if jobids is None:
jobs, jobids = self.get_jobids(name=name, queue=queue)
args = self._extra_status_args_(self._get_command_args_(
"status", jobid=self._jobid_sep_().join(jobids)
))
if args:
logger.debug("Get status: " + " ".join(args))
res = subprocess.run(args, capture_output=True, check=True)
logger.debug("Got status")
status_list = self._parse_status_res_(res)
if status_list:
for status in status_list:
self.get_job(status["jobid"]).update_status(status)
else:
[job.update_status(JobStatus.FINISHED) for job in self.jobs]
[docs] def get_status(self, name=None, queue=None):
jobs = self.update_status(name=name, queue=queue)
if jobs:
if not isinstance(jobs, list):
return jobs.status
return [job.status for job in jobs]
[docs] def get_job(self, jobid):
if jobid is None:
return
for job in self.jobs:
if job.jobid == jobid:
return job
[docs] def get_jobs(self, name=None, queue=None):
jobs = []
if name:
if len(name)>1:
for job in self.jobs:
if job.name in name:
jobs.append(job)
else:
for job in self.jobs:
if (name is not None and job.name != name) or (
queue is not None and job.queue != queue
):
continue
jobs.append(job)
return jobs
def _get_jobs_session_(self, session=None):
self.session = cm.find_session(session)
json_files = cm.get_files(self.session, 'json')
self.from_json(json_files)
def __getitem__(self, name):
return self.get_jobs(name=name)
[docs] def get_overview(self,jobids=None):
self.update_status(jobids=jobids)
header = Job.get_overview_header()
overviews = [job.get_overview(update=True) for job in self.jobs]
return header + "\n" + "\n".join(overviews)
def __str__(self):
return self.get_overview()
[docs] def submit(self, opts0):
self.session = opts0["session"]
script = f"{opts0['job']}"
opts = self._get_opts_('submit', opts0)
if "queue" not in opts.keys():
opts.update({"queue":None})
jobargs = {}
if "depend" in opts.keys():
opts["depend"] = ":".join(opts["depend"])
if self.__class__.__name__ != "BasicJobManager" :
jobargs.update(opts)
jobargs.update(dict(script=script))
else:
jobargs.update(dict(script=script))
jobargs.update(opts)
args = self._get_command_args_("submit", **jobargs)
logger.debug("Submit: " + " ".join(args))
if self.__class__.__name__ != "BasicJobManager" :
res = subprocess.run(args, capture_output=True, check=True)
logger.debug("Submitted")
job = self._parse_submit_res_(res, jobargs, self.session)
self.jobs.append(job)
self.to_json(job)
self.check_status(session=self.session, show=False)
return job.jobid
else:
with open(opts0["log_out"], 'w') as f:
res = subprocess.run(
args,
stdout=f,
stderr=subprocess.STDOUT,
check=True,
)
[docs] def check_status(self, session, show=True):
self._get_jobs_session_(session=session)
self.get_overview(jobids=[job.jobid for job in self.jobs])
[self.to_json(job) for job in self.jobs]
if show == True:
print(self)
[docs] def delete(self, session=None):
self.check_status(session=session)
cond = input(
"Do you really want to delete the jobs listed hereabove ?(yes/no)"
)
if cond == "yes":
args = self._get_command_args_(
"delete",force="-W force",jobid=self._jobid_sep_().join([job.jobid for job in self.jobs])
)
print(args)
subprocess.run(args, capture_output=True, check=True)
[docs] def delete_force(self, session=None):
self.check_status(session=session)
args = self._get_command_args_(
"delete", jobid=self._jobid_sep_().join([job.jobid for job in self.jobs])
)
subprocess.run(args, capture_output=True, check=True)
[docs]class PbsJobManager(BasicJobManager):
"""Pbs Job Manager"""
commands = {
"submit": {
"command": "qsub",
"options": {
"script": "{}",
"name": "-N {}",
"queue": "-V -q {}",
"time": "-l walltime={}",
"memory":"-l mem={}",
"log_out": "-koed -o {}",
"depend": ("-W depend=afterok:{}"),
"mail": "-M {}",
},
},
"status": {
"command": "qstat",
"options": {"jobid": "{}", "logname": "-u $LOGNAME",},
},
"delete": {"command": "qdel", "options": {"force":"-W force","jobid": "{}",}},
}
status_names = {
"R": JobStatus.RUNNING,
"F": JobStatus.FINISHED,
"E": JobStatus.EXITING,
"Q": JobStatus.INQUEUE,
"H": JobStatus.PENDING,
}
[docs] def submit(self, opts):
jobid = BasicJobManager.submit(self, opts)
return jobid
def _parse_submit_res_(self, res, jobargs, session):
job = BasicJobManager._parse_submit_res_(self, res, jobargs, session)
job.jobid = res.stdout.decode("utf-8", errors="ignore").split(".")[0]
return job
def _jobid_sep_(self):
return ' '
def _extra_status_args_(self,args):
args.append('-x')
args.append('-u $LOGNAME')
return args
def _parse_status_res_(self, res):
"""JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)"""
res = BasicJobManager._parse_status_res_(self, res)
lines = res.splitlines()[5:]
out = []
for line in lines:
(
jobid,
user,
queue,
name,
session,
nodes,
task,
mem,
time,
status,
elaptime,
) = line.split()
if (elaptime == "--:--") or elaptime == "--":
elaptime = None
else:
hms = elaptime.split(":")
hh = 0
mm = 0
ss = 0
if len(hms) == 1:
ss = hms[0]
elif len(hms) == 2:
hh, mm = hms
else:
hh, mm, ss = hms
elaptime = datetime.timedelta(
seconds=int(ss), minutes=int(mm), hours=int(hh)
)
if status in self.status_names:
status = self.status_names[status]
else:
status = JobStatus.UNKOWN
out.append(
{
"jobid": jobid.split(".")[0],
"queue": queue,
"name": name,
"time": elaptime,
"status": status,
}
)
return out
[docs]class SlurmJobManager(BasicJobManager):
"""Slurm Job Manager"""
commands = {
"submit": {
"command": "sbatch",
"options": {
"name": "--exclusive -J {}",
"queue": "-p {}",
"nnodes": "-N {}",
"ncpus": "-c {}",
"mem": "--mem={}",
"time": "--time={}",
"depend": "--dependency=afterok:{}",
"log_out": "-o {}",
"script": "{}",
"mail": "--mail-type=ALL --mail-user={}",
},
},
"status": {
"command": "sacct",
"options": {
"jobid": "--jobs={} -o jobid,partition,jobname%60,user,state,elapsed,nnodes,nodelist%40",
},
},
"delete": {"command": "scancel", "options": {"jobid": "{}",}},
}
status_names = {
"R": JobStatus.RUNNING,
"CD": JobStatus.FINISHED,
"PD": JobStatus.PENDING,
"CG": JobStatus.COMPLETING,
"RUNNING": JobStatus.RUNNING,
"FAILED": JobStatus.FAILED,
"PENDING": JobStatus.PENDING,
"TIMEOUT": JobStatus.FAILED,
"COMPLETED": JobStatus.FINISHED
}
[docs] def submit(self, opts):
jobid = BasicJobManager.submit(self, opts)
return jobid
def _extra_status_args_(self,args):
args.append('--noheader')
return args
def _jobid_sep_(self):
return ','
def _parse_submit_res_(self, res, jobargs, session):
job = BasicJobManager._parse_submit_res_(self, res, jobargs, session)
job.jobid = res.stdout.decode("utf-8", errors="ignore").split()[-1]
return job
def _parse_status_res_(self, res):
"""JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)"""
res=BasicJobManager._parse_status_res_(self, res)
out = []
lines = res.splitlines()
if lines:
for line in [lines[0]]:
info = line.split()
jobid = info[0]
queue = info[1]
name = info[2]
user = info[3]
status = info[4]
time = info[5]
nodes = info[6]
nodelist = " ".join(info[7:])
hms = time.split(":")
hh = 0
mm = 0
if len(hms) == 1:
ss = hms[0]
elif len(hms) == 2:
mm, ss = hms
else:
hh, mm, ss = hms
time = datetime.timedelta(seconds=int(ss), minutes=int(mm), hours=int(hh))
if status in self.status_names:
status = self.status_names[status]
else:
status = JobStatus.UNKOWN
out.append(
{"jobid": jobid, "queue": queue, "name": name, "time": time, "status": status}
)
return out
if( is_os_cmd_avail("squeue") ):
jobmanager = SlurmJobManager()
elif( is_os_cmd_avail("qsub") ):
jobmanager = PbsJobManager()
else:
jobmanager = BasicJobManager()
"""
if __name__ == '__main__':
jm = BasicJobManager()
#job = Job(jm, name='toto', args='mkjob.py rundate=10', queue='seq',jobid='007')
job = Job(jm, name='fake_task', args='fake_task.py rundate=10', queue='None')
jm.jobs.append(job)
print(job.get_status())
#print(job)
print(Job.get_overview_header())
#print(job.get_overview())
#print(jm)
"""