378 test queue parameter #380


Draft: wants to merge 15 commits into main
126 changes: 126 additions & 0 deletions tests/test_queue.py
@@ -0,0 +1,126 @@
import os
import random
from typing import List

import pytest

from psij import Job, JobSpec, JobAttributes, JobExecutor

from executor_test_params import ExecutorTestParams


SCHEDULER_COMMANDS = {
    "slurm": {
        # "scontrol show partition" prints one "PartitionName=..." line per
        # partition, which is what get_slurm_queues() parses; "mdiag -c" is
        # a Moab command, not a Slurm one.
        "get_queues": "scontrol show partition",
        "get_user_jobs": "squeue -o '%P' --jobs",
        "kill_command": "scancel"
    },
    "lsf": {
        "get_queues": "bqueues -u $(whoami) -o 'QUEUE_NAME NJOBS PEND RUN SUSP STATUS'",
        "get_user_jobs": "bjobs -o 'queue'",
        "kill_command": "bkill"
    }
}


def get_slurm_queues() -> List[str]:
    command = SCHEDULER_COMMANDS["slurm"]["get_queues"]
    out = os.popen(command).read().split("\n")
    return [line.split("=")[-1] for line in out if "PartitionName" in line]


def get_lsf_queues() -> List[str]:
    valid_queues = []
    command = SCHEDULER_COMMANDS["lsf"]["get_queues"]
    out = os.popen(command).read().split("\n")
    out = [line for line in out if len(line) != 0]
    # split() rather than split(" "): bqueues pads its columns with runs of
    # spaces, which split(" ") would turn into empty fields
    queues = [line.split() for line in out]
    if len(queues) == 0:
        return []
    queues.pop(0)  # drop the header row

    for queue_info in queues:
        name = queue_info[0]
        njobs = int(queue_info[1])
        pend = int(queue_info[2])
        run = int(queue_info[3])
        susp = int(queue_info[4])
        status = str(queue_info[5])

        if "active" not in status.lower():
            continue

        if (njobs + pend + run + susp) > 10:
            valid_queues.append(name)

    return valid_queues


def get_queue_info(executor: str, job: Job) -> List[str]:
    command = SCHEDULER_COMMANDS[executor]["get_user_jobs"]
    return os.popen(f"{command} {job._native_id}").read().split("\n")


def kill_job(scheduler: str, job: Job) -> None:
    command = f"{SCHEDULER_COMMANDS[scheduler]['kill_command']} {job._native_id}"
    print("Kill command:", command)
    os.system(command)


def make_job(queue: str) -> Job:
    return Job(
        JobSpec(
            executable="/bin/date",
            attributes=JobAttributes(
                queue_name=queue,
            ),
        )
    )


def test_queue(execparams: ExecutorTestParams) -> None:
    scheduler = ""
    queues = []
    slurm_queues = get_slurm_queues()
    lsf_queues = get_lsf_queues()

    queues.extend(slurm_queues)
    queues.extend(lsf_queues)
Comment on lines +91 to +95
Collaborator:
This is kind of a general note, but we know what scheduler we have from execparams.

That's not the point I wanted to make, though. The idea of running all possible get_*_queues() functions and merging the results, on the assumption that at most one of them returns non-empty results, probably works. But it does so in an unnecessarily twisted way, and it relies on an assumption that isn't necessary to make or reason through.
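Under that suggestion, the scheduler would come straight from execparams instead of from probing both schedulers. A minimal sketch of the idea; `ExecParams` is a hypothetical stand-in for ExecutorTestParams, which carries the executor and launcher names the harness was parametrized with:

```python
from dataclasses import dataclass


@dataclass
class ExecParams:
    """Hypothetical stand-in for ExecutorTestParams."""
    executor: str
    launcher: str


# Mirrors the keys of the SCHEDULER_COMMANDS table in the diff.
KNOWN_SCHEDULERS = ("slurm", "lsf")


def scheduler_from_execparams(execparams: ExecParams) -> str:
    # No probing needed: for batch executors, the executor name is the
    # scheduler name; any other executor has no queues to test.
    if execparams.executor in KNOWN_SCHEDULERS:
        return execparams.executor
    return ""
```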


    if len(slurm_queues) != 0:
        scheduler = "slurm"
    elif len(lsf_queues) != 0:
        scheduler = "lsf"
Comment on lines +97 to +100
Collaborator:
I see.

So execparams is there to parametrize executors when multiple executors are available on a system. For example, on a SLURM system, a test with an execparams parameter will be invoked multiple times, once for each combination of executor in ["local", "batch-test", "slurm"] and launcher in ["single", "multiple", "mpirun", "srun"].

If you ignore execparams and detect what's installed the way it's done here, it will work, but it will run the same test multiple times for no good reason.

Instead, we should run this test on only one of the launchers (the launcher doesn't matter, because we don't actually care about launching a job in this test) and with all executors. So something like: if execparams.launcher == 'single', then do what we need to do, with the assumption that our scheduler is execparams.executor.
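The gating described here can be captured in a small predicate. This is a sketch of the suggested policy, not PSI/J API; the two executor names are simply the schedulers this test knows about:

```python
def should_run_queue_test(launcher: str, executor: str) -> bool:
    # The launcher is irrelevant here (no job is actually launched),
    # so run only for "single", and only for real batch schedulers.
    return launcher == "single" and executor in ("slurm", "lsf")
```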

Collaborator Author:
Can you take a look at my implementation? I kept the detection the way it was, but the test now runs only when execparams.launcher == "single".

Collaborator:
It's not about detecting the LRM on the system, but the fact that we test multiple executors on that system. So even if you restrict it to the single launcher, it will still be repeated for local, batch-test, and whatever PSI/J detected to be the scheduler.

You could remove execparams, but then you risk not having access to other necessary parameters that might be set by the users who set up the tests. By the way, you may want to use execparams.custom_attributes, since some systems require setting various things, like an account or project.
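Passing execparams.custom_attributes through could look like the sketch below. JobAttributes in PSI/J does accept a custom_attributes mapping; building the keyword arguments in a separate helper here is purely for illustration:

```python
from typing import Any, Dict, Optional


def attribute_kwargs(queue: str,
                     custom_attributes: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    # Keyword arguments destined for JobAttributes(...); site-specific
    # settings (account, project, ...) ride along in custom_attributes.
    kwargs: Dict[str, Any] = {"queue_name": queue}
    if custom_attributes:
        kwargs["custom_attributes"] = custom_attributes
    return kwargs
```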


    if len(queues) < 2:
        pytest.skip("Need at least two queues to perform this test")

    if execparams.launcher != "single":
        pytest.skip("No need to test non-single launchers")

    print("available queues:", queues)
    test_queues = random.sample(queues, 2)
    print("test queues:", test_queues)

    executor = JobExecutor.get_instance(scheduler)

    job1 = make_job(test_queues[0])
    executor.submit(job1)
    qstat = get_queue_info(scheduler, job1)
    assert test_queues[0] in qstat

    job2 = make_job(test_queues[1])
    executor.submit(job2)
    qstat = get_queue_info(scheduler, job2)
    assert test_queues[1] in qstat

    kill_job(scheduler, job1)
    kill_job(scheduler, job2)