Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ Standard library changes

#### Distributed

* The package environment (active project, `LOAD_PATH`, `DEPOT_PATH`) is now propagated
when adding *local* workers (e.g. with `addprocs(N::Int)` or through the `--procs=N`
command line flag) ([#43270]).
* `addprocs` for local workers now accepts the `env` keyword argument for passing
environment variables to the worker processes. This was already supported for
remote workers ([#43270]).

#### UUIDs

#### Mmap
Expand Down
1 change: 1 addition & 0 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ default_addprocs_params() = Dict{Symbol,Any}(
:dir => pwd(),
:exename => joinpath(Sys.BINDIR, julia_exename()),
:exeflags => ``,
:env => [],
:enable_threaded_blas => false,
:lazy => true)

Expand Down
39 changes: 34 additions & 5 deletions stdlib/Distributed/src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ It is possible to launch multiple processes on a remote host by using a tuple in
workers to be launched on the specified host. Passing `:auto` as the worker count will
launch as many workers as the number of CPU threads on the remote host.

Examples:
**Examples**:
```julia
addprocs([
"remote1", # one worker on 'remote1' logging in with the current username
Expand All @@ -76,7 +76,7 @@ addprocs([
])
```

Keyword arguments:
**Keyword arguments**:

* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
master process. Default is `false`.
Expand Down Expand Up @@ -445,10 +445,17 @@ end

Launch `np` workers on the local host using the in-built `LocalManager`.

*Keyword arguments:*
Local workers inherit the current package environment (i.e., active project,
[`LOAD_PATH`](@ref), and [`DEPOT_PATH`](@ref)) from the main process.

**Keyword arguments**:
- `restrict::Bool`: if `true` (default) binding is restricted to `127.0.0.1`.
- `dir`, `exename`, `exeflags`, `topology`, `lazy`, `enable_threaded_blas`: same effect
- `dir`, `exename`, `exeflags`, `env`, `topology`, `lazy`, `enable_threaded_blas`: same effect
as for `SSHManager`, see documentation for [`addprocs(machines::AbstractVector)`](@ref).

!!! compat "Julia 1.9"
The inheriting of the package environment and the `env` keyword argument were
added in Julia 1.9.
"""
function addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...)
manager = LocalManager(np, restrict)
Expand All @@ -463,10 +470,32 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
exename = params[:exename]
exeflags = params[:exeflags]
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
env = Dict{String,String}(params[:env])

# TODO: Maybe this belongs in base/initdefs.jl as a package_environment() function
# together with load_path() etc. Might be useful to have when spawning julia
# processes outside of Distributed.jl too.
# JULIA_(LOAD|DEPOT)_PATH are used to populate (LOAD|DEPOT)_PATH on startup,
# but since (LOAD|DEPOT)_PATH might have changed they are re-serialized here.
# Users can opt-out of this by passing `env = ...` to addprocs(...).
pathsep = Sys.iswindows() ? ";" : ":"
if get(env, "JULIA_LOAD_PATH", nothing) === nothing
env["JULIA_LOAD_PATH"] = join(LOAD_PATH, pathsep)
end
if get(env, "JULIA_DEPOT_PATH", nothing) === nothing
env["JULIA_DEPOT_PATH"] = join(DEPOT_PATH, pathsep)
end
# Set the active project on workers using JULIA_PROJECT.
# Users can opt-out of this by (i) passing `env = ...` or (ii) passing
# `--project=...` as `exeflags` to addprocs(...).
project = Base.ACTIVE_PROJECT[]
if project !== nothing && get(env, "JULIA_PROJECT", nothing) === nothing
env["JULIA_PROJECT"] = project
end

for i in 1:manager.np
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker`
io = open(detach(setenv(cmd, dir=dir)), "r+")
io = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
write_cookie(io)

wconfig = WorkerConfig()
Expand Down
107 changes: 107 additions & 0 deletions stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,113 @@ for p in procs()
@test @fetchfrom(p, i27429) == 27429
end

# Propagation of package environments for local workers (#28781)
#
# Each scenario below builds a snippet of Julia code, runs it in a separate Julia
# process (started with `--startup-file=no`) and passes when that process exits
# successfully. The snippets themselves contain `@test`s that compare what the
# workers report against what the process running the snippet sees.
let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp
# Scratch directories standing in for a project, two depots and a load-path entry.
project = mkdir(joinpath(tmp, "project"))
depots = [mkdir(joinpath(tmp, "depot1")), mkdir(joinpath(tmp, "depot2"))]
load_path = [mkdir(joinpath(tmp, "load_path")), "@stdlib", "@"]
pathsep = Sys.iswindows() ? ";" : ":"
# Environment handed to every child process via `setenv`.
# NOTE: this Dict is mutated further down (JULIA_PROJECT is added), so the order of
# the scenarios matters.
env = Dict(
"JULIA_DEPOT_PATH" => join(depots, pathsep),
"JULIA_LOAD_PATH" => join(load_path, pathsep),
)
# Defines accessor functions on all processes so that DEPOT_PATH, LOAD_PATH and
# Base.ACTIVE_PROJECT[] can be fetched from workers with `remotecall_fetch`.
setupcode = """
using Distributed, Test
@everywhere begin
depot_path() = DEPOT_PATH
load_path() = LOAD_PATH
active_project() = Base.ACTIVE_PROJECT[]
end
"""
# Common checks: every worker must report the same depot path, load path and active
# project as the process running the snippet.
testcode = setupcode * """
for w in workers()
@test remotecall_fetch(depot_path, w) == DEPOT_PATH
@test remotecall_fetch(load_path, w) == LOAD_PATH
@test remotecall_fetch(Base.load_path, w) == Base.load_path()
@test remotecall_fetch(active_project, w) == Base.ACTIVE_PROJECT[]
@test remotecall_fetch(Base.active_project, w) == Base.active_project()
end
"""
# No active project: worker started with `-p1`; Base.ACTIVE_PROJECT[] is expected to
# be `nothing` on both the worker and the process running the snippet.
extracode = """
for w in workers()
@test remotecall_fetch(active_project, w) === Base.ACTIVE_PROJECT[] === nothing
end
"""
cmd = setenv(`$(julia) -p1 -e $(testcode * extracode)`, env)
@test success(cmd)
# --project: the project is given on the command line; workers are expected to
# report that same path. `project` is interpolated here, in the outer test process.
extracode = """
for w in workers()
@test remotecall_fetch(active_project, w) == Base.ACTIVE_PROJECT[] ==
$(repr(project))
end
"""
cmd = setenv(`$(julia) --project=$(project) -p1 -e $(testcode * extracode)`, env)
@test success(cmd)
# JULIA_PROJECT: same expectation as for `--project` (reuses `extracode`), but the
# project is supplied through the environment instead.
# NOTE: the second argument mutates `env`, so JULIA_PROJECT stays set for all
# scenarios that follow.
cmd = setenv(`$(julia) -p1 -e $(testcode * extracode)`,
(env["JULIA_PROJECT"] = project; env))
@test success(cmd)
# Pkg.activate(...): the snippet sets Base.ACTIVE_PROJECT[] directly and only then
# adds a worker with `addprocs(1)` (no `-p1` on the command line).
activateish = """
Base.ACTIVE_PROJECT[] = $(repr(project))
using Distributed
addprocs(1)
"""
cmd = setenv(`$(julia) -e $(activateish * testcode * extracode)`, env)
@test success(cmd)
# JULIA_(LOAD|DEPOT)_PATH: reverse DEPOT_PATH and LOAD_PATH in place *before* adding
# a worker; the worker is expected to see the reversed order.
shufflecode = """
d = reverse(DEPOT_PATH)
append!(empty!(DEPOT_PATH), d)
l = reverse(LOAD_PATH)
append!(empty!(LOAD_PATH), l)
"""
addcode = """
using Distributed
addprocs(1) # after shuffling
"""
extracode = """
for w in workers()
@test remotecall_fetch(load_path, w) == $(repr(reverse(load_path)))
@test remotecall_fetch(depot_path, w) == $(repr(reverse(depots)))
end
"""
cmd = setenv(`$(julia) -e $(shufflecode * addcode * testcode * extracode)`, env)
@test success(cmd)
# Mismatch when shuffling after proc addition: the worker comes from `-p1`, and the
# shuffle code runs afterwards. Despite the name `failcode`, the child process is
# expected to succeed: its `@test`s assert that the worker's paths equal the original
# (unshuffled) order, i.e. the reverse of the now-shuffled paths in the main process.
failcode = shufflecode * setupcode * """
for w in workers()
@test remotecall_fetch(load_path, w) == reverse(LOAD_PATH) == $(repr(load_path))
@test remotecall_fetch(depot_path, w) == reverse(DEPOT_PATH) == $(repr(depots))
end
"""
cmd = setenv(`$(julia) -p1 -e $(failcode)`, env)
@test success(cmd)
# Passing env or exeflags to addprocs(...) to override defaults: two workers are
# added, one with the project given via `exeflags` and one with it given via
# `env["JULIA_PROJECT"]`. Both are expected to see only the first LOAD_PATH and
# DEPOT_PATH entries and the temp project created inside the snippet.
# `\$(project)` is escaped so that it is interpolated in the child process (where
# `project` is the snippet's own `mktempdir()`), not in this outer test process.
envcode = """
using Distributed
project = mktempdir()
env = Dict(
"JULIA_LOAD_PATH" => LOAD_PATH[1],
"JULIA_DEPOT_PATH" => DEPOT_PATH[1],
)
addprocs(1; env = env, exeflags = `--project=\$(project)`)
env["JULIA_PROJECT"] = project
addprocs(1; env = env)
""" * setupcode * """
for w in workers()
@test remotecall_fetch(depot_path, w) == [DEPOT_PATH[1]]
@test remotecall_fetch(load_path, w) == [LOAD_PATH[1]]
@test remotecall_fetch(active_project, w) == project
@test remotecall_fetch(Base.active_project, w) == joinpath(project, "Project.toml")
end
"""
cmd = setenv(`$(julia) -e $(envcode)`, env)
@test success(cmd)
end end

include("splitrange.jl")

# Run topology tests last after removing all workers, since a given
Expand Down