Skip to content

Queue write only after processing all buffers #445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE-APACHE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2015-present MagicStack Inc. http://magic.io
Copyright (C) 2016-present the uvloop authors and contributors.

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion LICENSE-MIT
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License

Copyright (c) 2015-present MagicStack Inc. http://magic.io
Copyright (C) 2016-present the uvloop authors and contributors.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
11 changes: 9 additions & 2 deletions uvloop/handles/stream.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ cdef class UVStream(UVBaseTransport):
cdef inline _init(self, Loop loop, object protocol, Server server,
object waiter, object context)

cdef inline _exec_write(self)

cdef inline _shutdown(self)
cdef inline _accept(self, UVStream server)
Expand All @@ -31,7 +30,15 @@ cdef class UVStream(UVBaseTransport):
cdef inline __reading_started(self)
cdef inline __reading_stopped(self)

cdef inline _write(self, object data)
# The user API firstly calls _buffer_write() to buffer up user data chunks,
# potentially multiple times in writelines(), and then call _start_write()
# to start writing either immediately or in the next iteration.
cdef inline _buffer_write(self, object data)
cdef inline _start_write(self)

# _exec_write() is the method that does the actual send, and _try_write()
# is a fast-path used in _exec_write() to send a single chunk.
cdef inline _exec_write(self)
cdef inline _try_write(self, object data)

cdef _close(self)
Expand Down
17 changes: 10 additions & 7 deletions uvloop/handles/stream.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ cdef class _StreamWriteContext:
PyObject_GetBuffer(
buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE)
except Exception:
# This shouldn't ever happen, as `UVStream._write`
# This shouldn't ever happen, as `UVStream._buffer_write`
# casts non-bytes objects to `memoryviews`.
ctx.py_bufs_len = py_bufs_len
ctx.free_bufs()
Expand Down Expand Up @@ -407,7 +407,7 @@ cdef class UVStream(UVBaseTransport):

return written

cdef inline _write(self, object data):
cdef inline _buffer_write(self, object data):
cdef int dlen

if not PyBytes_CheckExact(data):
Expand All @@ -420,6 +420,7 @@ cdef class UVStream(UVBaseTransport):
self._buffer_size += dlen
self._buffer.append(data)

cdef inline _start_write(self):
if (not self._protocol_paused and
(<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
self._buffer_size > self._high_water):
Expand All @@ -443,10 +444,10 @@ cdef class UVStream(UVBaseTransport):
# If not all of the data was sent successfully,
# we might need to pause the protocol.
self._maybe_pause_protocol()
return

self._maybe_pause_protocol()
self._loop._queue_write(self)
elif self._buffer_size > 0:
self._maybe_pause_protocol()
self._loop._queue_write(self)

cdef inline _exec_write(self):
cdef:
Expand Down Expand Up @@ -679,7 +680,8 @@ cdef class UVStream(UVBaseTransport):
if self._conn_lost:
self._conn_lost += 1
return
self._write(buf)
self._buffer_write(buf)
self._start_write()

def writelines(self, bufs):
self._ensure_alive()
Expand All @@ -690,7 +692,8 @@ cdef class UVStream(UVBaseTransport):
self._conn_lost += 1
return
for buf in bufs:
self._write(buf)
self._buffer_write(buf)
self._start_write()

def write_eof(self):
self._ensure_alive()
Expand Down