diff --git a/src/event/event_windows.c b/src/event/event_windows.c index 33a8bad73..23e1eb78c 100644 --- a/src/event/event_windows.c +++ b/src/event/event_windows.c @@ -28,18 +28,44 @@ enum _dispatch_windows_port { DISPATCH_PORT_TIMER_CLOCK_UPTIME, DISPATCH_PORT_TIMER_CLOCK_MONOTONIC, DISPATCH_PORT_FILE_HANDLE, + DISPATCH_PORT_PIPE_HANDLE_READ, + DISPATCH_PORT_PIPE_HANDLE_WRITE, +}; + +enum _dispatch_muxnote_events { + DISPATCH_MUXNOTE_EVENT_READ = 1 << 0, + DISPATCH_MUXNOTE_EVENT_WRITE = 1 << 1, }; #pragma mark dispatch_unote_t typedef struct dispatch_muxnote_s { LIST_ENTRY(dispatch_muxnote_s) dmn_list; + LIST_HEAD(, dispatch_unote_linkage_s) dmn_readers_head; + LIST_HEAD(, dispatch_unote_linkage_s) dmn_writers_head; + + // This refcount solves a race condition that can happen with I/O completion + // ports. When we enqueue packets with muxnote pointers associated with + // them, it's possible that those packets might not be processed until after + // the event has been unregistered. We increment this upon creating a + // muxnote or posting to a completion port, and we decrement it upon + // unregistering the event or processing a packet. When it hits zero, we + // dispose the muxnote. + os_atomic(uintptr_t) dmn_refcount; + dispatch_unote_ident_t dmn_ident; int8_t dmn_filter; enum _dispatch_muxnote_handle_type { DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID, DISPATCH_MUXNOTE_HANDLE_TYPE_FILE, + DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE, } dmn_handle_type; + enum _dispatch_muxnote_events dmn_events; + + // Used by the pipe monitoring thread + HANDLE dmn_thread; + HANDLE dmn_event; + os_atomic(bool) dmn_stop; } *dispatch_muxnote_t; static LIST_HEAD(dispatch_muxnote_bucket_s, dispatch_muxnote_s) @@ -71,7 +97,8 @@ _dispatch_unote_muxnote_find(struct dispatch_muxnote_bucket_s *dmb, } static dispatch_muxnote_t -_dispatch_muxnote_create(dispatch_unote_t du) +_dispatch_muxnote_create(dispatch_unote_t du, + enum _dispatch_muxnote_events events) { dispatch_muxnote_t dmn; int8_t filter = du._du->du_filter; @@ -81,12 +108,18 @@ _dispatch_muxnote_create(dispatch_unote_t du) if (dmn == NULL) { DISPATCH_INTERNAL_CRASH(0, "_dispatch_calloc"); } + os_atomic_store(&dmn->dmn_refcount, 1, relaxed); dmn->dmn_ident = (dispatch_unote_ident_t)handle; dmn->dmn_filter = filter; + dmn->dmn_events = events; + LIST_INIT(&dmn->dmn_readers_head); + LIST_INIT(&dmn->dmn_writers_head); switch (filter) { case EVFILT_SIGNAL: WIN_PORT_ERROR(); + free(dmn); + return NULL; case EVFILT_WRITE: case EVFILT_READ: @@ -103,17 +136,28 @@ _dispatch_muxnote_create(dispatch_unote_t du) // The specified file is a character file, typically a // LPT device or a console. WIN_PORT_ERROR(); + free(dmn); + return NULL; case FILE_TYPE_DISK: // The specified file is a disk file - dmn->dmn_handle_type = - DISPATCH_MUXNOTE_HANDLE_TYPE_FILE; + dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_FILE; break; case FILE_TYPE_PIPE: // The specified file is a socket, a named pipe, or an - // anonymous pipe. - WIN_PORT_ERROR(); + // anonymous pipe. Use GetNamedPipeInfo() to distinguish between + // a pipe and a socket. Despite its name, it also succeeds for + // anonymous pipes. + if (!GetNamedPipeInfo(handle, NULL, NULL, NULL, NULL)) { + // We'll get ERROR_ACCESS_DENIED for outbound pipes. + if (GetLastError() != ERROR_ACCESS_DENIED) { + // The file is probably a socket. + WIN_PORT_ERROR(); + } + } + dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE; + break; } break; @@ -126,13 +170,136 @@ _dispatch_muxnote_create(dispatch_unote_t du) return dmn; } +static void +_dispatch_muxnote_stop(dispatch_muxnote_t dmn) +{ + if (dmn->dmn_thread) { + // Keep trying to cancel ReadFile() until the thread exits + os_atomic_store(&dmn->dmn_stop, true, relaxed); + SetEvent(dmn->dmn_event); + do { + CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL); + } while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT); + CloseHandle(dmn->dmn_thread); + dmn->dmn_thread = NULL; + } + if (dmn->dmn_event) { + CloseHandle(dmn->dmn_event); + dmn->dmn_event = NULL; + } +} + static void _dispatch_muxnote_dispose(dispatch_muxnote_t dmn) { + if (dmn->dmn_thread) { + DISPATCH_INTERNAL_CRASH(0, "disposed a muxnote with an active thread"); + } free(dmn); } -DISPATCH_ALWAYS_INLINE +static void +_dispatch_muxnote_retain(dispatch_muxnote_t dmn) +{ + uintptr_t refcount = os_atomic_inc(&dmn->dmn_refcount, relaxed); + if (refcount == 0) { + DISPATCH_INTERNAL_CRASH(0, "muxnote refcount overflow"); + } + if (refcount == 1) { + DISPATCH_INTERNAL_CRASH(0, "retained a disposing muxnote"); + } +} + +static void +_dispatch_muxnote_release(dispatch_muxnote_t dmn) +{ + uintptr_t refcount = os_atomic_dec(&dmn->dmn_refcount, relaxed); + if (refcount == 0) { + _dispatch_muxnote_dispose(dmn); + } else if (refcount == UINTPTR_MAX) { + DISPATCH_INTERNAL_CRASH(0, "muxnote refcount underflow"); + } +} + +static unsigned WINAPI +_dispatch_pipe_monitor_thread(void *context) +{ + dispatch_muxnote_t dmn = (dispatch_muxnote_t)context; + HANDLE hPipe = (HANDLE)dmn->dmn_ident; + do { + char cBuffer[1]; + DWORD dwNumberOfBytesTransferred; + OVERLAPPED ov = {0}; + BOOL bSuccess = ReadFile(hPipe, cBuffer, /* nNumberOfBytesToRead */ 0, + &dwNumberOfBytesTransferred, &ov); + DWORD dwBytesAvailable; + DWORD dwError = GetLastError(); + if (!bSuccess && dwError == ERROR_IO_PENDING) { + bSuccess = GetOverlappedResult(hPipe, &ov, + &dwNumberOfBytesTransferred, /* bWait */ TRUE); + dwError = GetLastError(); + } + if (bSuccess) { + bSuccess = PeekNamedPipe(hPipe, NULL, 0, NULL, &dwBytesAvailable, + NULL); + dwError = GetLastError(); + } + if (bSuccess) { + if (dwBytesAvailable == 0) { + // This can happen with a zero-byte write. Try again. + continue; + } + } else if (dwError == ERROR_NO_DATA) { + // The pipe is nonblocking. Try again. + Sleep(0); + continue; + } else { + _dispatch_debug("pipe[0x%llx]: GetLastError() returned %lu", + (long long)hPipe, dwError); + if (dwError == ERROR_OPERATION_ABORTED) { + continue; + } + os_atomic_store(&dmn->dmn_stop, true, relaxed); + dwBytesAvailable = 0; + } + + // Make sure the muxnote stays alive until the packet is dequeued + _dispatch_muxnote_retain(dmn); + + // The lpOverlapped parameter does not actually need to point to an + // OVERLAPPED struct. It's really just a pointer to pass back to + // GetQueuedCompletionStatus(). + bSuccess = PostQueuedCompletionStatus(hPort, + dwBytesAvailable, (ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_READ, + (LPOVERLAPPED)dmn); + if (!bSuccess) { + DISPATCH_INTERNAL_CRASH(GetLastError(), + "PostQueuedCompletionStatus"); + } + + // If data is written into the pipe and not read right away, ReadFile() + // will keep returning immediately and we'll flood the completion port. + // This event lets us synchronize with _dispatch_event_loop_drain() so + // that we only post events when it's ready for them. + WaitForSingleObject(dmn->dmn_event, INFINITE); + } while (!os_atomic_load(&dmn->dmn_stop, relaxed)); + _dispatch_debug("pipe[0x%llx]: monitor exiting", (long long)hPipe); + return 0; +} + +static DWORD +_dispatch_pipe_write_availability(HANDLE hPipe) +{ + IO_STATUS_BLOCK iosb; + FILE_PIPE_LOCAL_INFORMATION fpli; + NTSTATUS status = _dispatch_NtQueryInformationFile(hPipe, &iosb, &fpli, + sizeof(fpli), FilePipeLocalInformation); + if (!NT_SUCCESS(status)) { + return 1; + } + return fpli.WriteQuotaAvailable; +} + static BOOL _dispatch_io_trigger(dispatch_muxnote_t dmn) { @@ -150,9 +317,56 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn) "PostQueuedCompletionStatus"); } break; + + case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + if ((dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) && + !dmn->dmn_thread) { + HANDLE hThread = (HANDLE)_beginthreadex(/* security */ NULL, + /* stack_size */ 1, _dispatch_pipe_monitor_thread, + (void *)dmn, /* initflag */ 0, /* thrdaddr */ NULL); + if (!hThread) { + DISPATCH_INTERNAL_CRASH(errno, "_beginthread"); + } + HANDLE hEvent = CreateEventW(NULL, /* bManualReset */ FALSE, + /* bInitialState */ FALSE, NULL); + if (!hEvent) { + DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW"); + } + dmn->dmn_thread = hThread; + dmn->dmn_event = hEvent; + } + if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) { + _dispatch_muxnote_retain(dmn); + DWORD available = + _dispatch_pipe_write_availability((HANDLE)dmn->dmn_ident); + bSuccess = PostQueuedCompletionStatus(hPort, available, + (ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_WRITE, + (LPOVERLAPPED)dmn); + if (bSuccess == FALSE) { + DISPATCH_INTERNAL_CRASH(GetLastError(), + "PostQueuedCompletionStatus"); + } + } + break; } - return bSuccess; + return TRUE; +} + +DISPATCH_ALWAYS_INLINE +static inline enum _dispatch_muxnote_events +_dispatch_unote_required_events(dispatch_unote_t du) +{ + switch (du._du->du_filter) { + case DISPATCH_EVFILT_CUSTOM_ADD: + case DISPATCH_EVFILT_CUSTOM_OR: + case DISPATCH_EVFILT_CUSTOM_REPLACE: + return 0; + case EVFILT_WRITE: + return DISPATCH_MUXNOTE_EVENT_WRITE; + default: + return DISPATCH_MUXNOTE_EVENT_READ; + } } bool @@ -160,37 +374,52 @@ _dispatch_unote_register_muxed(dispatch_unote_t du) { struct dispatch_muxnote_bucket_s *dmb; dispatch_muxnote_t dmn; + enum _dispatch_muxnote_events events; + + events = _dispatch_unote_required_events(du); dmb = _dispatch_unote_muxnote_bucket(du._du->du_ident); dmn = _dispatch_unote_muxnote_find(dmb, du._du->du_ident, du._du->du_filter); if (dmn) { WIN_PORT_ERROR(); + DISPATCH_INTERNAL_CRASH(0, "muxnote updating is not supported"); } else { - dmn = _dispatch_muxnote_create(du); - if (dmn) { - if (_dispatch_io_trigger(dmn) == FALSE) { - _dispatch_muxnote_dispose(dmn); - dmn = NULL; - } else { - LIST_INSERT_HEAD(dmb, dmn, dmn_list); - } + dmn = _dispatch_muxnote_create(du, events); + if (!dmn) { + return false; } + if (_dispatch_io_trigger(dmn) == FALSE) { + _dispatch_muxnote_release(dmn); + return false; + } + LIST_INSERT_HEAD(dmb, dmn, dmn_list); } - if (dmn) { - dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du); + dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du); + switch (dmn->dmn_handle_type) { + case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID: + DISPATCH_INTERNAL_CRASH(0, "invalid handle"); + case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE: AcquireSRWLockExclusive(&_dispatch_file_handles_lock); LIST_INSERT_HEAD(&_dispatch_file_handles, dul, du_link); ReleaseSRWLockExclusive(&_dispatch_file_handles_lock); + break; - dul->du_muxnote = dmn; - _dispatch_unote_state_set(du, DISPATCH_WLH_ANON, - DU_STATE_ARMED); + case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + if (events & DISPATCH_MUXNOTE_EVENT_READ) { + LIST_INSERT_HEAD(&dmn->dmn_readers_head, dul, du_link); + } else if (events & DISPATCH_MUXNOTE_EVENT_WRITE) { + LIST_INSERT_HEAD(&dmn->dmn_writers_head, dul, du_link); + } + break; } - return dmn != NULL; + dul->du_muxnote = dmn; + _dispatch_unote_state_set(du, DISPATCH_WLH_ANON, DU_STATE_ARMED); + + return true; } void @@ -208,21 +437,34 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du) dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du); dispatch_muxnote_t dmn = dul->du_muxnote; - AcquireSRWLockExclusive(&_dispatch_file_handles_lock); - LIST_REMOVE(dul, du_link); - _LIST_TRASH_ENTRY(dul, du_link); - ReleaseSRWLockExclusive(&_dispatch_file_handles_lock); + switch (dmn->dmn_handle_type) { + case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID: + DISPATCH_INTERNAL_CRASH(0, "invalid handle"); + + case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE: + AcquireSRWLockExclusive(&_dispatch_file_handles_lock); + LIST_REMOVE(dul, du_link); + _LIST_TRASH_ENTRY(dul, du_link); + ReleaseSRWLockExclusive(&_dispatch_file_handles_lock); + break; + + case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + LIST_REMOVE(dul, du_link); + _LIST_TRASH_ENTRY(dul, du_link); + break; + } dul->du_muxnote = NULL; LIST_REMOVE(dmn, dmn_list); - _dispatch_muxnote_dispose(dmn); + _dispatch_muxnote_stop(dmn); + _dispatch_muxnote_release(dmn); _dispatch_unote_state_set(du, DU_STATE_UNREGISTERED); return true; } static void -_dispatch_event_merge_file_handle() +_dispatch_event_merge_file_handle(void) { dispatch_unote_linkage_t dul, dul_next; @@ -240,6 +482,56 @@ _dispatch_event_merge_file_handle() ReleaseSRWLockExclusive(&_dispatch_file_handles_lock); } +static void +_dispatch_event_merge_pipe_handle_read(dispatch_muxnote_t dmn, + DWORD dwBytesAvailable) +{ + dispatch_unote_linkage_t dul, dul_next; + LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) { + dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul); + // consumed by dux_merge_evt() + _dispatch_retain_unote_owner(du); + dispatch_unote_state_t du_state = _dispatch_unote_state(du); + du_state &= ~DU_STATE_ARMED; + uintptr_t data = dwBytesAvailable; + uint32_t flags; + if (dwBytesAvailable > 0) { + flags = EV_ADD | EV_ENABLE | EV_DISPATCH; + } else { + du_state |= DU_STATE_NEEDS_DELETE; + flags = EV_DELETE | EV_DISPATCH; + } + _dispatch_unote_state_set(du, du_state); + os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed); + dux_merge_evt(du._du, flags, data, 0); + } + SetEvent(dmn->dmn_event); + // Retained when posting the completion packet + _dispatch_muxnote_release(dmn); +} + +static void +_dispatch_event_merge_pipe_handle_write(dispatch_muxnote_t dmn, + DWORD dwBytesAvailable) +{ + dispatch_unote_linkage_t dul, dul_next; + LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) { + dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul); + // consumed by dux_merge_evt() + _dispatch_retain_unote_owner(du); + _dispatch_unote_state_clear_bit(du, DU_STATE_ARMED); + uintptr_t data = dwBytesAvailable; + if (dwBytesAvailable > 0) { + os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed); + } else { + os_atomic_store2o(du._dr, ds_pending_data, 0, relaxed); + } + dux_merge_evt(du._du, EV_ADD | EV_ENABLE | EV_DISPATCH, data, 0); + } + // Retained when posting the completion packet + _dispatch_muxnote_release(dmn); +} + #pragma mark timers typedef struct _dispatch_windows_timeout_s { @@ -414,6 +706,16 @@ _dispatch_event_loop_drain(uint32_t flags) _dispatch_event_merge_file_handle(); break; + case DISPATCH_PORT_PIPE_HANDLE_READ: + _dispatch_event_merge_pipe_handle_read((dispatch_muxnote_t)pOV, + dwNumberOfBytesTransferred); + break; + + case DISPATCH_PORT_PIPE_HANDLE_WRITE: + _dispatch_event_merge_pipe_handle_write((dispatch_muxnote_t)pOV, + dwNumberOfBytesTransferred); + break; + default: DISPATCH_INTERNAL_CRASH(ulCompletionKey, "unsupported completion key"); diff --git a/src/io.c b/src/io.c index 6337c7877..839367767 100644 --- a/src/io.c +++ b/src/io.c @@ -2410,7 +2410,43 @@ _dispatch_operation_perform(dispatch_operation_t op) if (op->direction == DOP_DIR_READ) { if (op->params.type == DISPATCH_IO_STREAM) { #if defined(_WIN32) - ReadFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, (LPDWORD)&processed, NULL); + HANDLE hFile = (HANDLE)op->fd_entry->fd; + BOOL bSuccess; + if (GetFileType(hFile) == FILE_TYPE_PIPE) { + OVERLAPPED ovlOverlapped = {}; + DWORD dwTotalBytesAvail; + bSuccess = PeekNamedPipe(hFile, NULL, 0, NULL, + &dwTotalBytesAvail, NULL); + if (bSuccess) { + if (dwTotalBytesAvail == 0) { + err = EAGAIN; + goto error; + } + len = MIN(len, dwTotalBytesAvail); + bSuccess = ReadFile(hFile, buf, (DWORD)len, + (LPDWORD)&processed, &ovlOverlapped); + } + if (!bSuccess) { + DWORD dwError = GetLastError(); + if (dwError == ERROR_IO_PENDING) { + bSuccess = GetOverlappedResult(hFile, &ovlOverlapped, + (LPDWORD)&processed, /* bWait */ TRUE); + dwError = GetLastError(); + } + if (dwError == ERROR_BROKEN_PIPE || + dwError == ERROR_NO_DATA) { + bSuccess = TRUE; + processed = 0; + } + } + } else { + bSuccess = ReadFile(hFile, buf, (DWORD)len, + (LPDWORD)&processed, NULL); + } + if (!bSuccess) { + err = EIO; + goto error; + } #else processed = read(op->fd_entry->fd, buf, len); #endif @@ -2419,7 +2455,8 @@ _dispatch_operation_perform(dispatch_operation_t op) OVERLAPPED ovlOverlapped = {}; ovlOverlapped.Offset = off & 0xffffffff; ovlOverlapped.OffsetHigh = (off >> 32) & 0xffffffff; - ReadFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, (LPDWORD)&processed, &ovlOverlapped); + ReadFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, + (LPDWORD)&processed, &ovlOverlapped); #else processed = pread(op->fd_entry->fd, buf, len, off); #endif @@ -2427,7 +2464,51 @@ _dispatch_operation_perform(dispatch_operation_t op) } else if (op->direction == DOP_DIR_WRITE) { if (op->params.type == DISPATCH_IO_STREAM) { #if defined(_WIN32) - WriteFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, (LPDWORD)&processed, NULL); + HANDLE hFile = (HANDLE)op->fd_entry->fd; + BOOL bSuccess; + if (GetFileType(hFile) == FILE_TYPE_PIPE) { + // Unfortunately there isn't a good way to achieve O_NONBLOCK + // semantics when writing to a pipe. SetNamedPipeHandleState() + // can allow pipes to be switched into a "no wait" mode, but + // that doesn't work on most pipe handles because Windows + // doesn't consistently create pipes with FILE_WRITE_ATTRIBUTES + // access. The best we can do is to try to query the write quota + // and then write as much as we can. + IO_STATUS_BLOCK iosb; + FILE_PIPE_LOCAL_INFORMATION fpli; + NTSTATUS status = _dispatch_NtQueryInformationFile(hFile, &iosb, + &fpli, sizeof(fpli), FilePipeLocalInformation); + if (NT_SUCCESS(status)) { + if (fpli.WriteQuotaAvailable == 0) { + err = EAGAIN; + goto error; + } + len = MIN(len, fpli.WriteQuotaAvailable); + } + OVERLAPPED ovlOverlapped = {}; + bSuccess = WriteFile(hFile, buf, (DWORD)len, + (LPDWORD)&processed, &ovlOverlapped); + if (!bSuccess) { + DWORD dwError = GetLastError(); + if (dwError == ERROR_IO_PENDING) { + bSuccess = GetOverlappedResult(hFile, &ovlOverlapped, + (LPDWORD)&processed, /* bWait */ TRUE); + dwError = GetLastError(); + } + if (dwError == ERROR_BROKEN_PIPE || + dwError == ERROR_NO_DATA) { + bSuccess = TRUE; + processed = 0; + } + } + } else { + bSuccess = WriteFile(hFile, buf, (DWORD)len, + (LPDWORD)&processed, NULL); + } + if (!bSuccess) { + err = EIO; + goto error; + } #else processed = write(op->fd_entry->fd, buf, len); #endif @@ -2436,7 +2517,8 @@ _dispatch_operation_perform(dispatch_operation_t op) OVERLAPPED ovlOverlapped = {}; ovlOverlapped.Offset = off & 0xffffffff; ovlOverlapped.OffsetHigh = (off >> 32) & 0xffffffff; - WriteFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, (LPDWORD)&processed, &ovlOverlapped); + WriteFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, + (LPDWORD)&processed, &ovlOverlapped); #else processed = pwrite(op->fd_entry->fd, buf, len, off); #endif diff --git a/src/shims/generic_win_stubs.c b/src/shims/generic_win_stubs.c index c48eef66a..b976075af 100644 --- a/src/shims/generic_win_stubs.c +++ b/src/shims/generic_win_stubs.c @@ -6,6 +6,13 @@ DISPATCH_STATIC_GLOBAL(dispatch_once_t _dispatch_precise_time_pred); DISPATCH_STATIC_GLOBAL(_precise_time_fn_t _dispatch_QueryInterruptTimePrecise_ptr); DISPATCH_STATIC_GLOBAL(_precise_time_fn_t _dispatch_QueryUnbiasedInterruptTimePrecise_ptr); +typedef NTSTATUS (NTAPI *_NtQueryInformationFile_fn_t)(HANDLE FileHandle, + PIO_STATUS_BLOCK IoStatusBlock, PVOID FileInformation, ULONG Length, + FILE_INFORMATION_CLASS FileInformationClass); + +DISPATCH_STATIC_GLOBAL(dispatch_once_t _dispatch_ntdll_pred); +DISPATCH_STATIC_GLOBAL(_NtQueryInformationFile_fn_t _dispatch_NtQueryInformationFile_ptr); + static void _dispatch_init_precise_time(void *context DISPATCH_UNUSED) { @@ -38,3 +45,27 @@ _dispatch_QueryUnbiasedInterruptTimePrecise(PULONGLONG lpUnbiasedInterruptTimePr dispatch_once_f(&_dispatch_precise_time_pred, NULL, _dispatch_init_precise_time); return _dispatch_QueryUnbiasedInterruptTimePrecise_ptr(lpUnbiasedInterruptTimePrecise); } + +static void +_dispatch_init_ntdll(void *context DISPATCH_UNUSED) +{ + HMODULE ntdll = LoadLibraryW(L"ntdll.dll"); + if (!ntdll) { + // ntdll is not required. + return; + } + _dispatch_NtQueryInformationFile_ptr = (_NtQueryInformationFile_fn_t) + GetProcAddress(ntdll, "NtQueryInformationFile"); +} + +NTSTATUS _dispatch_NtQueryInformationFile(HANDLE FileHandle, + PIO_STATUS_BLOCK IoStatusBlock, PVOID FileInformation, ULONG Length, + FILE_INFORMATION_CLASS FileInformationClass) +{ + dispatch_once_f(&_dispatch_ntdll_pred, NULL, _dispatch_init_ntdll); + if (!_dispatch_NtQueryInformationFile_ptr) { + return STATUS_NOT_SUPPORTED; + } + return _dispatch_NtQueryInformationFile_ptr(FileHandle, IoStatusBlock, + FileInformation, Length, FileInformationClass); +} diff --git a/src/shims/generic_win_stubs.h b/src/shims/generic_win_stubs.h index 7d38adb29..1f7f4eaa3 100644 --- a/src/shims/generic_win_stubs.h +++ b/src/shims/generic_win_stubs.h @@ -6,7 +6,9 @@ #include #include +#include #include +#include #include #include @@ -40,7 +42,29 @@ typedef __typeof__(_Generic((__SIZE_TYPE__)0, \ /* * Wrappers for dynamically loaded Windows APIs */ + void _dispatch_QueryInterruptTimePrecise(PULONGLONG lpInterruptTimePrecise); void _dispatch_QueryUnbiasedInterruptTimePrecise(PULONGLONG lpUnbiasedInterruptTimePrecise); +enum { + FilePipeLocalInformation = 24, +}; + +typedef struct _FILE_PIPE_LOCAL_INFORMATION { + ULONG NamedPipeType; + ULONG NamedPipeConfiguration; + ULONG MaximumInstances; + ULONG CurrentInstances; + ULONG InboundQuota; + ULONG ReadDataAvailable; + ULONG OutboundQuota; + ULONG WriteQuotaAvailable; + ULONG NamedPipeState; + ULONG NamedPipeEnd; +} FILE_PIPE_LOCAL_INFORMATION, *PFILE_PIPE_LOCAL_INFORMATION; + +NTSTATUS _dispatch_NtQueryInformationFile(HANDLE FileHandle, + PIO_STATUS_BLOCK IoStatusBlock, PVOID FileInformation, ULONG Length, + FILE_INFORMATION_CLASS FileInformationClass); + #endif diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c6aa30449..6cc82179a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -128,6 +128,7 @@ set(DISPATCH_C_TESTS timer_set_time data io_net + io_pipe io_pipe_close select) @@ -171,6 +172,7 @@ foreach(test ${DISPATCH_C_TESTS}) dispatch_${test}.c) endforeach() +set_tests_properties(dispatch_io_pipe PROPERTIES TIMEOUT 15) set_tests_properties(dispatch_io_pipe_close PROPERTIES TIMEOUT 5) # test dispatch API for various C/CXX language variants diff --git a/tests/dispatch_io_pipe.c b/tests/dispatch_io_pipe.c new file mode 100644 index 000000000..f94438483 --- /dev/null +++ b/tests/dispatch_io_pipe.c @@ -0,0 +1,488 @@ +/* + * Copyright (c) 2019 Apple Inc. All rights reserved. + * + * @APPLE_APACHE_LICENSE_HEADER_START@ + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @APPLE_APACHE_LICENSE_HEADER_END@ + */ + +#include +#include +#include +#include +#include +#include +#if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__)) +#include +#endif + +#include + +#include +#include "dispatch_test.h" + +enum { + DISPATCH_PIPE_KIND_ANONYMOUS, +#if defined(_WIN32) + DISPATCH_PIPE_KIND_NAMED_INBOUND, + DISPATCH_PIPE_KIND_NAMED_OUTBOUND, + DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED, + DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED, +#endif + DISPATCH_PIPE_KIND_COUNT, +}; + +enum { + DISPATCH_TEST_IMMEDIATE, + DISPATCH_TEST_DELAYED, +}; + +static const char *const pipe_names[] = { + [DISPATCH_PIPE_KIND_ANONYMOUS] = "anonymous", +#if defined(_WIN32) + [DISPATCH_PIPE_KIND_NAMED_INBOUND] = "named, inbound", + [DISPATCH_PIPE_KIND_NAMED_OUTBOUND] = "named, outbound", + [DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED] = "named, inbound, overlapped", + [DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED] = "named, outbound, overlapped", +#endif +}; + +static const char *const delay_names[] = { + [DISPATCH_TEST_IMMEDIATE] = "Immediate", + [DISPATCH_TEST_DELAYED] = "Delayed", +}; + +#if defined(_WIN32) +enum { + NAMED_PIPE_BUFFER_SIZE = 0x1000, +}; +#endif + +static size_t +test_get_pipe_buffer_size(int kind) +{ +#if defined(_WIN32) + if (kind != DISPATCH_PIPE_KIND_ANONYMOUS) { + return NAMED_PIPE_BUFFER_SIZE; + } + static dispatch_once_t once; + static DWORD size; + dispatch_once(&once, ^{ + HANDLE read_handle, write_handle; + if (!CreatePipe(&read_handle, &write_handle, NULL, 0)) { + test_long("CreatePipe", GetLastError(), ERROR_SUCCESS); + test_stop(); + } + GetNamedPipeInfo(write_handle, NULL, &size, NULL, NULL); + CloseHandle(read_handle); + CloseHandle(write_handle); + }); + return size; +#else + (void)kind; + static dispatch_once_t once; + static size_t size; + dispatch_once(&once, ^{ + int fds[2]; + if (pipe(fds) < 0) { + test_errno("pipe", errno, 0); + test_stop(); + } + fcntl(fds[1], F_SETFL, O_NONBLOCK); + for (size = 0; write(fds[1], "", 1) > 0; size++) {} + close(fds[0]); + close(fds[1]); + }); + return size; +#endif +} + +#if defined(_WIN32) +static void +test_make_named_pipe(DWORD flags, dispatch_fd_t *readfd, dispatch_fd_t *writefd) +{ + wchar_t name[64]; + static int counter = 0; + swprintf(name, sizeof(name), L"\\\\.\\pipe\\dispatch_io_pipe_%lu_%d", + GetCurrentProcessId(), counter++); + HANDLE server = CreateNamedPipeW(name, + flags | FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE, + /* nMaxInstances */ 1, NAMED_PIPE_BUFFER_SIZE, + NAMED_PIPE_BUFFER_SIZE, /* nDefaultTimeOut */ 0, + /* lpSecurityAttributes */ NULL); + if (server == INVALID_HANDLE_VALUE) { + test_ptr_not("CreateNamedPipe", server, INVALID_HANDLE_VALUE); + test_stop(); + } + HANDLE client = CreateFileW(name, + (flags & PIPE_ACCESS_INBOUND) ? GENERIC_WRITE : GENERIC_READ, + /* dwShareMode */ 0, /* lpSecurityAttributes */ NULL, OPEN_EXISTING, + flags & FILE_FLAG_OVERLAPPED, /* hTemplateFile */ NULL); + if (client == INVALID_HANDLE_VALUE) { + test_ptr_not("CreateFile", client, INVALID_HANDLE_VALUE); + test_stop(); + } + if (flags & PIPE_ACCESS_INBOUND) { + *readfd = (dispatch_fd_t)server; + *writefd = (dispatch_fd_t)client; + } else { + *readfd = (dispatch_fd_t)client; + *writefd = (dispatch_fd_t)server; + } +} +#endif + +static void +test_make_pipe(int kind, dispatch_fd_t *readfd, dispatch_fd_t *writefd) +{ +#if defined(_WIN32) + switch (kind) { + case DISPATCH_PIPE_KIND_ANONYMOUS: + if (!CreatePipe((PHANDLE)readfd, (PHANDLE)writefd, NULL, 0)) { + test_long("CreatePipe", GetLastError(), ERROR_SUCCESS); + test_stop(); + } + break; + case DISPATCH_PIPE_KIND_NAMED_INBOUND: + test_make_named_pipe(PIPE_ACCESS_INBOUND, readfd, writefd); + break; + case DISPATCH_PIPE_KIND_NAMED_OUTBOUND: + test_make_named_pipe(PIPE_ACCESS_OUTBOUND, readfd, writefd); + break; + case DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED: + test_make_named_pipe(PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, readfd, + writefd); + break; + case DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED: + test_make_named_pipe(PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED, + readfd, writefd); + break; + } +#else + (void)kind; + int fds[2]; + if (pipe(fds) < 0) { + test_errno("pipe", errno, 0); + test_stop(); + } + *readfd = fds[0]; + *writefd = fds[1]; +#endif +} + +static void +test_source_read(int kind, int delay) +{ + printf("\nSource Read %s: %s\n", delay_names[delay], pipe_names[kind]); + + dispatch_fd_t readfd, writefd; + test_make_pipe(kind, &readfd, &writefd); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + + void (^write_block)(void) = ^{ + dispatch_group_enter(g); + char buf[512] = {0}; + ssize_t n = dispatch_test_fd_write(writefd, buf, sizeof(buf)); + if (n < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)n, sizeof(buf)); + dispatch_group_leave(g); + }; + if (delay == DISPATCH_TEST_IMMEDIATE) { + write_block(); + } + + dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, + (uintptr_t)readfd, 0, dispatch_get_global_queue(0, 0)); + test_ptr_notnull("dispatch_source_create", reader); + assert(reader); + dispatch_source_set_event_handler(reader, ^{ + dispatch_group_enter(g); + char buf[512]; + size_t available = dispatch_source_get_data(reader); + test_sizet("num available", available, sizeof(buf)); + ssize_t n = dispatch_test_fd_read(readfd, buf, sizeof(buf)); + if (n >= 0) { + test_sizet("num read", (size_t)n, sizeof(buf)); + } else { + test_errno("read error", errno, 0); + } + dispatch_source_cancel(reader); + dispatch_group_leave(g); + }); + dispatch_source_set_cancel_handler(reader, ^{ + dispatch_release(reader); + dispatch_group_leave(g); + }); + dispatch_resume(reader); + + dispatch_source_t t = NULL; + if (delay == DISPATCH_TEST_DELAYED) { + t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, + dispatch_get_global_queue(0, 0)); + dispatch_source_set_event_handler(t, write_block); + dispatch_source_set_timer(t, + dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + DISPATCH_TIME_FOREVER, 0); + dispatch_resume(t); + } + + test_group_wait(g); + dispatch_release(g); + if (t) { + dispatch_source_cancel(t); + dispatch_release(t); + } + dispatch_test_fd_close(readfd); + dispatch_test_fd_close(writefd); +} + +static void +test_source_write(int kind, int delay) +{ + printf("\nSource Write %s: %s\n", delay_names[delay], pipe_names[kind]); + + dispatch_fd_t readfd, writefd; + test_make_pipe(kind, &readfd, &writefd); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + + const size_t bufsize = test_get_pipe_buffer_size(kind); + + void (^write_block)(void) = ^{ + char *buf = calloc(bufsize, 1); + assert(buf); + ssize_t nw = dispatch_test_fd_write(writefd, buf, bufsize); + free(buf); + if (nw < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)nw, bufsize); + }; + write_block(); + + void (^read_block)(void) = ^{ + dispatch_group_enter(g); + char *buf = calloc(bufsize, 1); + assert(buf); + ssize_t nr = dispatch_test_fd_read(readfd, buf, bufsize); + free(buf); + if (nr < 0) { + test_errno("read error", errno, 0); + test_stop(); + } + test_sizet("num read", (size_t)nr, bufsize); + dispatch_group_leave(g); + }; + if (delay == DISPATCH_TEST_IMMEDIATE) { + read_block(); + } + + dispatch_source_t writer = dispatch_source_create( + DISPATCH_SOURCE_TYPE_WRITE, (uintptr_t)writefd, 0, + dispatch_get_global_queue(0, 0)); + test_ptr_notnull("dispatch_source_create", writer); + assert(writer); + dispatch_source_set_event_handler(writer, ^{ + dispatch_group_enter(g); + size_t available = dispatch_source_get_data(writer); + test_sizet_less_than("num available", 0, available); + write_block(); + read_block(); + dispatch_source_cancel(writer); + dispatch_group_leave(g); + }); + dispatch_source_set_cancel_handler(writer, ^{ + dispatch_release(writer); + dispatch_group_leave(g); + }); + dispatch_resume(writer); + + dispatch_source_t t = NULL; + if (delay == DISPATCH_TEST_DELAYED) { + t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, + dispatch_get_global_queue(0, 0)); + dispatch_source_set_event_handler(t, read_block); + dispatch_source_set_timer(t, + dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + DISPATCH_TIME_FOREVER, 0); + dispatch_resume(t); + } + + test_group_wait(g); + dispatch_release(g); + if (t) { + dispatch_source_cancel(t); + dispatch_release(t); + } + dispatch_test_fd_close(readfd); + dispatch_test_fd_close(writefd); +} + +static void +test_dispatch_read(int kind, int delay) +{ + printf("\nDispatch Read %s: %s\n", delay_names[delay], pipe_names[kind]); + + dispatch_fd_t readfd, writefd; + test_make_pipe(kind, &readfd, &writefd); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + + char writebuf[512] = {0}; + char *writebufp = writebuf; + void (^write_block)(void) = ^{ + dispatch_group_enter(g); + ssize_t n = + dispatch_test_fd_write(writefd, writebufp, sizeof(writebuf)); + if (n < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)n, sizeof(writebuf)); + dispatch_group_leave(g); + }; + if (delay == DISPATCH_TEST_IMMEDIATE) { + write_block(); + } + + dispatch_read(readfd, sizeof(writebuf), dispatch_get_global_queue(0, 0), + ^(dispatch_data_t data, int err) { + test_errno("read error", err, 0); + test_sizet("num read", dispatch_data_get_size(data), sizeof(writebuf)); + dispatch_group_leave(g); + }); + + dispatch_source_t t = NULL; + if (delay == DISPATCH_TEST_DELAYED) { + t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, + dispatch_get_global_queue(0, 0)); + dispatch_source_set_event_handler(t, write_block); + dispatch_source_set_timer(t, + dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + DISPATCH_TIME_FOREVER, 0); + dispatch_resume(t); + } + + test_group_wait(g); + dispatch_release(g); + if (t) { + dispatch_source_cancel(t); + dispatch_release(t); + } + dispatch_test_fd_close(readfd); + dispatch_test_fd_close(writefd); +} + +static void +test_dispatch_write(int kind, int delay) +{ + printf("\nDispatch Write %s: %s\n", delay_names[delay], pipe_names[kind]); + + dispatch_fd_t readfd, writefd; + test_make_pipe(kind, &readfd, &writefd); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + + const size_t bufsize = test_get_pipe_buffer_size(kind); + + char *buf = calloc(bufsize, 1); + assert(buf); + ssize_t nw = dispatch_test_fd_write(writefd, buf, bufsize); + free(buf); + if (nw < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)nw, bufsize); + + void (^read_block)(void) = ^{ + dispatch_group_enter(g); + char *readbuf = calloc(bufsize, 1); + assert(readbuf); + ssize_t nr = dispatch_test_fd_read(readfd, readbuf, bufsize); + free(readbuf); + if (nr < 0) { + test_errno("read error", errno, 0); + test_stop(); + } + test_sizet("num read", (size_t)nr, bufsize); + dispatch_group_leave(g); + }; + if (delay == DISPATCH_TEST_IMMEDIATE) { + read_block(); + } + + buf = calloc(bufsize, 1); + assert(buf); + dispatch_data_t wd = dispatch_data_create(buf, bufsize, + dispatch_get_global_queue(0, 0), DISPATCH_DATA_DESTRUCTOR_FREE); + dispatch_write(writefd, wd, dispatch_get_global_queue(0, 0), + ^(dispatch_data_t data, int err) { + test_errno("write error", err, 0); + test_ptr_null("data written", data); + read_block(); + dispatch_group_leave(g); + }); + + dispatch_source_t t = NULL; + if (delay == DISPATCH_TEST_DELAYED) { + t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, + dispatch_get_global_queue(0, 0)); + dispatch_source_set_event_handler(t, read_block); + dispatch_source_set_timer(t, + dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + DISPATCH_TIME_FOREVER, 0); + dispatch_resume(t); + } + + test_group_wait(g); + dispatch_release(g); + dispatch_release(wd); + if (t) { + dispatch_source_cancel(t); + dispatch_release(t); + } + dispatch_test_fd_close(readfd); + dispatch_test_fd_close(writefd); +} + +int +main(void) +{ + dispatch_test_start("Dispatch IO Pipe"); + dispatch_async(dispatch_get_main_queue(), ^{ + for (int kind = 0; kind < DISPATCH_PIPE_KIND_COUNT; kind++) { + test_source_read(kind, DISPATCH_TEST_IMMEDIATE); + test_source_read(kind, DISPATCH_TEST_DELAYED); + test_source_write(kind, DISPATCH_TEST_IMMEDIATE); + test_source_write(kind, DISPATCH_TEST_DELAYED); + test_dispatch_read(kind, DISPATCH_TEST_IMMEDIATE); + test_dispatch_read(kind, DISPATCH_TEST_DELAYED); + test_dispatch_write(kind, DISPATCH_TEST_IMMEDIATE); + test_dispatch_write(kind, DISPATCH_TEST_DELAYED); + } + test_stop(); + }); + dispatch_main(); +} diff --git a/tests/dispatch_test.c b/tests/dispatch_test.c index b0d028df3..d84a7b228 100644 --- a/tests/dispatch_test.c +++ b/tests/dispatch_test.c @@ -337,6 +337,21 @@ ssize_t dispatch_test_fd_read(dispatch_fd_t fd, void *buf, size_t count) { #if defined(_WIN32) + if (GetFileType((HANDLE)fd) == FILE_TYPE_PIPE) { + OVERLAPPED ov = {0}; + DWORD num_read; + BOOL success = ReadFile((HANDLE)fd, buf, count, &num_read, &ov); + if (!success && GetLastError() == ERROR_IO_PENDING) { + success = GetOverlappedResult((HANDLE)fd, &ov, &num_read, + /* bWait */ TRUE); + } + if (!success) { + print_winapi_error("ReadFile", GetLastError()); + errno = EIO; + return -1; + } + return (ssize_t)num_read; + } DWORD num_read; if (!ReadFile((HANDLE)fd, buf, count, &num_read, NULL)) { print_winapi_error("ReadFile", GetLastError()); @@ -353,6 +368,21 @@ ssize_t dispatch_test_fd_write(dispatch_fd_t fd, const void *buf, size_t count) { #if defined(_WIN32) + if (GetFileType((HANDLE)fd) == FILE_TYPE_PIPE) { + OVERLAPPED ov = {0}; + DWORD num_written; + BOOL success = WriteFile((HANDLE)fd, buf, count, &num_written, &ov); + if (!success && GetLastError() == ERROR_IO_PENDING) { + success = GetOverlappedResult((HANDLE)fd, &ov, &num_written, + /* bWait */ TRUE); + } + if (!success) { + print_winapi_error("WriteFile", GetLastError()); + errno = EIO; + return -1; + } + return (ssize_t)num_written; + } DWORD num_written; if (!WriteFile((HANDLE)fd, buf, count, &num_written, NULL)) { print_winapi_error("WriteFile", GetLastError());