Source code for GRID_LRT.application.submit

#!/usr/bin/env python

"""This module creates the launch scripts for the WMS on gina
Options include the number of cores and the queue type."""

import os
import subprocess
import logging
import warnings
import random, string

import tempfile
from GRID_LRT.auth.get_picas_credentials import picas_cred as pc
import GRID_LRT
from GRID_LRT.auth import grid_credentials


#class job_launcher(object):
#    """Generic Job Launcher
#    """
#    def __init__(self):
#        pass

[docs]class gridjob(object): """A class containing all required descriptions for a SINGLE Grid job, as well as its parameters"""
[docs] def __init__(self, wholenodes=False, NCPU=1, token_type=None): self.token_type = token_type if wholenodes: self.wholenodes = 'true' else: self.wholenodes = 'false' self.ncpu = NCPU
[docs]class JdlLauncher(object): """jdl_launcher creates a jdl launch file with the appropriate queue, CPUs, cores and Memory per node The jdl file is stored in a temporary location and can be automatically removed using a context manager such as: >>> with Jdl_launcher_object as j: launch_ID=j.launch() This will launch the jdl, remove the temp file and reutrn the Job_ID for the glite job. """
[docs] def __init__(self, numjobs=1, token_type='t_test', wholenodes=False, parameter_step=4, **kwargs): """The jdl_launcher class is initialized with the number of jobs, the name of the PiCaS token type to run, and a flag to use the whole node. Args: numjobs (int): The number of jobs to launch on the cluster token_type (str): The name of the token_type to launch from the PiCaS database. this uses the get_picas_credentials module to get the PiCaS database name, uname, passw wholenodes(Boolean): Whether to reserve the entire node. Default is F NCPU (int, optional): Number of CPUs to use for each job. Default is 1 """ self.authorized = False if 'authorize' in kwargs.keys() and kwargs['authorize'] == False: warnings.warn("Skipping Grid Autorization") else: self.__check_authorized() if numjobs < 1: logging.warn("jdl_file with zero jobs!") numjobs = 1 self.numjobs = numjobs self.parameter_step = parameter_step self.token_type = token_type if wholenodes: self.wholenodes = 'true' else: self.wholenodes = 'false' if "NCPU" in kwargs: self.ncpu = kwargs["NCPU"] else: self.ncpu = 1 if "queue" in kwargs: self.queue = kwargs['queue'] else: self.queue = "mediummc" self.temp_file = None self.launch_file = str("/".join((GRID_LRT.__file__.split("/")[:-1])) + "/data/launchers/run_remote_sandbox.sh")
def __check_authorized(self): grid_credentials.grid_credentials_enabled() self.authorized = True def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): os.remove(self.temp_file.name)
[docs] def build_jdl_file(self, database=None): """Uses a template to build the jdl file and place it in a temporary file object stored internally. """ creds = pc() # Get credentials, get launch file to send to workers if not database: database = creds.database if not os.path.exists(self.launch_file): raise IOError("Launch file doesn't exist! "+self.launch_file) jdlfile = """[ JobType="Parametric"; ParameterStart=0; ParameterStep=%d; Parameters= %d ; Executable = "/bin/sh"; Arguments = "run_remote_sandbox.sh %s %s %s %s "; Stdoutput = "parametricjob.out"; StdError = "parametricjob.err"; InputSandbox = {"%s"}; OutputSandbox = {"parametricjob.out", "parametricjob.err"}; DataAccessProtocol = {"gsiftp"}; ShallowRetryCount = 0; Requirements=(RegExp("%s",other.GlueCEUniqueID)); WholeNodes = %s ; SmpGranularity = %d; CPUNumber = %d; ]""" % (int(self.parameter_step), int(self.numjobs), str(database), str(creds.user), str(creds.password), str(self.token_type), str(self.launch_file), str(self.queue), str(self.wholenodes), int(self.ncpu), int(self.ncpu)) return jdlfile
[docs] def make_temp_jdlfile(self, database=None): """ Makes a temporary file to store the JDL document that is only visible to the user""" self.temp_file = tempfile.NamedTemporaryFile(delete=False) with open(self.temp_file.name, 'w') as t_file_obj: for i in self.build_jdl_file(database): t_file_obj.write(i) return self.temp_file
[docs] def launch(self, database=None): """Launch the glite-job and return the job identification""" if not self.authorized: self._check_authorized() if not self.temp_file: self.temp_file = self.make_temp_jdlfile(database = database) sub = subprocess.Popen(['glite-wms-job-submit', '-d', os.environ["USER"], self.temp_file.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out = sub.communicate() if out[1] == "": return out[0].split('Your job identifier is:')[1].split()[0] raise RuntimeError("Launching of JDL failed because: "+out[1])
[docs]class UnauthorizedJdlLauncher(JdlLauncher):
[docs] def __init__(self, *args, **kw): super(UnauthorizedJdlLauncher, self).__init__(*args, authorize=False, **kw)
[docs] def launch(self): if not self.temp_file: self.temp_file = self.make_temp_jdlfile() fake_link = 'https://wms2.grid.sara.nl:9000/'+''.join(random.choice(string.ascii_letters + string.digits+"-") for _ in range(22)) warnings.warn("If you were authorized, we would be launching the JDL here. You'd get this in return: {}".format(fake_link)) return fake_link
#class loui_launcher(jdl_launcher): # """ # To make integration tests of an AGLOW step, this job launches it on loui. # """ # # def __init__(self, *args, **kwargs): # pass