diff --git a/quamash/_windows.py b/quamash/_windows.py index e12c85e..4be52e3 100644 --- a/quamash/_windows.py +++ b/quamash/_windows.py @@ -59,6 +59,7 @@ class _IocpProactor(windows_events.IocpProactor): def __init__(self): self.__events = [] super(_IocpProactor, self).__init__() + self._lock = QtCore.QMutex() def select(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" @@ -72,6 +73,14 @@ def close(self): self._logger.debug('Closing') super(_IocpProactor, self).close() + def recv(self, conn, nbytes, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).recv(conn, nbytes, flags) + + def send(self, conn, buf, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).send(conn, buf, flags) + def _poll(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" if timeout is None: @@ -85,31 +94,44 @@ def _poll(self, timeout=None): if ms >= UINT32_MAX: raise ValueError("timeout too big") - while True: - # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( - # ms, threading.get_ident())) - status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) - if status is None: - break + with QtCore.QMutexLocker(self._lock): + while True: + # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( + # ms, threading.get_ident())) + status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) + if status is None: + break + + err, transferred, key, address = status + try: + f, ov, obj, callback = self._cache.pop(address) + except KeyError: + # key is either zero, or it is used to return a pipe + # handle which should be closed to avoid a leak. + if key not in (0, _overlapped.INVALID_HANDLE_VALUE): + _winapi.CloseHandle(key) + ms = 0 + continue + + if obj in self._stopped_serving: + f.cancel() + # Futures might already be resolved or cancelled + elif not f.done(): + self.__events.append((f, callback, transferred, key, ov)) - err, transferred, key, address = status - try: - f, ov, obj, callback = self._cache.pop(address) - except KeyError: - # key is either zero, or it is used to return a pipe - # handle which should be closed to avoid a leak. - if key not in (0, _overlapped.INVALID_HANDLE_VALUE): - _winapi.CloseHandle(key) ms = 0 - continue - if obj in self._stopped_serving: - f.cancel() - # Futures might already be resolved or cancelled - elif not f.done(): - self.__events.append((f, callback, transferred, key, ov)) + def _wait_for_handle(self, handle, timeout, _is_cancel): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel) + + def accept(self, listener): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).accept(listener) - ms = 0 + def connect(self, conn, address): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).connect(conn, address) @with_logger