Skip to content

Commit b2544fe

Browse files
committed
Clamp write to bufsize-1 bytes to afford sentinel value for no-progress
1 parent 1169502 commit b2544fe

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

src/io.c

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2518,13 +2518,40 @@ _dispatch_operation_perform(dispatch_operation_t op)
25182518
NTSTATUS status = _dispatch_NtQueryInformationFile(hFile,
25192519
&iosb, &fpli, sizeof(fpli), FilePipeLocalInformation);
25202520
if (NT_SUCCESS(status)) {
2521-
// WriteQuotaAvailable is unreliable in the presence
2522-
// of a blocking reader, when it can return zero, so only
2523-
// account for it otherwise
2524-
if (fpli.WriteQuotaAvailable > 0) {
2525-
len = MIN(len, fpli.WriteQuotaAvailable);
2521+
// WriteQuotaAvailable is the free space in the output buffer
2522+
// that has not already been reserved for reading. In other words,
2523+
// WriteQuotaAvailable =
2524+
// OutboundQuota - WriteQuotaUsed - QueuedReadSize.
2525+
// It is not documented that QueuedReadSize is part of this
2526+
// calculation, but this behavior has been observed experimentally.
2527+
// Unfortunately, this means that it is not possible to distinguish
2528+
// between a full output buffer and a reader blocked waiting for a
2529+
// full buffer's worth of data. This is a problem because if the
2530+
// output buffer is full and no reader is waiting for data, then
2531+
// attempting to write to the buffer of a PIPE_WAIT, non-
2532+
// overlapped I/O pipe will block the dispatch queue thread.
2533+
//
2534+
// In order to work around this idiosyncrasy, we bound the size of
2535+
// the write to be OutboundQuota - 1. This affords us a sentinel value
2536+
// in WriteQuotaAvailable that can be used to detect if a reader is
2537+
// making progress or not.
2538+
// WriteQuotaAvailable = 0 => a reader is blocked waiting for data.
2539+
// WriteQuotaAvailable = 1 => the pipe has been written to, but no
2540+
// reader is making progress.
2541+
// When we detect that WriteQuotaAvailable == 1, we write 0 bytes to
2542+
// avoid blocking the dispatch queue thread.
2543+
if (fpli.WriteQuotaAvailable == 0) {
2544+
// This condition can only occur when we have a reader blocked
2545+
// waiting for data on the pipe. In this case, write a full
2546+
// buffer's worth of data (less one byte to preserve this
2547+
// sentinel value of WriteQuotaAvailable == 0).
2548+
len = MIN(len, fpli.OutboundQuota - 1);
2549+
} else {
2550+
// Subtract 1 from WriteQuotaAvailable to ensure we do not fill
2551+
// the pipe and preserve the sentinel value of
2552+
// WriteQuotaAvailable == 1.
2553+
len = MIN(len, fpli.WriteQuotaAvailable - 1);
25262554
}
2527-
len = MIN(len, fpli.OutboundQuota);
25282555
}
25292556

25302557
OVERLAPPED ovlOverlapped = {};

tests/dispatch_io_pipe.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,12 @@ test_dispatch_write(int kind, int delay)
404404
dispatch_group_t g = dispatch_group_create();
405405
dispatch_group_enter(g);
406406

407-
const size_t bufsize = test_get_pipe_buffer_size(kind);
407+
// The libdispatch implementation writes at most bufsize-1 bytes
408+
// before requiring a reader to start making progress. Because
409+
// these tests operate serially, the reader will not make progress
410+
// until the write finishes, and a write of >= bufsize will not
411+
// finish until the reader starts draining the pipe.
412+
const size_t bufsize = test_get_pipe_buffer_size(kind) - 1;
408413

409414
char *buf = calloc(bufsize, 1);
410415
assert(buf);

0 commit comments

Comments
 (0)