Skip to content

Commit 69fc0fb

Browse files
committed
Adding support for non-blocking I/O read write froom stdin/stdout on windows platform
1 parent 70041ce commit 69fc0fb

File tree

2 files changed

+109
-19
lines changed

2 files changed

+109
-19
lines changed

cwltool/job.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,8 @@ def add_volumes(self, pathmapper, runtime, stage_output):
326326
f.write(vol.resolved.encode("utf-8"))
327327
runtime.append(u"--volume=%s:%s:ro" % (self.docker_windows_path_adjust(createtmp), self.docker_windows_path_adjust(vol.target)))
328328

329-
# changes windowspath(only) appropriately to be passed to docker run command
330-
# as docker treat them as unix paths so convert C:\Users\foo to /c/Users/foo
329+
# changes windowspath(only) appropriately to be passed to docker run command
330+
# as docker treat them as unix paths so convert C:\Users\foo to /c/Users/foo
331331
def docker_windows_path_adjust(self,path):
332332
# type: (Text) -> (Text)
333333
if os.name == 'nt':
@@ -384,7 +384,7 @@ def run(self, pull_image=True, rm_container=True,
384384
if self.stdout:
385385
runtime.append("--log-driver=none")
386386

387-
if os.name=='nt': # windows os dont have getuid or geteuid functions
387+
if os.name=='nt': # windows os dont have getuid or geteuid functions
388388
euid = docker_vm_uid()
389389
else:
390390
euid = docker_vm_uid() or os.geteuid()

cwltool/sandboxjs.py

+106-16
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import select
66
import subprocess
77
import threading
8+
import Queue
9+
import sys
810
from io import BytesIO
911
from typing import Any, Dict, List, Mapping, Text, Tuple, Union
1012

@@ -145,26 +147,114 @@ def term():
145147

146148
rselect = [nodejs.stdout, nodejs.stderr] # type: List[BytesIO]
147149
wselect = [nodejs.stdin] # type: List[BytesIO]
148-
while (len(wselect) + len(rselect)) > 0:
149-
rready, wready, _ = select.select(rselect, wselect, [])
150-
try:
151-
if nodejs.stdin in wready:
152-
b = stdin_buf.read(select.PIPE_BUF)
150+
151+
# On windows system standard input/output are not handled properly by select module(modules like pywin32, msvcrt, gevent don't work either)
152+
if sys.platform=='win32':
153+
READ_BYTES_SIZE = 512
154+
155+
# creating queue for reading from a thread to queue
156+
input_queue = Queue.Queue()
157+
output_queue = Queue.Queue()
158+
error_queue = Queue.Queue()
159+
160+
# To tell threads that output has ended and threads can safely exit
161+
no_more_output = threading.Lock()
162+
no_more_output.acquire()
163+
no_more_error = threading.Lock()
164+
no_more_error.acquire()
165+
166+
# put constructed command to input queue which then will be passed to nodejs's stdin
167+
def put_input(input_queue):
168+
while True:
169+
sys.stdout.flush()
170+
b = stdin_buf.read(READ_BYTES_SIZE)
153171
if b:
154-
os.write(nodejs.stdin.fileno(), b)
172+
input_queue.put(b)
155173
else:
156-
wselect = []
157-
for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
158-
if pipes[0] in rready:
159-
b = os.read(pipes[0].fileno(), select.PIPE_BUF)
174+
break
175+
176+
# get the output from nodejs's stdout and continue till otuput ends
177+
def get_output(output_queue):
178+
while not no_more_output.acquire(False):
179+
b=os.read(nodejs.stdout.fileno(), READ_BYTES_SIZE)
180+
if b:
181+
output_queue.put(b)
182+
183+
# get the output from nodejs's stderr and continue till error output ends
184+
def get_error(error_queue):
185+
while not no_more_error.acquire(False):
186+
b = os.read(nodejs.stderr.fileno(), READ_BYTES_SIZE)
187+
if b:
188+
error_queue.put(b)
189+
190+
# Threads managing nodejs.stdin, nodejs.stdout and nodejs.stderr respectively
191+
input_thread = threading.Thread(target=put_input, args=(input_queue,))
192+
input_thread.start()
193+
output_thread = threading.Thread(target=get_output, args=(output_queue,))
194+
output_thread.start()
195+
error_thread = threading.Thread(target=get_error, args=(error_queue,))
196+
error_thread.start()
197+
198+
# mark if output/error is ready
199+
output_ready=False
200+
error_ready=False
201+
202+
while (len(wselect) + len(rselect)) > 0:
203+
try:
204+
if nodejs.stdin in wselect:
205+
if not input_queue.empty():
206+
os.write(nodejs.stdin.fileno(), input_queue.get())
207+
elif not input_thread.is_alive():
208+
wselect = []
209+
if nodejs.stdout in rselect:
210+
if not output_queue.empty():
211+
output_ready = True
212+
stdout_buf.write(output_queue.get())
213+
elif output_ready:
214+
rselect = []
215+
no_more_output.release()
216+
no_more_error.release()
217+
output_thread.join()
218+
219+
if nodejs.stderr in rselect:
220+
if not error_queue.empty():
221+
error_ready = True
222+
stderr_buf.write(error_queue.get())
223+
elif error_ready:
224+
rselect = []
225+
no_more_output.release()
226+
no_more_error.release()
227+
output_thread.join()
228+
error_thread.join()
229+
if stdout_buf.getvalue().endswith("\n"):
230+
rselect = []
231+
no_more_output.release()
232+
no_more_error.release()
233+
output_thread.join()
234+
except OSError as e:
235+
break
236+
237+
else:
238+
while (len(wselect) + len(rselect)) > 0:
239+
rready, wready, _ = select.select(rselect, wselect, [])
240+
try:
241+
if nodejs.stdin in wready:
242+
b = stdin_buf.read(select.PIPE_BUF)
160243
if b:
161-
pipes[1].write(b)
244+
os.write(nodejs.stdin.fileno(), b)
162245
else:
163-
rselect.remove(pipes[0])
164-
if stdout_buf.getvalue().endswith("\n"):
165-
rselect = []
166-
except OSError as e:
167-
break
246+
wselect = []
247+
for pipes in ((nodejs.stdout, stdout_buf), (nodejs.stderr, stderr_buf)):
248+
if pipes[0] in rready:
249+
b = os.read(pipes[0].fileno(), select.PIPE_BUF)
250+
if b:
251+
pipes[1].write(b)
252+
else:
253+
rselect.remove(pipes[0])
254+
if stdout_buf.getvalue().endswith("\n"):
255+
rselect = []
256+
except OSError as e:
257+
break
168258
tm.cancel()
169259

170260
stdin_buf.close()

0 commit comments

Comments
 (0)