From 044b52141bad4ec9a7ae007b21a12c36985a590e Mon Sep 17 00:00:00 2001
From: Jerry Ling
Date: Sat, 13 Jul 2024 10:11:17 -0500
Subject: [PATCH 1/2] add HTCondor support

---
 src/ParallelProcessingTools.jl |   1 +
 src/htcondor.jl                | 126 +++++++++++++++++++++++++++++++++
 2 files changed, 127 insertions(+)
 create mode 100644 src/htcondor.jl

diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl
index 3d3d72b..62c51a5 100644
--- a/src/ParallelProcessingTools.jl
+++ b/src/ParallelProcessingTools.jl
@@ -41,6 +41,7 @@ include("workerpool.jl")
 include("onworkers.jl")
 include("runworkers.jl")
 include("slurm.jl")
+include("htcondor.jl")
 include("deprecated.jl")
 
 @static if !isdefined(Base, :get_extension)
diff --git a/src/htcondor.jl b/src/htcondor.jl
new file mode 100644
index 0000000..4d1618f
--- /dev/null
+++ b/src/htcondor.jl
@@ -0,0 +1,126 @@
+# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).
+
+"""
+    HTCondorRun(;
+        n::Int = 1
+        condor_flags::Cmd = _default_condor_flags()
+        condor_settings::Dict{String,String} = Dict{String,String}()
+        julia_flags::Cmd = _default_julia_flags()
+        julia_depot::Vector{String} = DEPOT_PATH
+        jobfile_dir = homedir()
+        env::Dict{String,String} = Dict{String,String}()
+        redirect_output::Bool = true
+    )
+
+Mode to add worker processes via HTCondor `condor_submit`.
+
+The HTCondor submit file (`.sub`) and the worker startup script (`.sh`) are stored in `jobfile_dir`.
+
+Example:
+
+```julia-repl
+julia> runmode = HTCondorRun(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"));
+
+julia> runworkers(runmode)
+[ Info: Submitting HTCondor job: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
+Submitting job(s)..........
+10 job(s) submitted to cluster 3198291.
+[ Info: HTCondor job submitted: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
+(nothing, 10)
+
+julia> sleep(10)
+
+julia> nworkers()
+10
+```
+
+Workers can also be started manually: use [`worker_start_command(runmode)`](@ref) to get the `condor_submit` start command and run it from a separate shell session, for example.
+"""
+@with_kw struct HTCondorRun <: DynamicAddProcsMode
+    n::Int = 1
+    condor_flags::Cmd = _default_condor_flags()
+    condor_settings::Dict{String,String} = Dict{String,String}()
+    julia_flags::Cmd = _default_julia_flags()
+    julia_depot::Vector{String} = DEPOT_PATH
+    jobfile_dir = homedir()
+    env::Dict{String,String} = Dict{String,String}()
+    redirect_output::Bool = true
+end
+export HTCondorRun
+
+_default_condor_flags() = ``
+const _g_condor_nextjlstep = Base.Threads.Atomic{Int}(1)
+
+function worker_start_command(runmode::HTCondorRun, manager::ElasticManager)
+    flags = runmode.condor_flags
+    n_workers = runmode.n
+    temp_name = tempname(runmode.jobfile_dir)
+    worker_script_path = temp_name*".sh"
+    submit_file_path = temp_name*".sub"
+    _generate_condor_worker_script(worker_script_path, runmode, manager)
+    _generate_condor_submit_file(submit_file_path, worker_script_path, runmode)
+
+    return `condor_submit $flags $submit_file_path`, 1, n_workers
+end
+
+function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager::ElasticManager)
+    julia_flags = runmode.julia_flags
+
+    request_memory = get(runmode.condor_settings, "request_memory", "2GB")
+    mem_per_task = _slurm_parse_memoptval(request_memory)
+
+    # Suggest a Julia heap size of half the memory requested per job:
+    heap_size_hint_fraction = 0.5
+    heap_size_hint_in_MB = isnothing(mem_per_task) ? nothing : ceil(Int, mem_per_task * heap_size_hint_fraction / 1024^2)
+    jl_heap_size_hint_flag = isnothing(heap_size_hint_in_MB) ? `` : `--heap-size-hint=$(heap_size_hint_in_MB)M`
+
+    jl_threads_flag = `--threads=1`
+
+    additional_julia_flags = `$jl_threads_flag $jl_heap_size_hint_flag`
+    worker_cmd = worker_local_startcmd(
+        manager;
+        julia_flags = `$julia_flags $additional_julia_flags`,
+        redirect_output = runmode.redirect_output, env = runmode.env
+    )
+    depot_path = join(runmode.julia_depot, ":")
+    open(filename, "w") do io
+        write(io,
+        """
+        export JULIA_DEPOT_PATH='$depot_path'
+        $worker_cmd
+        """)
+    end
+end
+
+function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::HTCondorRun)
+    jlstep = atomic_add!(_g_condor_nextjlstep, 1)
+    jobname = "julia-$(getpid())-$jlstep"
+    default_dict = Dict(
+        "batch_name" => jobname,
+    )
+    condor_settings = merge(default_dict, runmode.condor_settings)
+
+    condor_option_strings = join(["$key=$value" for (key, value) in condor_settings], "\n")
+    open(submit_file_path, "w") do io
+        write(io,
+        """
+        executable = /bin/bash
+        arguments = $(basename(worker_script_path))
+        should_transfer_files = yes
+        transfer_input_files = $worker_script_path
+        Notification = Error
+        $condor_option_strings
+        queue $(runmode.n)
+        """)
+    end
+end
+
+function runworkers(runmode::HTCondorRun, manager::ElasticManager)
+    run_cmd, _, n = worker_start_command(runmode, manager)
+    @info "Submitting HTCondor job: $run_cmd"
+    run(run_cmd)
+    @info "HTCondor job submitted: $run_cmd"
+    return nothing, n
+end
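
For anyone trying the new mode from this branch, here is a minimal end-to-end sketch. It assumes an HTCondor scheduler is reachable and `condor_submit` is on the `PATH`; the `condor_settings` values are illustrative and site-specific, and the polling loop is just one way to wait for workers:

```julia
using ParallelProcessingTools, Distributed

# Describe the batch job: 4 workers with 4 GB of memory each. All
# condor_settings entries are passed through verbatim into the generated
# submit file.
runmode = HTCondorRun(
    n = 4,
    condor_settings = Dict(
        "universe" => "vanilla",
        "request_memory" => "4GB",
    ),
)

# Submit the jobs; as implemented above, this returns (nothing, n) as soon
# as condor_submit has accepted them, not when the workers are up.
_, n = runworkers(runmode)

# Workers connect asynchronously once their jobs start running, so poll
# until all of them are up or a site-appropriate timeout expires:
t0 = time()
while nworkers() < n && time() - t0 < 300
    sleep(5)
end
@info "$(nworkers()) workers connected"
```
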
From 6bf45f99610c69d3ec35dd6d39737524133fb6b2 Mon Sep 17 00:00:00 2001
From: Jerry Ling
Date: Sat, 13 Jul 2024 10:33:54 -0500
Subject: [PATCH 2/2] edit docs

---
 docs/src/index.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/src/index.md b/docs/src/index.md
index 1f36b8f..0128c47 100644
--- a/docs/src/index.md
+++ b/docs/src/index.md
@@ -7,7 +7,7 @@ This Julia package provides some tools to ease multithreaded and distributed pro
 
 Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools add some machinery to make some aspects of this even easier.
 
-An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) an stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)), this can be useful to add worker to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)) or via SLURM ([`SlurmRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
+An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) and stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)); this can be useful for adding workers to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`SlurmRun`](@ref)), or via HTCondor ([`HTCondorRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
 
 The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy.
 
@@ -38,7 +38,7 @@ using ParallelProcessingTools, Distributed
 end
 
 runmode = OnLocalhost(n = 4)
-# runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
+# runmode = HTCondorRun(n = 4)
 
 display(worker_start_command(runmode))
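
A closing usage note tying the new mode to the docs above: once workers from any run mode have connected, work flows through the automatically created pool. The sketch below assumes, per the docs paragraph above, that `onworker(f)` runs a zero-argument closure on a pooled worker and that `take!`/`put!` on the pool yield worker process IDs; the Statistics workload is a placeholder:

```julia
using ParallelProcessingTools, Distributed

# Make needed packages available on all current and future workers:
@always_everywhere begin
    using Statistics
end

runmode = HTCondorRun(n = 4)  # or OnLocalhost(n = 4) for local testing
runworkers(runmode)

# ... after the workers have connected ...

# Dispatch work without exceeding any worker's maximum occupancy:
result = onworker(() -> mean(rand(10^6)))

# Or check a worker out of the dynamic pool explicitly:
pool = ppt_worker_pool()
pid = take!(pool)
try
    r = remotecall_fetch(myid, pid)
    @info "Got response from worker" pid r
finally
    put!(pool, pid)
end

stopworkers()
```
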