From 9ba059e0744713a0aef9195e98b02f44e7271d22 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Fri, 28 Apr 2023 16:53:02 -0700 Subject: [PATCH 01/15] tests: added test_queues(), works on LSF --- tests/test_queue.py | 96 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 tests/test_queue.py diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 00000000..3325c858 --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,96 @@ +import os +import random + +import logging +from pathlib import Path + +import pytest + +from psij import Job, JobSpec, JobAttributes, JobExecutor, ResourceSpecV1 +from tempfile import TemporaryDirectory + +from executor_test_params import ExecutorTestParams +from _test_tools import _get_executor_instance, _get_timeout, assert_completed, _make_test_dir + +from pprint import pprint + + +def get_slurm_queues(): + res = os.popen("mdiag -c").read().split("\n") + res = [line.split("=")[-1] for line in res if "PartitionName" in line] + return res + + +def get_lsf_queues(): + res = os.popen("bqueues -u ramon").read().split("\n") + res = [l for l in res if len(l) != 0] + res = [l.split(" ", 1) for l in res] + res = [l[0] for l in res if "Active" in l[1] and len(l) != 0] + return res + + +def get_queue_info(): + res = [] + res.extend(os.popen("bjobs").read().split("\n")) + res.extend(os.popen("bjobs").read().split("\n")) + return res + + +def make_job(queue:str) -> Job: + return Job( + JobSpec( + executable="/bin/date", + # arguments=['-utc', '--debug'], + # resources=ResourceSpecV1(node_count=1), + attributes=JobAttributes( + queue_name=queue, + ), + ) + ) + + +def test_queue(execparams: ExecutorTestParams) -> None: + executor = "" + queues = [] + slurm_queues = get_slurm_queues() + lsf_queues = get_lsf_queues() + + queues.extend(slurm_queues) + queues.extend(lsf_queues) + + print("slurm:", slurm_queues) + print("lsf:", lsf_queues) + print("extended ques:", queues) + + if len(slurm_queues) != 0: + executor = "slurm" + elif len(lsf_queues) != 0: + executor = "lsf" + + if len(queues) <= 1: + return + + test_queues = random.sample(queues, 2) + print("test queues:", test_queues) + + print("Executor = ", executor) + + executor = JobExecutor.get_instance(executor) + + job1 = make_job(test_queues[0]) + executor.submit(job1) + qstat = get_queue_info() + job1_qstat_entry = [l for l in qstat if job1._native_id in l][0] + assert test_queues[0] in job1_qstat_entry + + job2 = make_job(test_queues[1]) + executor.submit(job2) + qstat = get_queue_info() + job2_qstat_entry = [l for l in qstat if job2._native_id in l][0] + assert test_queues[1] in job2_qstat_entry + + job1.wait() + job2.wait() + print("Job1:", job1.status) + print("Job2:", job2.status) + print() From ef484f421aedf0bb0fe43036f7c5999a69869ce4 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 13:56:46 -0700 Subject: [PATCH 02/15] test queue: now using dict to store commands --- tests/test_queue.py | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 3325c858..7614a3a8 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -15,6 +15,20 @@ from pprint import pprint +SCHEDULER_COMMANDS = { + "slurm": { + "get_queues": "mdiag -c", + "get_user_jobs": "squeue -u $(whoami)", + "kill_command": "scancel" + }, + "lsf": { + "get_queues": "bqueues -u $(whoami)", + "get_user_jobs": "bjobs", + "kill_command": "bkill" + } +} + + def get_slurm_queues(): res = os.popen("mdiag -c").read().split("\n") res = [line.split("=")[-1] for line in res if "PartitionName" in line] @@ -29,10 +43,11 @@ def get_lsf_queues(): return res -def get_queue_info(): +def get_queue_info(executor: str): res = [] - res.extend(os.popen("bjobs").read().split("\n")) - res.extend(os.popen("bjobs").read().split("\n")) + print("PASSED THROUGH EXECUTOR:", executor) + command = SCHEDULER_COMMANDS[executor]["get_user_jobs"] + res.extend(os.popen(command).read().split("\n")) return res @@ -50,7 +65,7 @@ def make_job(queue:str) -> Job: def test_queue(execparams: ExecutorTestParams) -> None: - executor = "" + scheduler = "" queues = [] slurm_queues = get_slurm_queues() lsf_queues = get_lsf_queues() @@ -63,9 +78,9 @@ def test_queue(execparams: ExecutorTestParams) -> None: print("extended ques:", queues) if len(slurm_queues) != 0: - executor = "slurm" + scheduler = "slurm" elif len(lsf_queues) != 0: - executor = "lsf" + scheduler = "lsf" if len(queues) <= 1: return @@ -73,22 +88,23 @@ def test_queue(execparams: ExecutorTestParams) -> None: test_queues = random.sample(queues, 2) print("test queues:", test_queues) - print("Executor = ", executor) - - executor = JobExecutor.get_instance(executor) + executor = JobExecutor.get_instance(scheduler) job1 = make_job(test_queues[0]) executor.submit(job1) - qstat = get_queue_info() + qstat = get_queue_info(scheduler) job1_qstat_entry = [l for l in qstat if job1._native_id in l][0] assert test_queues[0] in job1_qstat_entry job2 = make_job(test_queues[1]) executor.submit(job2) - qstat = get_queue_info() + qstat = get_queue_info(scheduler) job2_qstat_entry = [l for l in qstat if job2._native_id in l][0] assert test_queues[1] in job2_qstat_entry + qstat = get_queue_info(scheduler) + print("qstat = ", qstat) + job1.wait() job2.wait() print("Job1:", job1.status) From d8aaf0a9e86428c2c2f761ff2e3512506c2e9a73 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 14:12:33 -0700 Subject: [PATCH 03/15] test queue: added kill_job() --- tests/test_queue.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 7614a3a8..1da82d62 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -51,6 +51,12 @@ def get_queue_info(executor: str): return res +def kill_job(scheduler: str, job: Job): + command = f"{SCHEDULER_COMMANDS[scheduler]['kill_command']} {job._native_id}" + print("Kill command:", command) + os.system(command) + + def make_job(queue:str) -> Job: return Job( JobSpec( @@ -103,10 +109,10 @@ def test_queue(execparams: ExecutorTestParams) -> None: assert test_queues[1] in job2_qstat_entry qstat = get_queue_info(scheduler) - print("qstat = ", qstat) + print("qstat = ", "\n".join(qstat)) - job1.wait() - job2.wait() - print("Job1:", job1.status) - print("Job2:", job2.status) - print() + kill_job(scheduler, job1) + kill_job(scheduler, job2) + + qstat = get_queue_info(scheduler) + print("qstat = ", "\n".join(qstat)) From 7d957999d7239acbd9c7918bfe7522dc5a87689a Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 14:13:53 -0700 Subject: [PATCH 04/15] test queues: cleanup --- tests/test_queue.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 1da82d62..8ea196b9 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -12,8 +12,6 @@ from executor_test_params import ExecutorTestParams from _test_tools import _get_executor_instance, _get_timeout, assert_completed, _make_test_dir -from pprint import pprint - SCHEDULER_COMMANDS = { "slurm": { @@ -45,7 +43,6 @@ def get_lsf_queues(): def get_queue_info(executor: str): res = [] - print("PASSED THROUGH EXECUTOR:", executor) command = SCHEDULER_COMMANDS[executor]["get_user_jobs"] res.extend(os.popen(command).read().split("\n")) return res @@ -61,8 +58,6 @@ def make_job(queue:str) -> Job: return Job( JobSpec( executable="/bin/date", - # arguments=['-utc', '--debug'], - # resources=ResourceSpecV1(node_count=1), attributes=JobAttributes( queue_name=queue, ), @@ -79,10 +74,6 @@ def test_queue(execparams: ExecutorTestParams) -> None: queues.extend(slurm_queues) queues.extend(lsf_queues) - print("slurm:", slurm_queues) - print("lsf:", lsf_queues) - print("extended ques:", queues) - if len(slurm_queues) != 0: scheduler = "slurm" elif len(lsf_queues) != 0: From 88b7cf61d3f82a5c906b99c965a09a8f1ce2dd8d Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 14:17:43 -0700 Subject: [PATCH 05/15] test queue: raising queue error if <2 available --- tests/test_queue.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 8ea196b9..3c516310 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -79,9 +79,11 @@ def test_queue(execparams: ExecutorTestParams) -> None: elif len(lsf_queues) != 0: scheduler = "lsf" - if len(queues) <= 1: + if len(queues) < 2: + pytest.raises(Exception("Need at least two queues to perform this test")) return + print("available queues:", queues) test_queues = random.sample(queues, 2) print("test queues:", test_queues) From a110cade3635244141fce2b7983c5a62416e6d48 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 14:22:17 -0700 Subject: [PATCH 06/15] test queue: added type hints --- tests/test_queue.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 3c516310..021b3401 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -3,6 +3,7 @@ import logging from pathlib import Path +from typing import List import pytest @@ -27,13 +28,13 @@ } -def get_slurm_queues(): +def get_slurm_queues() -> None: res = os.popen("mdiag -c").read().split("\n") res = [line.split("=")[-1] for line in res if "PartitionName" in line] return res -def get_lsf_queues(): +def get_lsf_queues() -> None: res = os.popen("bqueues -u ramon").read().split("\n") res = [l for l in res if len(l) != 0] res = [l.split(" ", 1) for l in res] @@ -41,14 +42,14 @@ def get_lsf_queues(): return res -def get_queue_info(executor: str): +def get_queue_info(executor: str) -> List[str]: res = [] command = SCHEDULER_COMMANDS[executor]["get_user_jobs"] res.extend(os.popen(command).read().split("\n")) return res -def kill_job(scheduler: str, job: Job): +def kill_job(scheduler: str, job: Job) -> None: command = f"{SCHEDULER_COMMANDS[scheduler]['kill_command']} {job._native_id}" print("Kill command:", command) os.system(command) From b958212681ee6cb8c31807d84efec5d14919f519 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 15:15:12 -0700 Subject: [PATCH 07/15] test queue: get_lsf_queues() using whoami --- tests/test_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 021b3401..c4736713 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -35,7 +35,7 @@ def get_slurm_queues() -> None: def get_lsf_queues() -> None: - res = os.popen("bqueues -u ramon").read().split("\n") + res = os.popen("bqueues -u $(whoami)").read().split("\n") res = [l for l in res if len(l) != 0] res = [l.split(" ", 1) for l in res] res = [l[0] for l in res if "Active" in l[1] and len(l) != 0] From 64ccf907890de12913847a6b222ef25d2a4ce2fb Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 15:45:50 -0700 Subject: [PATCH 08/15] test queues: more robust parsing for lsf systems --- tests/test_queue.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index c4736713..5cf00fb1 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -34,12 +34,27 @@ def get_slurm_queues() -> None: return res -def get_lsf_queues() -> None: - res = os.popen("bqueues -u $(whoami)").read().split("\n") - res = [l for l in res if len(l) != 0] - res = [l.split(" ", 1) for l in res] - res = [l[0] for l in res if "Active" in l[1] and len(l) != 0] - return res +def get_lsf_queues() -> str: + valid_queues = [] + out = "".join(os.popen("bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'").read()).split("\n") + out = [l for l in out if len(l) != 0] + out = [l.split(" ") for l in out] + out.pop(0) # popping headers + for queue_info in out: + name = queue_info[0] + njobs = int(queue_info[1]) + pend = int(queue_info[2]) + run = int(queue_info[3]) + susp = int(queue_info[4]) + status = str(queue_info[5]) + + if "active" not in status.lower(): + continue + + if (njobs + pend + run + susp) > 10: + valid_queues.append(name) + + return valid_queues def get_queue_info(executor: str) -> List[str]: From 0e3f3727b716b1c408536cd2bc309d5c794fe374 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 16:36:56 -0700 Subject: [PATCH 09/15] test queue: cleanup + fixed type hints --- tests/test_queue.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 5cf00fb1..cc4797ea 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -28,13 +28,12 @@ } -def get_slurm_queues() -> None: - res = os.popen("mdiag -c").read().split("\n") - res = [line.split("=")[-1] for line in res if "PartitionName" in line] - return res +def get_slurm_queues() -> List[str]: + out = os.popen("mdiag -c").read().split("\n") + return [line.split("=")[-1] for line in out if "PartitionName" in line] -def get_lsf_queues() -> str: +def get_lsf_queues() -> List[str]: valid_queues = [] out = "".join(os.popen("bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'").read()).split("\n") out = [l for l in out if len(l) != 0] From 71c5fb36968955c953e0ab070ca83924ce33ded8 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Mon, 1 May 2023 16:38:41 -0700 Subject: [PATCH 10/15] test queue: debugging get_lsf_queues(), added early return --- tests/test_queue.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_queue.py b/tests/test_queue.py index cc4797ea..5cf157f3 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -38,7 +38,10 @@ def get_lsf_queues() -> List[str]: out = "".join(os.popen("bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'").read()).split("\n") out = [l for l in out if len(l) != 0] out = [l.split(" ") for l in out] + if len(out) == 0: + return [] out.pop(0) # popping headers + for queue_info in out: name = queue_info[0] njobs = int(queue_info[1]) From b53cc835116a615d9a1543fe1481bf851783209f Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Wed, 3 May 2023 15:41:09 -0700 Subject: [PATCH 11/15] now using dict to store commands --- tests/test_queue.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 5cf157f3..d809982a 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -21,7 +21,7 @@ "kill_command": "scancel" }, "lsf": { - "get_queues": "bqueues -u $(whoami)", + "get_queues": "bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'", "get_user_jobs": "bjobs", "kill_command": "bkill" } @@ -29,13 +29,15 @@ def get_slurm_queues() -> List[str]: - out = os.popen("mdiag -c").read().split("\n") + command = SCHEDULER_COMMANDS["slurm"]["get_queues"] + out = os.popen(command).read().split("\n") return [line.split("=")[-1] for line in out if "PartitionName" in line] def get_lsf_queues() -> List[str]: valid_queues = [] - out = "".join(os.popen("bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'").read()).split("\n") + command = SCHEDULER_COMMANDS["lsf"]["get_queues"] + out = "".join(os.popen(command).read()).split("\n") out = [l for l in out if len(l) != 0] out = [l.split(" ") for l in out] if len(out) == 0: From d69f568f2b2c0a02bdbf17abe209169b84ecc443 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Wed, 3 May 2023 15:47:47 -0700 Subject: [PATCH 12/15] lsf: fixed output overloading --- tests/test_queue.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index d809982a..61e0770a 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -37,14 +37,14 @@ def get_slurm_queues() -> List[str]: def get_lsf_queues() -> List[str]: valid_queues = [] command = SCHEDULER_COMMANDS["lsf"]["get_queues"] - out = "".join(os.popen(command).read()).split("\n") + out = os.popen(command).read().split("\n") out = [l for l in out if len(l) != 0] - out = [l.split(" ") for l in out] - if len(out) == 0: + queues = [l.split(" ") for l in out] + if len(queues) == 0: return [] - out.pop(0) # popping headers + queues.pop(0) # popping headers - for queue_info in out: + for queue_info in queues: name = queue_info[0] njobs = int(queue_info[1]) pend = int(queue_info[2]) From 180fd1099c9886b3ad3c9835643394bbc0f27196 Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Wed, 3 May 2023 15:59:41 -0700 Subject: [PATCH 13/15] test queue: using pytest skip instead --- tests/test_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 61e0770a..c8ac6a48 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -100,7 +100,7 @@ def test_queue(execparams: ExecutorTestParams) -> None: scheduler = "lsf" if len(queues) < 2: - pytest.raises(Exception("Need at least two queues to perform this test")) + pytest.skip("Need at least two queues to perform this test") return print("available queues:", queues) From bd072ebdcd4e01fea941c91d9772a77f02b597ed Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Wed, 3 May 2023 16:50:31 -0700 Subject: [PATCH 14/15] qstats report just queue info, relative to job --- tests/test_queue.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index c8ac6a48..151501ad 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -17,12 +17,12 @@ SCHEDULER_COMMANDS = { "slurm": { "get_queues": "mdiag -c", - "get_user_jobs": "squeue -u $(whoami)", + "get_user_jobs": "squeue -o '%P' --jobs", "kill_command": "scancel" }, "lsf": { "get_queues": "bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'", - "get_user_jobs": "bjobs", + "get_user_jobs": "bjobs -o 'queue'", "kill_command": "bkill" } } @@ -61,10 +61,10 @@ def get_lsf_queues() -> List[str]: return valid_queues -def get_queue_info(executor: str) -> List[str]: +def get_queue_info(executor: str, job: Job) -> List[str]: res = [] command = SCHEDULER_COMMANDS[executor]["get_user_jobs"] - res.extend(os.popen(command).read().split("\n")) + res.extend(os.popen(f"{command} {job._native_id}").read().split("\n")) return res @@ -111,21 +111,13 @@ def test_queue(execparams: ExecutorTestParams) -> None: job1 = make_job(test_queues[0]) executor.submit(job1) - qstat = get_queue_info(scheduler) - job1_qstat_entry = [l for l in qstat if job1._native_id in l][0] - assert test_queues[0] in job1_qstat_entry + qstat = get_queue_info(scheduler, job1) + assert test_queues[0] in qstat job2 = make_job(test_queues[1]) executor.submit(job2) - qstat = get_queue_info(scheduler) - job2_qstat_entry = [l for l in qstat if job2._native_id in l][0] - assert test_queues[1] in job2_qstat_entry - - qstat = get_queue_info(scheduler) - print("qstat = ", "\n".join(qstat)) + qstat = get_queue_info(scheduler, job2) + assert test_queues[1] in qstat kill_job(scheduler, job1) kill_job(scheduler, job2) - - qstat = get_queue_info(scheduler) - print("qstat = ", "\n".join(qstat)) From 7a219d725862595d6d9b0a121eebec4e737dba1d Mon Sep 17 00:00:00 2001 From: Ramon Adolfo Arambula Date: Wed, 3 May 2023 16:55:38 -0700 Subject: [PATCH 15/15] test queue: running only on single launcher --- tests/test_queue.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_queue.py b/tests/test_queue.py index 151501ad..2e474e1c 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -103,6 +103,9 @@ def test_queue(execparams: ExecutorTestParams) -> None: pytest.skip("Need at least two queues to perform this test") return + if execparams.launcher != "single": + pytest.skip("No need to test non-single launchers") + print("available queues:", queues) test_queues = random.sample(queues, 2) print("test queues:", test_queues)