Skip to content

make_numbered_dir() multi-process-safe #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 80 additions & 37 deletions py/_path/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import with_statement

from contextlib import contextmanager
import sys, os, re, atexit, io
import sys, os, re, atexit, io, uuid
import py
from py._path import common
from py._path.common import iswin32, fspath
Expand Down Expand Up @@ -804,7 +804,8 @@ def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3,
""" return unique directory with a number greater than the current
maximum one. The number is assumed to start directly after prefix.
if keep is true directories with a number less than (maxnum-keep)
will be removed.
will be removed. If .lock files are used (lock_timeout non-zero),
algorithm is multi-process safe.
"""
if rootdir is None:
rootdir = cls.get_temproot()
Expand All @@ -819,37 +820,19 @@ def parse_num(path):
except ValueError:
pass

# compute the maximum number currently in use with the
# prefix
lastmax = None
while True:
maxnum = -1
for path in rootdir.listdir():
num = parse_num(path)
if num is not None:
maxnum = max(maxnum, num)

# make the new directory
try:
udir = rootdir.mkdir(prefix + str(maxnum+1))
except py.error.EEXIST:
# race condition: another thread/process created the dir
# in the meantime. Try counting again
if lastmax == maxnum:
raise
lastmax = maxnum
continue
break

# put a .lock file in the new directory that will be removed at
# process exit
if lock_timeout:
lockfile = udir.join('.lock')
def create_lockfile(path):
""" exclusively create lockfile. Throws when failed """
mypid = os.getpid()
lockfile = path.join('.lock')
if hasattr(lockfile, 'mksymlinkto'):
lockfile.mksymlinkto(str(mypid))
else:
lockfile.write(str(mypid))
lockfile.write(str(mypid), 'wx')
return lockfile

def atexit_remove_lockfile(lockfile):
""" ensure lockfile is removed at process exit """
mypid = os.getpid()
def try_remove_lockfile():
# in a fork() situation, only the last process should
# remove the .lock, otherwise the other processes run the
Expand All @@ -864,19 +847,79 @@ def try_remove_lockfile():
pass
atexit.register(try_remove_lockfile)

# compute the maximum number currently in use with the prefix
lastmax = None
while True:
maxnum = -1
for path in rootdir.listdir():
num = parse_num(path)
if num is not None:
maxnum = max(maxnum, num)

# make the new directory
try:
udir = rootdir.mkdir(prefix + str(maxnum+1))
if lock_timeout:
lockfile = create_lockfile(udir)
atexit_remove_lockfile(lockfile)
except (py.error.EEXIST, py.error.ENOENT):
# race condition (1): another thread/process created the dir
# in the meantime - try again
# race condition (2): another thread/process spuriously acquired
# lock treating empty directory as candidate
# for removal - try again
if lastmax == maxnum:
raise
lastmax = maxnum
continue
break

def get_mtime(path):
""" read file modification time """
try:
return path.lstat().mtime
except py.error.Error:
pass

garbage_prefix = prefix + 'garbage-'

def is_garbage(path):
""" check if path denotes directory scheduled for removal """
bn = path.basename
return bn.startswith(garbage_prefix)

# prune old directories
if keep:
udir_time = get_mtime(udir)
if keep and udir_time:
for path in rootdir.listdir():
num = parse_num(path)
if num is not None and num <= (maxnum - keep):
lf = path.join('.lock')
try:
t1 = lf.lstat().mtime
t2 = lockfile.lstat().mtime
if not lock_timeout or abs(t2-t1) < lock_timeout:
continue # skip directories still locked
except py.error.Error:
pass # assume that it means that there is no 'lf'
# try acquiring lock to remove directory as exclusive user
if lock_timeout:
create_lockfile(path)
except (py.error.EEXIST, py.error.ENOENT):
path_time = get_mtime(path)
if not path_time:
# assume directory doesn't exist now
continue
if abs(udir_time - path_time) < lock_timeout:
# assume directory with lockfile exists
# and lock timeout hasn't expired yet
continue

# path dir locked for exlusive use
# and scheduled for removal to avoid another thread/process
# treating it as a new directory or removal candidate
garbage_path = rootdir.join(garbage_prefix + str(uuid.uuid4()))
try:
path.rename(garbage_path)
garbage_path.remove(rec=1)
except KeyboardInterrupt:
raise
except: # this might be py.error.Error, WindowsError ...
pass
if is_garbage(path):
try:
path.remove(rec=1)
except KeyboardInterrupt:
Expand Down
23 changes: 23 additions & 0 deletions testing/path/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
import os
import sys
import multiprocessing
from py.path import local
import common

Expand Down Expand Up @@ -41,6 +42,21 @@ def __fspath__(self):
return FakeFSPathClass(os.path.join("this", "is", "a", "fake", "path"))


def batch_make_numbered_dirs(rootdir, repeats):
try:
for i in range(repeats):
dir_ = py.path.local.make_numbered_dir(prefix='repro-', rootdir=rootdir)
file_ = dir_.join('foo')
file_.write('%s' % i)
actual = int(file_.read())
assert actual == i, 'int(file_.read()) is %s instead of %s' % (actual, i)
dir_.join('.lock').remove(ignore_errors=True)
return True
except KeyboardInterrupt:
# makes sure that interrupting test session won't hang it
os.exit(2)


class TestLocalPath(common.CommonFSTests):
def test_join_normpath(self, tmpdir):
assert tmpdir.join(".") == tmpdir
Expand Down Expand Up @@ -430,6 +446,13 @@ def notimpl(x, y):
assert x.relto(tmpdir)
assert x.check()

def test_make_numbered_dir_multiprocess_safe(self, tmpdir):
# https://github.com/pytest-dev/py/issues/30
pool = multiprocessing.Pool()
results = [pool.apply_async(batch_make_numbered_dirs, [tmpdir, 100]) for _ in range(20)]
for r in results:
assert r.get() == True

def test_locked_make_numbered_dir(self, tmpdir):
for i in range(10):
numdir = local.make_numbered_dir(prefix='base2.', rootdir=tmpdir,
Expand Down