diff --git a/docs/src/index.md b/docs/src/index.md index 3e67201..1f36b8f 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -158,4 +158,4 @@ To get an exception "in hand" for further analysis, you can use the macro [`@ret File handling can become more challenging when working in a parallel and possibly distributed fashion. Code or whole workers can crash, resulting in corrupt files, or workers may become disconnected, but still write files and clash with restarted code (resulting in race conditions and may also result in corrupt files). -ParallelProcessingTools provides the functions [`create_files`](@ref), [`read_files`](@ref) and [`modify_files`](@ref) to implement atomic file operations, on a best-effort basis (depending on the operating system and underlying file systems). +ParallelProcessingTools provides the functions [`write_files`](@ref) and [`read_files`](@ref) to implement atomic file operations, on a best-effort basis (depending on the operating system and underlying file systems). diff --git a/src/fileio.jl b/src/fileio.jl index eaabadc..10ceacd 100644 --- a/src/fileio.jl +++ b/src/fileio.jl @@ -48,10 +48,45 @@ _rand_fname_tag() = String(rand(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJ const _g_default_cachedir = Ref{String}("") const _g_default_cachedir_lock = ReentrantLock() + +const _g_file_cleanup_lock = ReentrantLock() +const _g_files_to_clean_up = Set{String}() + +const _g_files_autocleanup = Ref{Bool}(false) + +function _add_file_to_cleanup(filename::AbstractString) + lock(_g_file_cleanup_lock) do + push!(_g_files_to_clean_up, filename) + if _g_files_autocleanup[] == false + _g_files_autocleanup[] = true + atexit(_cleanup_files) + end + end +end + +function _remove_file_from_cleanup(filename::AbstractString) + lock(_g_file_cleanup_lock) do + delete!(_g_files_to_clean_up, filename) + end +end + +function _cleanup_files() + lock(_g_file_cleanup_lock) do + for filename in _g_files_to_clean_up + if isfile(filename) + rm(filename; force=true) + end + end + empty!(_g_files_to_clean_up) + end +end + + + """ ParallelProcessingTools.default_cache_dir()::String -Returns the default cache directory, e.g. for [`create_files`](@ref) and +Returns the default cache directory, e.g. for [`write_files`](@ref) and `read_files`(@ref). See also [`default_cache_dir!`](@ref). @@ -91,195 +126,223 @@ end """ - function create_files( - f_create, filenames::AbstractString...; - overwrite::Bool = true, - use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), - create_dirs::Bool = true, delete_tmp_onerror::Bool=true, - verbose::Bool = false - ) + abstract type WriteMode -Creates `filenames` in an atomic fashion via a user-provided function -`f_create`. Returns `nothing`. +Abstract type for write modes. -Using temporary filenames, calls `f_create(temporary_filenames...)`. If -`f_create` doesn't throw an exception, the files `temporary_filenames` are -renamed to `filenames`. If `f_create` throws an exception, the temporary files -are either deleted (if `delete_tmp_onerror` is `true`) or left in place (e.g. for -debugging purposes). +May be one of the following subtypes: [`CreateNew`](@ref), +[`CreateOrIgnore`](@ref), [`CreateOrReplace`](@ref), [`CreateOrModify`](@ref), +[`ModifyExisting`](@ref). -If `use_cache` is `true`, the `temporary_filenames` are created in -`cache_dir` and then atomically moved to `filenames`, otherwise they are -created next to `filenames` (in the same directories). +Used by [`write_files`](@ref). +""" +abstract type WriteMode end +export WriteMode -If `create_dirs` is `true`, directories are created if necessary. -If all of `filenames` already exist and `overwrite` is `false`, takes no -action (or, on case the files are created by other code running in parallel, -while `f_create` is running, does not replace them). +""" + CreateOrIgnore() isa WriteMode -If `verbose` is `true`, uses log-level `Logging.Info` to log file creation, -otherwise `Logging.Debug`. +Indicates that new files should be created, and that nothing should be done if +if the files already exist. -Throws an error if only some of the files exist and `overwrite` is `false`. +Causes an error to be thrown if only some of the files exist, to indicate an +inconsistent state. -Returns `nothing`. +`CreateOrIgnore()` is the recommended default when creating files in a +parallel computing context, especially if failure or timeouts might result in +re-tries. This way, if multiple workers try to create the same file(s), only +one file or consistent set of files will be created under the target +filenames. + +See [`WriteMode`](@ref) and [`write_files`](@ref). +""" +struct CreateOrIgnore <: WriteMode end +export CreateOrIgnore + +function _already_done(ftw::CreateOrIgnore, target_fnames::AbstractVector{<:String}, any_pre_existing::Bool, all_pre_existing::Bool, loglevel::LogLevel) + if any_pre_existing + if all_pre_existing + @logmsg loglevel "Files $target_fnames already exist, nothing to do." + return true + else + throw(ErrorException("Only some of $target_fnames exist, but not allowed to replace files")) + end + else + return false + end +end -Example: +_will_modify_files(::CreateOrIgnore, all_pre_existing::Bool) = false +_should_overwrite_if_necessary(::CreateOrIgnore) = false -```julia -create_files("foo.txt", "bar.txt", use_cache = true) do foo, bar - write(foo, "Hello") - write(bar, "World") -end -``` -Set `ENV["JULIA_DEBUG"] = "ParallelProcessingTools"` to see a log of all -intermediate steps. +""" + CreateNew() isa WriteMode -On Linux you can set `use_cache = true` and `cache_dir = "/dev/shm"` to use -the default Linux RAM disk as an intermediate directory. +Indicates that new files should be created and to throw and eror if the files +already exist. -See also [`read_files`](@ref), [`modify_files`](@ref) and -[`ParallelProcessingTools.default_cache_dir`](@ref). +See [`WriteMode`](@ref) and [`write_files`](@ref). """ -function create_files( - @nospecialize(f_create), @nospecialize(filenames::AbstractString...); - overwrite::Bool = true, - use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), - create_dirs::Bool = true, delete_tmp_onerror::Bool=true, - verbose::Bool = false -) - _create_modify_file_impl(false, f_create, filenames, overwrite, use_cache, String(cache_dir), create_dirs, delete_tmp_onerror, verbose) +struct CreateNew <: WriteMode end +export CreateNew + +function _already_done(::CreateNew, target_fnames::AbstractVector{<:String}, any_pre_existing::Bool, all_pre_existing::Bool, loglevel::LogLevel) + if any_pre_existing + throw(ErrorException("Some, but not all of $target_fnames exist, but not allowed to replace files")) + else + return false + end end -export create_files + +_will_modify_files(::CreateNew, all_pre_existing::Bool) = false +_should_overwrite_if_necessary(::CreateNew) = false """ - function modify_files( - f_modify, filenames::AbstractString...; - use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), - create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, - verbose::Bool = false - ) + CreateOrReplace() isa WriteMode -Modifies `filenames` in an atomic fashion via a user-provided function -`f_modify`. Returns `nothing`. +Indicates that new files should be created and existing files should be +replaced. -Using temporary filenames, first copies the files `filenames` to temporary -filenames. Then calls `f_modify(temporary_filenames...)`. If `f_modify` -doesn't throw an exception, the files `temporary_filenames` are then renamed -to `filenames`, replacing them. +See [`WriteMode`](@ref) and [`write_files`](@ref). +""" +struct CreateOrReplace <: WriteMode end +export CreateOrReplace -If `use_cache` is `true`, the `temporary_filenames` are created in -`cache_dir`, otherwise they are created next to `filenames` (in the same -directories). +function _already_done(::CreateOrReplace, target_fnames::AbstractVector{<:String}, any_pre_existing::Bool, all_pre_existing::Bool, loglevel::LogLevel) + return false +end -Otherwise behaves like [`create_files`](@ref) and [`read_files`](@ref) in -regard to logging and cache and error handling. +_will_modify_files(::CreateOrReplace, all_pre_existing::Bool) = false +_should_overwrite_if_necessary(::CreateOrReplace) = true -Returns `nothing`. -Example: +""" + CreateOrIgnore() isa WriteMode -```julia -write("foo.txt", "Nothing"); write("bar.txt", "here") +Indicates that either new files should be created, or that existing files +should be modified. -modify_files("foo.txt", "bar.txt", use_cache = true) do foo, bar - write(foo, "Hello") - write(bar, "World") +Causes an error to be thrown if only some of the files exist already, to +indicate an inconsistent state. + +See [`WriteMode`](@ref) and [`write_files`](@ref). +""" +struct CreateOrModify <: WriteMode end +export CreateOrModify + +function _already_done(::CreateOrModify, target_fnames::AbstractVector{<:String}, any_pre_existing::Bool, all_pre_existing::Bool, loglevel::LogLevel) + if any_pre_existing && !all_pre_existing + throw(ErrorException("Some, but not all of $target_fnames exist, but not allowed to replace files")) + else + return false + end end -``` -See also [`ParallelProcessingTools.default_cache_dir`](@ref). +_will_modify_files(::CreateOrModify, all_pre_existing::Bool) = all_pre_existing +_should_overwrite_if_necessary(::CreateOrModify) = true + + """ -function modify_files( - @nospecialize(f_modify), @nospecialize(filenames::AbstractString...); - use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), - create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, - verbose::Bool = false -) - _create_modify_file_impl(true, f_modify, filenames, true, use_cache, String(cache_dir), create_cachedir, delete_tmp_onerror, verbose) + ModifyExisting() isa WriteMode + +Indicates that existing files should be modified. + +Causes an error to be thrown if not all of the files exist already. + +See [`WriteMode`](@ref) and [`write_files`](@ref). +""" +struct ModifyExisting <: WriteMode end +export ModifyExisting + +function _already_done(::ModifyExisting, target_fnames::AbstractVector{<:String}, any_pre_existing::Bool, all_pre_existing::Bool, loglevel::LogLevel) + if !all_pre_existing + throw(ErrorException("Not all of $target_fnames exist, can't modify")) + else + return false + end end -export modify_files +_will_modify_files(::ModifyExisting, all_pre_existing::Bool) = true +_should_overwrite_if_necessary(::ModifyExisting) = true -function _create_modify_file_impl( - modify_mode::Bool, f_create_or_modify, filenames, - overwrite::Bool, use_cache::Bool, cache_dir::String, create_dirs::Bool, delete_tmp_onerror::Bool, verbose::Bool -) - loglevel = verbose ? Info : Debug - target_fnames = String[filenames...] # Fix type - staging_fnames = String[] - cache_fnames = String[] - move_complete = similar(target_fnames, Bool) - fill!(move_complete, false) - pre_existing = isfile.(target_fnames) - if any(pre_existing) - if all(pre_existing) - if !overwrite - @logmsg loglevel "Files $target_fnames already exist, nothing to do." - return nothing - end - else - !overwrite && throw(ErrorException("Only some of $target_fnames exist but not allowed to overwrite")) - end - end +""" + struct ParallelProcessingTools.FilesToWrite - dirs = dirname.(target_fnames) - if create_dirs - for dir in dirs - if !isdir(dir) && create_dirs - mkpath(dir) - @logmsg loglevel "Created output directory $dir." - end - end +Created by [`write_files`](@ref), represents a set of (temporary) files to +write to. - if use_cache && !isdir(cache_dir) - mkpath(cache_dir) - @logmsg loglevel "Created write-cache directory $cache_dir." - end - end +With `ftw::FilesToWrite`, use `collect(ftw)` or `iterate(ftw)` to access the +filenames to write to. Use `close(ftw)` or `close(ftw, true)` to close things +in good order, indicating success, and use `close(ftw, false)` or +`close(ftw, err:Exception)` to abort, indicating failure. - try - if use_cache - append!(cache_fnames, tmp_filename.(target_fnames, Ref(cache_dir))) - @assert !any(isfile, cache_fnames) - end +See [`write_files`](@ref) for example code. - append!(staging_fnames, tmp_filename.(target_fnames)) - @assert !any(isfile, staging_fnames) +If aborted or if the Julia process exits without `ftw` being closed, temporary +files are still cleaned up, unless `write_files` was used with +`delete_tmp_onerror = false`. +""" +struct FilesToWrite{M<:WriteMode} + _mode::M + _isopen::Ref{Bool} + _target_fnames::Vector{String} + _staging_fnames::Vector{String} + _cache_fnames::Vector{String} + _delete_tmp_onerror::Bool + _loglevel::LogLevel +end - writeto_fnames = use_cache ? cache_fnames : staging_fnames +_writeto_filenames(ftw::FilesToWrite) = isempty(ftw._cache_fnames) ? ftw._staging_fnames : ftw._cache_fnames - if modify_mode - @debug "Copying files $target_fnames to intermediate files $writeto_fnames." - read_files(target_fnames...; use_cache=false) do readfrom_fnames... - _parallel_cp(readfrom_fnames, writeto_fnames) - end - @debug "Modifying intermediate files $writeto_fnames." - else - @debug "Creating intermediate files $writeto_fnames." - end - f_create_or_modify(writeto_fnames...) - - post_f_write_existing = isfile.(target_fnames) - if any(post_f_write_existing) - if all(post_f_write_existing) +Base.length(ftw::FilesToWrite) = length(_writeto_filenames(ftw)) +Base.eltype(ftw::FilesToWrite) = eltype(_writeto_filenames(ftw)) +Base.iterate(ftw::FilesToWrite) = iterate(_writeto_filenames(ftw)) +Base.iterate(ftw::FilesToWrite, state) = iterate(_writeto_filenames(ftw), state) + +Base.isopen(ftw::FilesToWrite) = ftw._isopen[] + +Base.close(ftw::FilesToWrite) = _finalize_ftw(ftw) +Base.close(ftw::FilesToWrite, @nospecialize(err::Exception)) = _abort_ftw(ftw, err) +Base.close(ftw::FilesToWrite, success::Bool) = success ? _finalize_ftw(ftw) : _abort_ftw(ftw, nothing) + +function _finalize_ftw(ftw::FilesToWrite) + if ftw._isopen[] + ftw._isopen[] = false + else + return nothing + end + + mode, delete_tmp_onerror, loglevel = ftw._mode, ftw._delete_tmp_onerror, ftw._loglevel + target_fnames, staging_fnames, cache_fnames = ftw._target_fnames, ftw._staging_fnames, ftw._cache_fnames + use_cache = !isempty(cache_fnames) + + try + post_write_existing = isfile.(target_fnames) + if any(post_write_existing) + overwrite = _should_overwrite_if_necessary(mode) + if all(post_write_existing) if !overwrite @logmsg loglevel "Files $target_fnames already exist, won't replace." + _writefiles_cleanup(ftw._cache_fnames, ftw._staging_fnames) return nothing end else !overwrite && throw(ErrorException("Only some of $target_fnames exist but not allowed to replace files")) end end + + move_complete = similar(target_fnames, Bool) + fill!(move_complete, false) try if use_cache _parallel_mv(cache_fnames, staging_fnames) + foreach(_remove_file_from_cleanup, cache_fnames) empty!(cache_fnames) end @@ -291,6 +354,7 @@ function _create_modify_file_impl( @debug "Renaming file \"$staging_fn\" to \"$target_fn\"." isfile(staging_fn) || error("Expected file \"$staging_fn\" to exist, but it doesn't.") mv(staging_fn, target_fn; force=true) + _remove_file_from_cleanup(staging_fn) isfile(target_fn) || error("Tried to rename file \"$staging_fn\" to \"$target_fn\", but \"$target_fn\" doesn't exist.") move_complete[i] = true end @@ -298,7 +362,7 @@ function _create_modify_file_impl( empty!(staging_fnames) @logmsg loglevel "Created files $target_fnames." - catch + catch err if any(move_complete) && !all(move_complete) to_remove = target_fnames[findall(move_complete)] @error "Failed to rename some of the temporary files to target files, removing $to_remove" @@ -313,68 +377,383 @@ function _create_modify_file_impl( @assert isempty(staging_fnames) finally if delete_tmp_onerror - for cache_fn in cache_fnames - if isfile(cache_fn) - @debug "Removing left-over write-cache file \"$cache_fn\"." - rm(cache_fn; force=true) - end + _writefiles_cleanup(ftw._cache_fnames, ftw._staging_fnames) + end + end +end + +function _abort_ftw(@nospecialize(ftw::FilesToWrite), @nospecialize(reason::Union{Exception,Nothing})) + ftw._isopen[] = false + if isnothing(reason) + @debug "Aborting writing to $(ftw._target_fnames) as requested." + else + @debug "Aborting writing to $(ftw._target_fnames) due to exception:" reason + end + + if ftw._delete_tmp_onerror + _writefiles_cleanup(ftw._cache_fnames, ftw._staging_fnames) + end + return nothing +end + +function _writefiles_cleanup(cache_fnames::AbstractVector{<:AbstractString}, staging_fnames::AbstractVector{<:AbstractString}) + lock(_g_file_cleanup_lock) do + for cache_fn in cache_fnames + _remove_file_from_cleanup(cache_fn) + if isfile(cache_fn) + @debug "Removing left-over write-cache file \"$cache_fn\"." + rm(cache_fn; force=true) + end + end + for staging_fn in staging_fnames + _remove_file_from_cleanup(staging_fn) + if isfile(staging_fn) + @debug "Removing left-over write-staging file \"$staging_fn\"." + rm(staging_fn; force=true) + end + end + end + return nothing +end + + + +""" + function write_files( + [f_write,] filenames::AbstractString...; + mode::WriteMode = CreateOrIgnore(), + use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), + create_dirs::Bool = true, delete_tmp_onerror::Bool=true, + verbose::Bool = false + ) + +Writes to `filenames` in an atomic fashion, on a best-effort basis (depending +on the OS and file-system used). + +`mode` determines how to handle pre-existing files, it may be +[`CreateOrIgnore()`](@ref) (default), [`CreateNew()`](@ref), +[`CreateOrReplace()`](@ref), [`CreateOrModify()`](@ref) or +[`ModifyExisting()`](@ref). + +If a writing function `f_write` is given, calls +`f_create(temporary_filenames...)`. If `f_create` doesn't throw an exception, +the files `temporary_filenames` are renamed to `filenames`, otherwise +the temporary files are are either deleted (if `delete_tmp_onerror` is `true) +or left in place (e.g. for debugging purposes). + +Set `ENV["JULIA_DEBUG"] = "ParallelProcessingTools"` to see a log of all +intermediate steps. + +For example: + +```julia +write_files("foo.txt", "bar.txt", use_cache = true) do tmp_foo, tmp_bar + write(tmp_foo, "Hello") + write(tmp_bar, "World") +end +``` + +`write_files(f_write, filenames...)` returns either `filenames`, if the +files were (re-)written or `nothing` if there was nothing to do (depending +on `mode`). + +If no writing funcion `f_write` is given then, `write_files` returns an object +of type [`FilesToWrite`](@ref) that holds the temporary filenames. Closing it +will, like above, either rename temporary files to `filenames` or remove them. +So + +```julia +ftw = write_files("foo.txt", "bar.txt") +if !isnothing(ftw) + try + foo, bar = ftw + write(foo, "Hello") + write(bar, "World") + close(ftw) + catch err + close(ftw, err) + rethrow() + end +end +``` + +is equivalent to the example using `write_files(f_write, ...)`above. + +When modifying files, `write_files` first copies existing files `filenames` to +`temporary_filenames` and otherwise behaves as described above. + +If `use_cache` is `true`, the `temporary_filenames` are located in +`cache_dir` and then atomically moved to `filenames`, otherwise they located +next to `filenames` (so in the same directories). + +If `create_dirs` is `true`, target and cache directory paths are +created if necessary. + +If `verbose` is `true`, uses log-level `Logging.Info` to log file creation, +otherwise `Logging.Debug`. + +On Linux you can set `use_cache = true` and `cache_dir = "/dev/shm"` to use +the default Linux RAM disk as an intermediate directory. + +See also [`read_files`](@ref) and +[`ParallelProcessingTools.default_cache_dir`](@ref). +""" +function write_files end +export write_files + +function write_files(@nospecialize(f_write), @nospecialize(filenames::AbstractString...); kwargs...) + if isempty(filenames) + return nothing + else + ftw = write_files(filenames...; kwargs...) + if isnothing(ftw) + return nothing + else + try + f_write(ftw...) + close(ftw) + return filenames + catch err + close(ftw, err) + rethrow() end - for staging_fn in staging_fnames - if isfile(staging_fn) - @debug "Removing left-over write-staging file \"$staging_fn\"." - rm(staging_fn; force=true) + end + end +end + +function write_files( + @nospecialize(filenames::AbstractString...); + mode::WriteMode = CreateOrIgnore(), + use_cache::Bool = false, @nospecialize(cache_dirname::AbstractString = default_cache_dir()), + create_dirs::Bool = true, delete_tmp_onerror::Bool=true, + verbose::Bool = false +) + isempty(filenames) && return nothing + + loglevel = verbose ? Info : Debug + + cache_dir = String(cache_dirname) # Fix type + target_fnames = String[filenames...] # Fix type + staging_fnames = String[] + cache_fnames = String[] + + pre_existing = isfile.(target_fnames) + any_pre_existing = any(pre_existing) + all_pre_existing = all(pre_existing) + + _already_done(mode, target_fnames, any_pre_existing, all_pre_existing, loglevel) && return nothing + + try + dirs = dirname.(target_fnames) + if create_dirs + for dir in dirs + if !isdir(dir) && !isempty(dir) + mkpath(dir) + @logmsg loglevel "Created output directory $dir." end end + + if use_cache && !isdir(cache_dir) && !isempty(cache_dir) && create_dirs + mkpath(cache_dir) + @logmsg loglevel "Created write-cache directory $cache_dir." + end + end + + if use_cache + append!(cache_fnames, tmp_filename.(target_fnames, Ref(cache_dir))) + @assert !any(isfile, cache_fnames) end + + append!(staging_fnames, tmp_filename.(target_fnames)) + @assert !any(isfile, staging_fnames) + + ftw = FilesToWrite(mode, Ref(true), target_fnames, staging_fnames, cache_fnames, delete_tmp_onerror, loglevel) + + if delete_tmp_onerror + foreach(_add_file_to_cleanup, staging_fnames) + foreach(_add_file_to_cleanup, cache_fnames) + end + + writeto_fnames = _writeto_filenames(ftw) + if _will_modify_files(mode, all_pre_existing) + @debug "Copying files $target_fnames to intermediate files $writeto_fnames." + read_files(target_fnames...; use_cache=false) do readfrom_fnames... + _parallel_cp(readfrom_fnames, writeto_fnames) + end + @debug "Modifying intermediate files $writeto_fnames." + else + @debug "Creating intermediate files $writeto_fnames." + end + + return ftw + catch + if delete_tmp_onerror + _writefiles_cleanup(cache_fnames, staging_fnames) + end + rethrow() + end + + return nothing +end + + + +""" + struct ParallelProcessingTools.FilesToRead + +Created by [`read_files`](@ref), represents a set of (temporary) files to +read from. + +With `ftr::FilesToRead`, use `collect(ftr)` or `iterate(ftr)` to access the +filenames to read from. Use `close(ftr)` or `close(ftr, true)` to close +things in good order, indicating success, and use `close(ftr, false)` or +`close(ftr, err:Exception)` to abort, indicating failure. + +See [`read_files`](@ref) for example code. + +If aborted or if the Julia process exits without `ftr` being closed, temporary +files are still cleaned up, unless `read_files` was used with +`delete_tmp_onerror = false`. +""" +struct FilesToRead + _isopen::Ref{Bool} + _source_fnames::Vector{String} + _cache_fnames::Vector{String} + _delete_tmp_onerror::Bool + _loglevel::LogLevel +end + +Base.isopen(ftr::FilesToRead) = ftr._isopen[] + +function _readfrom_filenames(ftr::FilesToRead) + if isopen(ftr) + isempty(ftr._cache_fnames) ? ftr._source_fnames : ftr._cache_fnames + else + throw(InvalidStateException("FilesToRead is closed.", :closed)) end +end +Base.length(ftr::FilesToRead) = length(_readfrom_filenames(ftr)) +Base.eltype(ftr::FilesToRead) = eltype(_readfrom_filenames(ftr)) +Base.iterate(ftr::FilesToRead) = iterate(_readfrom_filenames(ftr)) +Base.iterate(ftr::FilesToRead, state) = iterate(_readfrom_filenames(ftr), state) + +Base.close(ftr::FilesToRead) = close(ftr, true) +function Base.close(ftr::FilesToRead, @nospecialize(reason::Union{Bool,Exception})) + ftr._isopen[] = false + + if reason == true + # @debug "Reading from to $(ftr._source_fnames) was indicated to have succeeded." + elseif reason == false + @debug "Reading from to $(ftr._source_fnames) was indicated to have failed." + else + @debug "Aborted reading from $(ftr._source_fnames) due to exception:" reason + end + + if reason == true || ftr._delete_tmp_onerror + _readfiles_cleanup(ftr._cache_fnames) + end + empty!(ftr._cache_fnames) + empty!(ftr._source_fnames) return nothing end +function _readfiles_cleanup(cache_fnames::AbstractVector{<:AbstractString}) + lock(_g_file_cleanup_lock) do + for cache_fn in cache_fnames + _remove_file_from_cleanup(cache_fn) + if isfile(cache_fn) + @debug "Removing left-over read-cache file \"$cache_fn\"." + rm(cache_fn; force=true) + end + end + end +end + """ function read_files( - f_read, filenames::AbstractString...; + [f_read, ], filenames::AbstractString...; use_cache::Bool = true, cache_dir::AbstractString = default_cache_dir(), create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, verbose::Bool = false ) Reads `filenames` in an atomic fashion (i.e. only if all `filenames` exist) -via a user-provided function `f_read`. The returns value of `f_read` is -passed through. +on a best-effort basis (depending on the OS and file-system used). + +If a reading function `f_read` is given, calls `f_read(filenames...)`. The +return value of `f_read` is passed through. If `use_cache` is `true`, then the files are first copied to the -temporary directory `cache_dir` under temporary names, and -`f_read(temporary_filenames...)` is called. The temporary files are deleted -afterwards. +cache directory `cache_dir` under temporary names, and then read via +`f_read(temporary_filenames...)`. The temporary files are +deleted after `f_read` exits (except if an exception is thrown during reading +and `delete_tmp_onerror` is set to `false`). -If `create_cachedir` is `true`, then `cache_dir` will be created if it doesn't -exist yet. If `delete_tmp_onerror` is true, then temporary files are -deleted even if `f_create` throws an exception. +Set `ENV["JULIA_DEBUG"] = "ParallelProcessingTools"` to see a log of all +intermediate steps. -If `verbose` is `true`, uses log-level `Logging.Info` to log file reading, -otherwise `Logging.Debug`. +For example: ```julia write("foo.txt", "Hello"); write("bar.txt", "World") -read_files("foo.txt", "bar.txt", use_cache = true) do foo, bar +result = read_files("foo.txt", "bar.txt", use_cache = true) do foo, bar read(foo, String) * " " * read(bar, String) end ``` -Set `ENV["JULIA_DEBUG"] = "ParallelProcessingTools"` to see a log of all -intermediate steps. +If no reading funcion `f_read` is given, then `read_files` returns an object +of type [`FilesToRead`](@ref) that holds the temporary filenames. Closing it +will clean up temporary files, like described above. So + + +```julia +ftr = read_files("foo.txt", "bar.txt"; use_cache = true) +result = try + foo, bar = collect(ftr) + data_read = read(foo, String) * " " * read(bar, String) + close(ftr) + data_read +catch err + close(ftr, err) + rethrow() +end +``` + +is equivalent to the example using `read_files(f_read, ...)`above. + +If `create_cachedir` is `true`, then `cache_dir` will be created if it doesn't +exist yet. + +If `verbose` is `true`, uses log-level `Logging.Info` to log file reading, +otherwise `Logging.Debug`. On Linux you can set `use_cache = true` and `cache_dir = "/dev/shm"` to use the default Linux RAM disk as an intermediate directory. -See also [`create_files`](@ref), [`modify_files`](@ref) and +See also [`write_files`](@ref) and [`ParallelProcessingTools.default_cache_dir`](@ref). """ +function read_files end +export read_files + +function read_files(@nospecialize(f_read), @nospecialize(filenames::AbstractString...); kwargs...) + ftr = read_files(filenames...; kwargs...) + try + readfrom_fnames = collect(ftr) + result = f_read(readfrom_fnames...) + close(ftr) + return result + catch err + close(ftr, err) + rethrow() + end +end + function read_files( - @nospecialize(f_read), @nospecialize(filenames::AbstractString...); + @nospecialize(filenames::AbstractString...); use_cache::Bool = true, cache_dir::AbstractString = default_cache_dir(), create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, verbose::Bool = false @@ -384,6 +763,8 @@ function read_files( source_fnames = String[filenames...] # Fix type cache_fnames = String[] + @logmsg loglevel "Preparing to read files $source_fnames." + input_exists = isfile.(source_fnames) if !all(input_exists) missing_inputs = source_fnames[findall(!, input_exists)] @@ -392,7 +773,7 @@ function read_files( try if use_cache - if !isdir(cache_dir) && create_cachedir + if !isdir(cache_dir) && !isempty(cache_dir) && create_cachedir mkpath(cache_dir) @logmsg loglevel "Created read-cache directory $cache_dir." end @@ -400,31 +781,18 @@ function read_files( append!(cache_fnames, tmp_filename.(source_fnames, Ref(cache_dir))) @assert !any(isfile, cache_fnames) + foreach(_add_file_to_cleanup, cache_fnames) _parallel_cp(source_fnames, cache_fnames) end - - readfrom_fnames = use_cache ? cache_fnames : source_fnames - - @debug "Reading $(use_cache ? "cached " : "")files $readfrom_fnames." - result = f_read(readfrom_fnames...) - @logmsg loglevel "Read files $source_fnames." - - @userfriendly_exceptions @sync for cache_fn in cache_fnames - Threads.@spawn rm(cache_fn; force=true); - end - return result - finally + + return FilesToRead(Ref(true), source_fnames, cache_fnames, delete_tmp_onerror, loglevel) + catch if delete_tmp_onerror - for cache_fn in cache_fnames - if isfile(cache_fn) - @debug "Removing left-over read-cache file \"$cache_fn\"." - rm(cache_fn; force=true) - end - end + _readfiles_cleanup(cache_fnames) end + rethrow() end end -export read_files function _parallel_mv(source_fnames, target_fnames) @@ -434,6 +802,7 @@ function _parallel_mv(source_fnames, target_fnames) @debug "Moving file \"$source_fn\" to \"$target_fn\"." isfile(source_fn) || error("Expected file \"$source_fn\" to exist, but it doesn't.") mv(source_fn, target_fn) + isfile(target_fn) || error("Expected file \"$target_fn\" to exist, but it doesn't.") end end end diff --git a/test/test_fileio.jl b/test/test_fileio.jl index dd7e143..5f9ab6c 100644 --- a/test/test_fileio.jl +++ b/test/test_fileio.jl @@ -47,8 +47,47 @@ ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" @test default_cache_dir() == orig_cache_dir end + + for use_cache in (true, false), throw_dummy_error in (true, false), test_abort_write in (true, false) + @testset "write_files cache=$use_cache, error=$throw_dummy_error, abort=$test_abort_write" begin + mktempdir() do dir + data1 = "Hello" + data2 = "World" + + fn1 = joinpath(dir, "targetdir", "hello.txt") + fn2 = joinpath(dir, "targetdir", "world.txt") + + @test write_files() isa Nothing + ftw = write_files("foo.txt", "bar.txt", mode = CreateOrIgnore(), use_cache = use_cache) + @test ftw isa ParallelProcessingTools.FilesToWrite{CreateOrIgnore} + tmp_foo, tmp_bar = ftw + try + write.([tmp_foo, tmp_bar], ["Hello", "World"]) + @test in(tmp_foo, ParallelProcessingTools._g_files_to_clean_up) + @test in(tmp_bar, ParallelProcessingTools._g_files_to_clean_up) + throw_dummy_error && error("Some error") + test_abort_write ? close(ftw, false) : close(ftw) + catch err + close(ftw, err) + end + if !(throw_dummy_error || test_abort_write) + @test all(isfile, ["foo.txt", "bar.txt"]) + @test read.(["foo.txt", "bar.txt"], String) == ["Hello", "World"] + @test write_files("foo.txt", "bar.txt") isa Nothing + rm.(["foo.txt", "bar.txt"]) + end + @test !in(tmp_foo, ParallelProcessingTools._g_files_to_clean_up) + @test !in(tmp_bar, ParallelProcessingTools._g_files_to_clean_up) + @test !any(isfile, ftw._cache_fnames) + @test !any(isfile, ftw._staging_fnames) + @test !any(f -> f in ParallelProcessingTools._g_files_to_clean_up, ftw._cache_fnames) + @test !any(f -> f in ParallelProcessingTools._g_files_to_clean_up, ftw._staging_fnames) + end + end + end + for use_cache in [false, true] - @testset "create_files $(use_cache ? "with" : "without") cache" begin + @testset "write_files f_write $(use_cache ? "with" : "without") cache" begin mktempdir() do dir data1 = "Hello" data2 = "World" @@ -59,7 +98,7 @@ ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" # Target directory does not exist yet: try # Will not create missing target directory: - create_files(fn1, fn2, use_cache = use_cache, create_dirs = false, verbose = true) do fn1, fn2 + write_files(fn1, fn2, use_cache = use_cache, create_dirs = false, verbose = true) do fn1, fn2 write(fn1, data1); write(fn2, data2) end @test false # Should have thrown an exception @@ -68,7 +107,7 @@ ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" end # Test atomicity, fail in between writing files: - @test_throws ErrorException create_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 + @test_throws ErrorException write_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 write(fn1, data1) error("Some error") write(fn2, data2) @@ -76,28 +115,123 @@ ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" @test !isfile(fn1) && !isfile(fn2) # Will create: - create_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 + @test write_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 write(fn1, data1); write(fn2, data2) - end + end == (fn1, fn2) @test read(fn1, String) == data1 && read(fn2, String) == data2 + # Remove files: + rm.([fn1, fn2]) + + # Will create: + @test write_files(fn1, fn2, use_cache = use_cache, mode = CreateOrIgnore(), verbose = true) do fn1, fn2 + write(fn1, data1); write(fn2, data2) + end == (fn1, fn2) + @test read(fn1, String) == data1 && read(fn2, String) == data2 + + # Remove files: + rm.([fn1, fn2]) + + # Will create: + @test write_files(fn1, fn2, use_cache = use_cache, mode = CreateNew(), verbose = true) do fn1, fn2 + write(fn1, data1); write(fn2, data2) + end == (fn1, fn2) + @test read(fn1, String) == data1 && read(fn2, String) == data2 + + # Files already exixst: + @test_throws ErrorException write_files(fn1, fn2, use_cache = use_cache, mode = CreateNew(), verbose = true) do fn1, fn2 + write(fn1, "modified"); write(fn2, "content") + end + # Modify the target files: - modify_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 + @test write_files(fn1, fn2, use_cache = use_cache, mode = CreateOrModify(), verbose = true) do fn1, fn2 + write(fn1, "modified"); write(fn2, "content") + end == (fn1, fn2) + @test read(fn1, String) == "modified" && read(fn2, String) == "content" + + # Remove files: + rm.([fn1, fn2]) + + # Files don't exist yet: + @test_throws ErrorException write_files(fn1, fn2, use_cache = use_cache, mode = ModifyExisting(), verbose = true) do fn1, fn2 write(fn1, "modified"); write(fn2, "content") end + + # Will create: + @test write_files(fn1, fn2, use_cache = use_cache, mode = CreateOrModify(), verbose = true) do fn1, fn2 + write(fn1, data1); write(fn2, data2) + end == (fn1, fn2) + @test read(fn1, String) == data1 && read(fn2, String) == data2 + + # Modify the target files: + @test write_files(fn1, fn2, use_cache = use_cache, mode = ModifyExisting(), verbose = true) do fn1, fn2 + write(fn1, "modified"); write(fn2, "content") + end == (fn1, fn2) @test read(fn1, String) == "modified" && read(fn2, String) == "content" # Wont't overwrite: - create_files(fn1, fn2, use_cache = use_cache, overwrite = false, verbose = true) do fn1, fn2 + @test write_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 write(fn1, data1); write(fn2, data2) - end + end isa Nothing + @test read(fn1, String) != data1 && read(fn2, String) != data2 + + # Wont't overwrite: + @test write_files(fn1, fn2, use_cache = use_cache, mode = CreateOrIgnore(), verbose = true) do fn1, fn2 + write(fn1, data1); write(fn2, data2) + end isa Nothing @test read(fn1, String) != data1 && read(fn2, String) != data2 # Will overwrite: - create_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 + @test write_files(fn1, fn2, use_cache = use_cache, mode = CreateOrReplace(), verbose = true) do fn1, fn2 write(fn1, data1); write(fn2, data2) - end + end == (fn1, fn2) @test read(fn1, String) == data1 && read(fn2, String) == data2 + + # Remove files: + rm.([fn1, fn2]) + end + end + end + + + for use_cache in (true, false), throw_dummy_error in (true, false), test_abort_read in (true, false) + @testset "read_files cache=$use_cache, error=$throw_dummy_error, abort=$test_abort_read" begin + mktempdir() do dir + data1 = "Hello" + data2 = "World" + + fn1 = joinpath(dir, "targetdir", "hello.txt") + fn2 = joinpath(dir, "targetdir", "world.txt") + + mkpath(dirname(fn1)); mkpath(dirname(fn2)) + write(fn1, data1); write(fn2, data2) + + ftr = read_files(fn1, fn2, use_cache = use_cache) + @test ftr isa ParallelProcessingTools.FilesToRead + tmp_foo, tmp_bar = ftr + result = try + if use_cache + @test in(tmp_foo, ParallelProcessingTools._g_files_to_clean_up) + @test in(tmp_bar, ParallelProcessingTools._g_files_to_clean_up) + end + throw_dummy_error && error("Some error") + if test_abort_read + close(ftr, false) + else + read_data = read.([tmp_foo, tmp_bar], String) + close(ftr) + read_data + end + catch err + close(ftr, err) + end + if !(throw_dummy_error || test_abort_read) + @test result == ["Hello", "World"] + end + @test !in(tmp_foo, ParallelProcessingTools._g_files_to_clean_up) + @test !in(tmp_bar, ParallelProcessingTools._g_files_to_clean_up) + @test !any(isfile, ftr._cache_fnames) + @test !any(f -> f in ParallelProcessingTools._g_files_to_clean_up, ftr._cache_fnames) end end end