Skip to content

Commit 6529869

Browse files
Handle spurious ReceiveScatterOutput callbacks
This commit fixes #2003 by handling spurious, repeated callbacks of the `receive_scatter_output` method of the `ReceiveScatterOutput` class. The reason of multiple awakenings has not been investigated deeply, though. In the future, a thorough examination of the `MultithreadedJobExecutor` logic may be necessary.
1 parent 18b8fdf commit 6529869

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

cwltool/workflow_job.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,16 @@ def __init__(
8888
) -> None:
8989
"""Initialize."""
9090
self.dest = dest
91-
self.completed = 0
91+
self._completed = set()
9292
self.processStatus = "success"
9393
self.total = total
9494
self.output_callback = output_callback
9595
self.steps: List[Optional[JobsGeneratorType]] = []
9696

97+
@property
98+
def completed(self) -> int:
99+
return len(self._completed)
100+
97101
def receive_scatter_output(self, index: int, jobout: CWLObjectType, processStatus: str) -> None:
98102
"""Record the results of a scatter operation."""
99103
for key, val in jobout.items():
@@ -108,10 +112,11 @@ def receive_scatter_output(self, index: int, jobout: CWLObjectType, processStatu
108112
if self.processStatus != "permanentFail":
109113
self.processStatus = processStatus
110114

111-
self.completed += 1
115+
if index not in self._completed:
116+
self._completed.add(index)
112117

113-
if self.completed == self.total:
114-
self.output_callback(self.dest, self.processStatus)
118+
if self.completed == self.total:
119+
self.output_callback(self.dest, self.processStatus)
115120

116121
def setTotal(
117122
self,

0 commit comments

Comments
 (0)