diff --git a/setup.py b/setup.py
index 78771055efcd7872a3e516a0f97afc8b92570265..f71d95373de26fa827eb55e4bd75edd1ea57b664 100644
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,7 @@ URL = "https://gitlab.gwdg.de/portal.mparcia/celery-docker-test"
 EMAIL = "marcel.parciak@gmail.com"
 AUTHOR = "Marcel Parciak"
 REQUIRES_PYTHON = ">=3.9.0"
-VERSION = "1.2.0"
+VERSION = "1.2.1"
 
 # What packages are required for this module to be executed?
 REQUIRED = ["celery", "minio", "docker", "redis", "sqlalchemy", "psycopg2-binary"]
diff --git a/tosagent/task.py b/tosagent/task.py
index d6df7c223314a03bb9e442f27758bc6fcc8e6531..1c0d6deca56b85527f47dd5e81965ff673a8ea3a 100644
--- a/tosagent/task.py
+++ b/tosagent/task.py
@@ -18,6 +18,9 @@ cache_directory = "/tmp/tos_cached"
 
 @app.task(bind=True)
 def start_job(self, name: str, version: str, context: Dict[str, Any], debug: bool):
+    """This task takes the name and version of a TOS Job, downloads it from minio and executes it in a Docker container."""
+
+    # create both clients, one for docker and one for minio
     docker_client = docker.DockerClient(base_url="unix://var/run/docker.sock")
     minio_client = minio.Minio(
         os.getenv("JOBSTORE_URI"),
@@ -26,24 +29,30 @@ def start_job(self, name: str, version: str, context: Dict[str, Any], debug: boo
         secure=False,
     )
 
+    # create the working directories for this task
     os.makedirs(download_directory, exist_ok=True)
     os.makedirs(cache_directory, exist_ok=True)
     os.makedirs(os.path.join(cache_directory, "io"), exist_ok=True)
 
+    # define the cache path (i.e. where the TOS Job zip gets extracted to)
     cache_path = os.path.join(cache_directory, f"{name}_{version.replace('.', '-')}")
     zip_name = f"{name}_{version}.zip"
 
+    # if the job has already been downloaded and extracted, skip this part
     if not os.path.exists(cache_path):
+        # retrieve the TOS Job zip from minio
        with minio_client.get_object("tos", f"jobs/{zip_name}") as http_response:
             with open(os.path.join(download_directory, zip_name), "wb+") as write_zip:
                 for chunk in http_response.stream():
                     write_zip.write(chunk)
 
+        # unzip the downloaded file
         with zipfile.ZipFile(
             os.path.join(download_directory, zip_name), "r"
         ) as read_zip:
             read_zip.extractall(cache_path)
         try:
+            # check the extracted zip: does it contain the Job at the given version?
             with open(os.path.join(cache_path, "jobInfo.properties"), "r") as propfile:
                 for line in propfile:
                     if line.startswith("job="):
@@ -52,63 +61,82 @@ def start_job(self, name: str, version: str, context: Dict[str, Any], debug: boo
                             f"The Job ZIP and Job Name do not match. Got: {line[len('job='):-1]}, need: {name}"
                         )
                     if line.startswith("jobVersion="):
-                        print(line)
                         if not line[len("jobVersion=") : -1] == version:
                             raise RuntimeError(
                                 f"The Job ZIP and Job Version do not match. Got: {line[len('jobVersion='):-1]}, need: {version}"
                             )
         except RuntimeError as re:
+            # a RuntimeError indicates that the wrong zip has been downloaded
             os.unlink(cache_path)
             raise re
         finally:
+            # in any case: remove the downloaded zip file
             os.unlink(os.path.join(download_directory, zip_name))
 
+    # find the _run.sh script that executes the TOS Job
     for element in os.listdir(os.path.join(cache_path, name)):
         if element.endswith("_run.sh"):
             relative_sh_path = os.path.join(name, element)
 
-    # make readable and executable
+    # make the execution script readable and executable
     os.chmod(os.path.join(cache_path, relative_sh_path), stat.S_IXUSR | stat.S_IRUSR)
 
-    # pull the image
+    # pull the openjdk:8 image (used to run the TOS Job)
     docker_client.images.pull("openjdk", tag="8")
 
+    # define input and output paths for this task and the active workflow message file
     input_path = os.path.join(cache_directory, "io", "input", f"{self.request.id}")
     output_path = os.path.join(cache_directory, "io", "output", f"{self.request.id}")
     awmsg = "".join(random.choices(string.ascii_lowercase, k=12)) + ".json"
 
+    # define a context (I am too lazy to retrieve it from the CLI)
     context = {
         "rows": 12,
         "storeDirectory": os.path.join("/output", "something.csv"),
         "aw_message_filepath": os.path.join("/output", awmsg),
     }
+    # transform the context dict into a list of --context_param arguments
     context_params = [f"--context_param={k}={v}" for k, v in context.items()]
 
+    # run the containerized TOS Job
     container = docker_client.containers.run(
         "openjdk:8",
         [os.path.join("/job", relative_sh_path)] + context_params,
-        detach=False,
-        remove=(not debug),
-        stdout=True,
-        stderr=True,
+        detach=False,  # do not detach, wait for the container to finish
+        remove=(not debug),  # remove the container, unless debug mode is active
+        stdout=True,  # capture stdout
+        stderr=True,  # capture stderr
         volumes={
-            cache_path: {"bind": "/job", "mode": "ro"},
-            input_path: {"bind": "/input", "mode": "rw"},
-            output_path: {"bind": "/output", "mode": "rw"},
+            cache_path: {
+                "bind": "/job",
+                "mode": "ro",
+            },  # cache path holds the extracted Job
+            input_path: {
+                "bind": "/input",
+                "mode": "rw",
+            },  # input path is currently unused, but could contain input files
+            output_path: {
+                "bind": "/output",
+                "mode": "rw",
+            },  # output path receives the aw response message and any output files
         },
     )
 
+    # if debug is active, print the container logs
     if debug:
         print(container)
 
+    # check the output path (this is specific to testJob_0.2.zip)
     with open(os.path.join(output_path, "something.csv")) as out_csv:
         print(f"Lines written: {sum(1 for line in out_csv)}")
 
+    # load the active workflow message
     with open(os.path.join(output_path, awmsg), "r") as aw_message:
         aw_json = json.load(aw_message)
 
+    # return the logs and the active workflow response
     return (container.decode("utf-8"), aw_json)
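Usage note (not part of the diff): a minimal sketch of how this Celery task might be queued from a client process, assuming the broker configured for the app in tosagent is reachable. The job name "testJob" and version "0.2" mirror the testJob_0.2.zip referenced in the comments and are illustrative.

# Hypothetical usage sketch: queue start_job and wait for its result tuple.
from tosagent.task import start_job

result = start_job.delay(
    "testJob",  # job name; jobs/testJob_0.2.zip must exist in the minio "tos" bucket
    "0.2",      # job version; validated against jobInfo.properties in the zip
    {},         # context dict (currently overridden with a hardcoded one inside the task)
    False,      # debug: if True, the container is kept and its logs are printed
)
logs, aw_response = result.get(timeout=600)  # blocks until the container has finished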