Skip to content

respect tmp-outdir-prefix when containerless #467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 38 additions & 34 deletions cwltool/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ def _check_docker_machine_path(path): # type: (Text) -> None
class DockerCommandLineJob(ContainerCommandLineJob):

@staticmethod
def get_image(dockerRequirement, pull_image, dry_run=False, force_pull=False):
# type: (Dict[Text, Text], bool, bool, bool) -> bool
def get_image(dockerRequirement, # type: Dict[Text, Text]
pull_image, # type: bool
force_pull=False, # type: bool
tmp_outdir_prefix=None # type: Text
): # type: (...) -> bool
found = False

if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
Expand Down Expand Up @@ -108,57 +111,58 @@ def get_image(dockerRequirement, pull_image, dry_run=False, force_pull=False):
if "dockerPull" in dockerRequirement:
cmd = ["docker", "pull", str(dockerRequirement["dockerPull"])]
_logger.info(Text(cmd))
if not dry_run:
subprocess.check_call(cmd, stdout=sys.stderr)
found = True
subprocess.check_call(cmd, stdout=sys.stderr)
found = True
elif "dockerFile" in dockerRequirement:
dockerfile_dir = str(tempfile.mkdtemp())
dockerfile_dir = str(tempfile.mkdtemp(prefix=tmp_outdir_prefix))
with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as df:
df.write(dockerRequirement["dockerFile"].encode('utf-8'))
cmd = ["docker", "build", "--tag=%s" %
str(dockerRequirement["dockerImageId"]), dockerfile_dir]
_logger.info(Text(cmd))
if not dry_run:
subprocess.check_call(cmd, stdout=sys.stderr)
found = True
subprocess.check_call(cmd, stdout=sys.stderr)
found = True
elif "dockerLoad" in dockerRequirement:
cmd = ["docker", "load"]
_logger.info(Text(cmd))
if not dry_run:
if os.path.exists(dockerRequirement["dockerLoad"]):
_logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"])
with open(dockerRequirement["dockerLoad"], "rb") as f:
loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr)
else:
loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr)
_logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"])
req = requests.get(dockerRequirement["dockerLoad"], stream=True)
n = 0
for chunk in req.iter_content(1024 * 1024):
n += len(chunk)
_logger.info("\r%i bytes" % (n))
loadproc.stdin.write(chunk)
loadproc.stdin.close()
rcode = loadproc.wait()
if rcode != 0:
raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode))
found = True
if os.path.exists(dockerRequirement["dockerLoad"]):
_logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"])
with open(dockerRequirement["dockerLoad"], "rb") as f:
loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr)
else:
loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr)
_logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"])
req = requests.get(dockerRequirement["dockerLoad"], stream=True)
n = 0
for chunk in req.iter_content(1024 * 1024):
n += len(chunk)
_logger.info("\r%i bytes" % (n))
loadproc.stdin.write(chunk)
loadproc.stdin.close()
rcode = loadproc.wait()
if rcode != 0:
raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode))
found = True
elif "dockerImport" in dockerRequirement:
cmd = ["docker", "import", str(dockerRequirement["dockerImport"]),
str(dockerRequirement["dockerImageId"])]
_logger.info(Text(cmd))
if not dry_run:
subprocess.check_call(cmd, stdout=sys.stderr)
found = True
subprocess.check_call(cmd, stdout=sys.stderr)
found = True

if found:
with found_images_lock:
found_images.add(dockerRequirement["dockerImageId"])

return found

def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=False):
# type: (Dict[Text, Text], bool, bool, bool, bool) -> Text
def get_from_requirements(self,
r, # type: Dict[Text, Text]
req, # type: bool
pull_image, # type: bool
force_pull=False, # type: bool
tmp_outdir_prefix=None # type: Text
): # type: (...) -> Text
if r:
errmsg = None
try:
Expand All @@ -174,7 +178,7 @@ def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=Fa
else:
return None

if self.get_image(r, pull_image, dry_run, force_pull=force_pull):
if self.get_image(r, pull_image, force_pull, tmp_outdir_prefix):
return r["dockerImageId"]
else:
if req:
Expand Down
11 changes: 5 additions & 6 deletions cwltool/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,18 @@ def run_jobs(self,
):
pass

def execute(self, t, # type: Process
def execute(self,
t, # type: Process
job_order_object, # type: Dict[Text, Any]
logger=_logger,
**kwargs # type: Any
):
# type: (...) -> Tuple[Dict[Text, Any], Text]
**kwargs # type: Any
): # type: (...) -> Tuple[Dict[Text, Any], Text]

if "basedir" not in kwargs:
raise WorkflowException("Must provide 'basedir' in kwargs")

finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get(
"tmp_outdir_prefix") else tempfile.mkdtemp()
kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs.get("tmp_outdir_prefix"))
self.output_dirs.add(kwargs["outdir"])
kwargs["mutation_manager"] = MutationManager()
kwargs["toplevel"] = True
Expand Down
60 changes: 34 additions & 26 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,14 @@ def _setup(self, kwargs): # type: (Dict) -> None
_logger.debug(u"[job %s] initial work dir %s", self.name,
json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4))

def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move", secret_store=None):
# type: (List[Text], MutableMapping[Text, Text], bool, Text, SecretStore) -> None
def _execute(self,
runtime, # type:List[Text]
env, # type: MutableMapping[Text, Text]
rm_tmpdir=True, # type: bool
move_outputs="move", # type: Text
secret_store=None, # type: SecretStore
tmp_outdir_prefix=None # type: Text
): # type: (...) -> None

scr, _ = get_feature(self, "ShellCommandRequirement")

Expand Down Expand Up @@ -228,14 +234,9 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move", secret_sto
if builder is not None:
job_script_contents = builder.build_job_script(commands)
rcode = _job_popen(
commands,
stdin_path=stdin_path,
stdout_path=stdout_path,
stderr_path=stderr_path,
env=env,
cwd=self.outdir,
job_script_contents=job_script_contents,
)
commands, stdin_path, stdout_path, stderr_path, env,
self.outdir, tempfile.mkdtemp(prefix=tmp_outdir_prefix),
job_script_contents)

if self.successCodes and rcode in self.successCodes:
processStatus = "success"
Expand Down Expand Up @@ -334,15 +335,22 @@ def run(self, pull_image=True, rm_container=True,
stageFiles(self.generatemapper, ignoreWritable=self.inplace_update, symLink=True, secret_store=kwargs.get("secret_store"))
relink_initialworkdir(self.generatemapper, self.outdir, self.builder.outdir, inplace_update=self.inplace_update)

self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs, secret_store=kwargs.get("secret_store"))
self._execute(
[], env, rm_tmpdir, move_outputs, kwargs.get("secret_store"),
kwargs.get("tmp_outdir_prefix"))


class ContainerCommandLineJob(JobBase):
__metaclass__ = ABCMeta

@abstractmethod
def get_from_requirements(self, r, req, pull_image, dry_run=False, force_pull=False):
# type: (Dict[Text, Text], bool, bool, bool, bool) -> Text
def get_from_requirements(self,
r, # type: Dict[Text, Text]
req, # type: bool
pull_image, # type: bool
force_pull=False, # type: bool
tmp_outdir_prefix=None # type: Text
): # type: (...) -> Text
pass

@abstractmethod
Expand Down Expand Up @@ -377,7 +385,9 @@ def run(self, pull_image=True, rm_container=True,
try:
env = cast(MutableMapping[Text, Text], os.environ)
if docker_req and kwargs.get("use_container"):
img_id = str(self.get_from_requirements(docker_req, True, pull_image, force_pull=kwargs.get("force_docker_pull")))
img_id = str(self.get_from_requirements(docker_req, True,
pull_image, kwargs.get("force_docker_pull"),
kwargs.get('tmp_outdir_prefix')))
if img_id is None:
if self.builder.find_default_container:
default_container = self.builder.find_default_container()
Expand All @@ -404,20 +414,21 @@ def run(self, pull_image=True, rm_container=True,
runtime = self.create_runtime(env, rm_container, record_container_id, cidfile_dir, cidfile_prefix, **kwargs)
runtime.append(img_id)

self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs, secret_store=kwargs.get("secret_store"))
self._execute(
runtime, env, rm_tmpdir, move_outputs, kwargs.get("secret_store"),
kwargs.get("tmp_outdir_prefix"))


def _job_popen(
commands, # type: List[Text]
stdin_path, # type: Text
stdout_path, # type: Text
stderr_path, # type: Text
commands, # type: List[Text]
stdin_path, # type: Text
stdout_path, # type: Text
stderr_path, # type: Text
env, # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]]
cwd, # type: Text
job_dir=None, # type: Text
cwd, # type: Text
job_dir, # type: Text
job_script_contents=None, # type: Text
):
# type: (...) -> int
): # type: (...) -> int
if not job_script_contents and not FORCE_SHELLED_POPEN:

stdin = None # type: Union[IO[Any], int]
Expand Down Expand Up @@ -464,9 +475,6 @@ def _job_popen(

return rcode
else:
if job_dir is None:
job_dir = tempfile.mkdtemp(prefix="cwltooljob")

if not job_script_contents:
job_script_contents = SHELL_COMMAND_TEMPLATE

Expand Down
4 changes: 3 additions & 1 deletion cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ def _init_job(self, joborder, **kwargs):
select_resources: callback to select compute resources
debug: enable debugging output
js_console: enable javascript console output
tmp_outdir_prefix: Path prefix for intermediate output directories
"""

builder = Builder()
Expand Down Expand Up @@ -622,7 +623,8 @@ def _init_job(self, joborder, **kwargs):
builder.tmpdir = builder.fs_access.docker_compatible_realpath(kwargs.get("docker_tmpdir") or "/tmp")
builder.stagedir = builder.fs_access.docker_compatible_realpath(kwargs.get("docker_stagedir") or "/var/lib/cwl")
else:
builder.outdir = builder.fs_access.realpath(kwargs.get("outdir") or tempfile.mkdtemp())
builder.outdir = builder.fs_access.realpath(kwargs.get("outdir")
or tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]))
if self.tool[u"class"] != 'Workflow':
builder.tmpdir = builder.fs_access.realpath(kwargs.get("tmpdir") or tempfile.mkdtemp())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be fixed as well?

builder.stagedir = builder.fs_access.realpath(kwargs.get("stagedir") or tempfile.mkdtemp())
Expand Down
21 changes: 9 additions & 12 deletions cwltool/singularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class SingularityCommandLineJob(ContainerCommandLineJob):
@staticmethod
def get_image(dockerRequirement, # type: Dict[Text, Text]
pull_image, # type: bool
dry_run=False, # type: bool
force_pull=False # type: bool
):
# type: (...) -> bool
Expand Down Expand Up @@ -95,9 +94,8 @@ def get_image(dockerRequirement, # type: Dict[Text, Text]
str(dockerRequirement["dockerImageId"]),
str(dockerRequirement["dockerPull"])]
_logger.info(Text(cmd))
if not dry_run:
check_call(cmd, stdout=sys.stderr)
found = True
check_call(cmd, stdout=sys.stderr)
found = True
elif "dockerFile" in dockerRequirement:
raise WorkflowException(SourceLine(
dockerRequirement, 'dockerFile').makeError(
Expand All @@ -117,13 +115,12 @@ def get_image(dockerRequirement, # type: Dict[Text, Text]
return found

def get_from_requirements(self,
r, # type: Optional[Dict[Text, Text]]
req, # type: bool
pull_image, # type: bool
dry_run=False, # type: bool
force_pull=False # type: bool
):
# type: (...) -> Text
r, # type: Optional[Dict[Text, Text]]
req, # type: bool
pull_image, # type: bool
force_pull=False, # type: bool
tmp_outdir_prefix=None # type: Text
): # type: (...) -> Text
"""
Returns the filename of the Singularity image (e.g.
hello-world-latest.img).
Expand All @@ -144,7 +141,7 @@ def get_from_requirements(self,
else:
return None

if self.get_image(r, pull_image, dry_run, force_pull):
if self.get_image(r, pull_image, force_pull):
return os.path.abspath(r["dockerImageId"])
else:
if req:
Expand Down
10 changes: 2 additions & 8 deletions cwltool/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,8 @@ def __init__(self, workflow, **kwargs):
self.state = None # type: Dict[Text, WorkflowStateItem]
self.processStatus = None # type: Text
self.did_callback = False

if "outdir" in kwargs:
self.outdir = kwargs["outdir"]
elif "tmp_outdir_prefix" in kwargs:
self.outdir = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"])
else:
# tmp_outdir_prefix defaults to tmp, so this is unlikely to be used
self.outdir = tempfile.mkdtemp()
self.outdir = kwargs.get("outdir")
self.outdir = tempfile.mkdtemp(prefix=kwargs.get("tmp_outdir_prefix"))

self.name = uniquename(u"workflow %s" % kwargs.get("name", shortname(self.workflow.tool.get("id", "embedded"))))

Expand Down
2 changes: 1 addition & 1 deletion jenkins.bat
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ set PATH=%PATH%;"C:\\Program Files\\Docker Toolbox\\"
docker-machine start default
REM Set the environment variables to use docker-machine and docker commands
FOR /f "tokens=*" %%i IN ('docker-machine env --shell cmd default') DO %%i

docker version
python setup.py test --addopts "--junit-xml=tests.xml --cov-report xml --cov cwltool"
pip install codecov
codecov
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ passenv = CI TRAVIS TRAVIS_*
deps =
-rrequirements.txt
py{27,34,35,36}-unit: codecov
py{27,34,35,36}-unit: pytest-xdist
py{27,34,35,36}-lint: flake8

commands =
Expand Down