Skip to content

Revert "add stream shutdown and support half-duplex operation" #41808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 6, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -45,7 +45,6 @@ Standard library changes
overflow in most cases. The new function `checked_length` is now available, which will try to use checked
arithmetic to error if the result may be wrapping. Or use a package such as SaferIntegers.jl when
constructing the range. ([#40382])
* TCP socket objects now expose `shutdown` functionality and support half-open mode usage ([#40783]).

#### InteractiveUtils
* A new macro `@time_imports` for reporting any time spent importing packages and their dependencies ([#41612])
1 change: 0 additions & 1 deletion base/coreio.jl
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ write(::DevNull, ::UInt8) = 1
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
close(::DevNull) = nothing
wait_close(::DevNull) = wait()
bytesavailable(io::DevNull) = 0

let CoreIO = Union{Core.CoreSTDOUT, Core.CoreSTDERR}
global write(io::CoreIO, x::UInt8) = Core.write(io, x)
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
@@ -803,7 +803,6 @@ export

# I/O and events
close,
shutdown,
countlines,
eachline,
readeach,
124 changes: 52 additions & 72 deletions base/io.jl
Original file line number Diff line number Diff line change
@@ -60,49 +60,9 @@ function isopen end
Close an I/O stream. Performs a [`flush`](@ref) first.
"""
function close end

"""
shutdown(stream)
Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref)
first. Notify the other end that no more data will be written to the underlying
file. This is not supported by all IO types.
# Examples
```jldoctest
julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task
julia> write(io, "request");
julia> # calling `read(io)` here would block forever
julia> shutdown(io);
julia> read(io, String)
"request"
"""
function shutdown end

"""
flush(stream)
Commit all currently buffered writes to the given stream.
"""
function flush end

"""
bytesavailable(io)
Return the number of bytes available for reading before a read from this stream or buffer will block.
# Examples
```jldoctest
julia> io = IOBuffer("JuliaLang is a GitHub organization");
julia> bytesavailable(io)
34
```
"""
function wait_readnb end
function wait_close end
function bytesavailable end

"""
@@ -121,7 +81,7 @@ function readavailable end
"""
isreadable(io) -> Bool
Return `false` if the specified IO object is not readable.
Return `true` if the specified IO object is readable (if that can be determined).
# Examples
```jldoctest
@@ -139,12 +99,12 @@ true
julia> rm("myfile.txt")
```
"""
isreadable(io::IO) = isopen(io)
function isreadable end

"""
iswritable(io) -> Bool
Return `false` if the specified IO object is not writable.
Return `true` if the specified IO object is writable (if that can be determined).
# Examples
```jldoctest
@@ -162,22 +122,9 @@ false
julia> rm("myfile.txt")
```
"""
iswritable(io::IO) = isopen(io)

"""
eof(stream) -> Bool
Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
function will block to wait for more data if necessary, and then return `false`. Therefore
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
`false` as long as buffered data is still available, even if the remote end of a connection
is closed.
"""
function eof end

function iswritable end
function copy end
function wait_readnb end
function wait_close end
function eof end

"""
read(io::IO, T)
@@ -410,37 +357,65 @@ end
function pipe_reader end
function pipe_writer end

for f in (:flush, :shutdown, :iswritable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_writer(io)::IO)
end
write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io)::IO, byte)
write(to::IO, from::AbstractPipe) = write(to, pipe_reader(from))
unsafe_write(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_write(pipe_writer(io)::IO, p, nb)::Union{Int,UInt}
buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io)::IO, args...)
flush(io::AbstractPipe) = flush(pipe_writer(io)::IO)

for f in (
# peek/mark interface
:mark, :unmark, :reset, :ismarked,
# Simple reader functions
:read, :readavailable, :bytesavailable, :reseteof, :isreadable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
end
read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io)::IO, byte)::UInt8
unsafe_read(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_read(pipe_reader(io)::IO, p, nb)
read(io::AbstractPipe) = read(pipe_reader(io)::IO)
readuntil(io::AbstractPipe, arg::UInt8; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractChar; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractString; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractVector; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil_vector!(io::AbstractPipe, target::AbstractVector, keep::Bool, out) = readuntil_vector!(pipe_reader(io)::IO, target, keep, out)
readbytes!(io::AbstractPipe, target::AbstractVector{UInt8}, n=length(target)) = readbytes!(pipe_reader(io)::IO, target, n)

for f in (
# peek/mark interface
:mark, :unmark, :reset, :ismarked,
# Simple reader functions
:readavailable, :isreadable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
end
peek(io::AbstractPipe, ::Type{T}) where {T} = peek(pipe_reader(io)::IO, T)::T
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool

iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)::IO)
isopen(io::AbstractPipe) = isopen(pipe_writer(io)::IO) || isopen(pipe_reader(io)::IO)
close(io::AbstractPipe) = (close(pipe_writer(io)::IO); close(pipe_reader(io)::IO))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)::IO); wait_close(pipe_reader(io)::IO))

"""
bytesavailable(io)
Return the number of bytes available for reading before a read from this stream or buffer will block.
# Examples
```jldoctest
julia> io = IOBuffer("JuliaLang is a GitHub organization");
julia> bytesavailable(io)
34
```
"""
bytesavailable(io::AbstractPipe) = bytesavailable(pipe_reader(io)::IO)
bytesavailable(io::DevNull) = 0

"""
eof(stream) -> Bool
Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
function will block to wait for more data if necessary, and then return `false`. Therefore
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
`false` as long as buffered data is still available, even if the remote end of a connection
is closed.
"""
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool
reseteof(io::AbstractPipe) = reseteof(pipe_reader(io)::IO)


# Exception-safe wrappers (io = open(); try f(io) finally close(io))

@@ -1144,6 +1119,11 @@ ismarked(io::IO) = io.mark >= 0
# Make sure all IO streams support flush, even if only as a no-op,
# to make it easier to write generic I/O code.

"""
flush(stream)
Commit all currently buffered writes to the given stream.
"""
flush(io::IO) = nothing

"""
6 changes: 0 additions & 6 deletions base/iobuffer.jl
Original file line number Diff line number Diff line change
@@ -334,12 +334,6 @@ end

eof(io::GenericIOBuffer) = (io.ptr-1 == io.size)

function shutdown(io::GenericIOBuffer)
io.writable = false
# OR throw(_UVError("shutdown", UV_ENOTSOCK))
nothing
end

@noinline function close(io::GenericIOBuffer{T}) where T
io.readable = false
io.writable = false
1 change: 0 additions & 1 deletion base/libuv.jl
Original file line number Diff line number Diff line change
@@ -107,7 +107,6 @@ end
function uv_alloc_buf end
function uv_readcb end
function uv_writecb_task end
function uv_shutdowncb_task end
function uv_return_spawn end
function uv_asynccb end
function uv_timercb end
1 change: 0 additions & 1 deletion base/process.jl
Original file line number Diff line number Diff line change
@@ -275,7 +275,6 @@ function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool)
@warn "Process error" exception=(ex, catch_backtrace())
finally
close(parent)
child_readable || shutdown(stdio)
end
end
catch ex
132 changes: 34 additions & 98 deletions base/stream.jl
Original file line number Diff line number Diff line change
@@ -109,7 +109,7 @@ function eof(s::LibuvStream)
# and that we won't return true if there's a readerror pending (it'll instead get thrown).
# This requires some careful ordering here (TODO: atomic loads)
bytesavailable(s) > 0 && return false
open = isreadable(s) # must precede readerror check
open = isopen(s) # must precede readerror check
s.readerror === nothing || throw(s.readerror)
return !open
end
@@ -270,7 +270,6 @@ show(io::IO, stream::LibuvStream) = print(io, typeof(stream), "(",
function isreadable(io::LibuvStream)
bytesavailable(io) > 0 && return true
isopen(io) || return false
io.status == StatusEOF && return false
return ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), io.handle) != 0
end

@@ -379,7 +378,7 @@ function isopen(x::Union{LibuvStream, LibuvServer})
if x.status == StatusUninit || x.status == StatusInit
throw(ArgumentError("$x is not initialized"))
end
return x.status != StatusClosed
return x.status != StatusClosed && x.status != StatusEOF
end

function check_open(x::Union{LibuvStream, LibuvServer})
@@ -391,13 +390,13 @@ end
function wait_readnb(x::LibuvStream, nb::Int)
# fast path before iolock acquire
bytesavailable(x.buffer) >= nb && return
open = isopen(x) && x.status != StatusEOF # must precede readerror check
open = isopen(x) # must precede readerror check
x.readerror === nothing || throw(x.readerror)
open || return
iolock_begin()
# repeat fast path after iolock acquire, before other expensive work
bytesavailable(x.buffer) >= nb && (iolock_end(); return)
open = isopen(x) && x.status != StatusEOF
open = isopen(x)
x.readerror === nothing || throw(x.readerror)
open || (iolock_end(); return)
# now do the "real" work
@@ -408,7 +407,6 @@ function wait_readnb(x::LibuvStream, nb::Int)
while bytesavailable(x.buffer) < nb
x.readerror === nothing || throw(x.readerror)
isopen(x) || break
x.status != StatusEOF || break
x.throttle = max(nb, x.throttle)
start_reading(x) # ensure we are reading
iolock_end()
@@ -433,50 +431,6 @@ function wait_readnb(x::LibuvStream, nb::Int)
nothing
end

function shutdown(s::LibuvStream)
iolock_begin()
check_open(s)
req = Libc.malloc(_sizeof_uv_shutdown)
uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call
err = ccall(:uv_shutdown, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
req, s, @cfunction(uv_shutdowncb_task, Cvoid, (Ptr{Cvoid}, Cint)))
if err < 0
Libc.free(req)
uv_error("shutdown", err)
end
ct = current_task()
preserve_handle(ct)
sigatomic_begin()
uv_req_set_data(req, ct)
iolock_end()
status = try
sigatomic_end()
wait()::Cint
finally
# try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
sigatomic_end()
iolock_begin()
ct.queue === nothing || list_deletefirst!(ct.queue, ct)
if uv_req_data(req) != C_NULL
# req is still alive,
# so make sure we won't get spurious notifications later
uv_req_set_data(req, C_NULL)
else
# done with req
Libc.free(req)
end
iolock_end()
unpreserve_handle(ct)
if isopen(s) && (s.status == StatusEOF && !isa(s, TTY)) || ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), s.handle) == 0
close(s)
end
end
if status < 0
throw(_UVError("shutdown", status))
end
nothing
end

function wait_close(x::Union{LibuvStream, LibuvServer})
preserve_handle(x)
lock(x.cond)
@@ -497,7 +451,7 @@ function close(stream::Union{LibuvStream, LibuvServer})
if stream.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
elseif isopen(stream)
elseif isopen(stream) || stream.status == StatusEOF
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
@@ -652,33 +606,35 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)
function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt)
lock(stream.cond)
if nread < 0
if nread == UV_ENOBUFS && nrequested == 0
# remind the client that stream.buffer is full
notify(stream.cond)
elseif nread == UV_EOF # libuv called uv_stop_reading already
if stream.status != StatusClosing
if stream isa TTY || ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), stream.handle) != 0
# stream can still be used either by reseteof or write
stream.status = StatusEOF
try
if nread < 0
if nread == UV_ENOBUFS && nrequested == 0
# remind the client that stream.buffer is full
notify(stream.cond)
elseif nread == UV_EOF
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.cond)
else
# underlying stream is no longer useful: begin finalization
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
else
stream.readerror = _UVError("read", nread)
# This is a fatal connection error. Shutdown requests as per the usual
# close function won't work and libuv will fail with an assertion failure
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream)
stream.status = StatusClosing
notify(stream.cond)
end
else
stream.readerror = _UVError("read", nread)
# This is a fatal connection error
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
notify_filled(stream.buffer, nread)
notify(stream.cond)
end
else
notify_filled(stream.buffer, nread)
notify(stream.cond)
finally
unlock(stream.cond)
end
unlock(stream.cond)

# Stop background reading when
# 1) there's nobody paying attention to the data we are reading
@@ -695,7 +651,6 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
nothing
end
readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested))
nothing
end

function reseteof(x::TTY)
@@ -889,7 +844,6 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
while bytesavailable(buf) < nb
s.readerror === nothing || throw(s.readerror)
isopen(s) || break
s.status != StatusEOF || break
iolock_end()
wait_readnb(s, nb)
iolock_begin()
@@ -936,7 +890,6 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
while bytesavailable(buf) < nb
s.readerror === nothing || throw(s.readerror)
isopen(s) || throw(EOFError())
s.status != StatusEOF || throw(EOFError())
iolock_end()
wait_readnb(s, nb)
iolock_begin()
@@ -993,14 +946,13 @@ function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false)
@assert buf.seekable == false
if !occursin(c, buf) # fast path checks first
x.readerror === nothing || throw(x.readerror)
if isopen(x) && x.status != StatusEOF
if isopen(x)
preserve_handle(x)
lock(x.cond)
try
while !occursin(c, x.buffer)
x.readerror === nothing || throw(x.readerror)
isopen(x) || break
x.status != StatusEOF || break
start_reading(x) # ensure we are reading
iolock_end()
wait(x.cond)
@@ -1163,20 +1115,6 @@ function uv_writecb_task(req::Ptr{Cvoid}, status::Cint)
nothing
end

function uv_shutdowncb_task(req::Ptr{Cvoid}, status::Cint)
d = uv_req_data(req)
if d != C_NULL
uv_req_set_data(req, C_NULL) # let the Task know we got the shutdowncb
t = unsafe_pointer_to_objref(d)::Task
schedule(t, status)
else
# no owner for this req, safe to just free it
Libc.free(req)
end
nothing
end


_fd(x::IOStream) = RawFD(fd(x))
_fd(x::Union{OS_HANDLE, RawFD}) = x

@@ -1467,20 +1405,18 @@ mutable struct BufferStream <: LibuvStream
buffer::IOBuffer
cond::Threads.Condition
readerror::Any
is_open::Bool
buffer_writes::Bool
lock::ReentrantLock # advisory lock
status::Int

BufferStream() = new(PipeBuffer(), Threads.Condition(), nothing, false, ReentrantLock(), StatusActive)
BufferStream() = new(PipeBuffer(), Threads.Condition(), nothing, true, false, ReentrantLock())
end

isopen(s::BufferStream) = s.status != StatusClosed

shutdown(s::BufferStream) = close(s)
isopen(s::BufferStream) = s.is_open

function close(s::BufferStream)
lock(s.cond) do
s.status = StatusClosed
s.is_open = false
notify(s.cond)
nothing
end
@@ -1503,8 +1439,8 @@ function unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt)
end
bytesavailable(s::BufferStream) = bytesavailable(s.buffer)

isreadable(s::BufferStream) = (isopen(s) || bytesavailable(s) > 0) && s.buffer.readable
iswritable(s::BufferStream) = isopen(s) && s.buffer.writable
isreadable(s::BufferStream) = s.buffer.readable
iswritable(s::BufferStream) = s.buffer.writable

function wait_readnb(s::BufferStream, nb::Int)
lock(s.cond) do
@@ -1514,7 +1450,7 @@ function wait_readnb(s::BufferStream, nb::Int)
end
end

show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")")
show(io::IO, s::BufferStream) = print(io, "BufferStream() bytes waiting:", bytesavailable(s.buffer), ", isopen:", s.is_open)

function readuntil(s::BufferStream, c::UInt8; keep::Bool=false)
bytes = lock(s.cond) do
1 change: 0 additions & 1 deletion doc/src/base/io-network.md
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ Base.take!(::Base.GenericIOBuffer)
Base.fdio
Base.flush
Base.close
Base.shutdown
Base.write
Base.read
Base.read!
4 changes: 2 additions & 2 deletions stdlib/Distributed/src/process_messages.jl
Original file line number Diff line number Diff line change
@@ -230,8 +230,8 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
deregister_worker(wpid)
end

close(r_stream)
close(w_stream)
isopen(r_stream) && close(r_stream)
isopen(w_stream) && close(w_stream)

if (myid() == 1) && (wpid > 1)
if oldstate != W_TERMINATING
15 changes: 9 additions & 6 deletions stdlib/Sockets/src/Sockets.jl
Original file line number Diff line number Diff line change
@@ -139,6 +139,9 @@ function TCPServer(; delay=true)
return tcp
end

isreadable(io::TCPSocket) = isopen(io) || bytesavailable(io) > 0
iswritable(io::TCPSocket) = isopen(io) && io.status != StatusClosing

"""
accept(server[, client])
@@ -575,11 +578,11 @@ Enables or disables Nagle's algorithm on a given TCP server or socket.
"""
function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool)
# disable or enable Nagle's algorithm on all OSes
iolock_begin()
check_open(sock)
Sockets.iolock_begin()
Sockets.check_open(sock)
err = ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable))
# TODO: check err
iolock_end()
Sockets.iolock_end()
return err
end

@@ -589,15 +592,15 @@ end
On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`.
"""
function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool)
iolock_begin()
check_open(sock)
Sockets.iolock_begin()
Sockets.check_open(sock)
@static if Sys.islinux()
# tcp_quickack is a linux only option
if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0
@warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1
end
end
iolock_end()
Sockets.iolock_end()
nothing
end

35 changes: 5 additions & 30 deletions stdlib/Sockets/test/runtests.jl
Original file line number Diff line number Diff line change
@@ -551,42 +551,17 @@ end
r = @async close(s)
@test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s)
fetch(r)
close(srv)
end
end

@testset "iswritable" begin
let addr = Sockets.InetAddr(ip"127.0.0.1", 4445)
srv = listen(addr)
let s = Sockets.TCPSocket()
Sockets.connect!(s, addr)
@test iswritable(s) broken=Sys.iswindows()
close(s)
@test !iswritable(s)
end
let s = Sockets.connect(addr)
@test iswritable(s)
shutdown(s)
@test !iswritable(s)
close(s)
end
close(srv)
srv = listen(addr)
let s = Sockets.connect(addr)
let c = accept(srv)
Base.errormonitor(@async try; write(c, c); finally; close(c); end)
end
@test iswritable(s)
write(s, "hello world\n")
shutdown(s)
@test !iswritable(s)
@test isreadable(s)
@test read(s, String) == "hello world\n"
@test !isreadable(s)
@test !isopen(s)
close(s)
end
close(srv)
s = Sockets.TCPSocket()
Sockets.connect!(s, addr)
@test iswritable(s)
close(s)
@test !iswritable(s)
end
end

21 changes: 9 additions & 12 deletions test/iobuffer.jl
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size)
@testset "Read/write empty IOBuffer" begin
io = IOBuffer()
@test eof(io)
@test_throws EOFError read(io, UInt8)
@test_throws EOFError read(io,UInt8)
@test write(io,"abc") === 3
@test isreadable(io)
@test iswritable(io)
@@ -18,7 +18,7 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size)
@test position(io) == 3
@test eof(io)
seek(io, 0)
@test read(io, UInt8) == convert(UInt8, 'a')
@test read(io,UInt8) == convert(UInt8, 'a')
a = Vector{UInt8}(undef, 2)
@test read!(io, a) == a
@test a == UInt8['b','c']
@@ -34,24 +34,22 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size)
truncate(io, 10)
@test position(io) == 0
@test all(io.data .== 0)
@test write(io, Int16[1, 2, 3, 4, 5, 6]) === 12
@test write(io,Int16[1,2,3,4,5,6]) === 12
seek(io, 2)
truncate(io, 10)
@test ioslength(io) == 10
io.readable = false
@test_throws ArgumentError read!(io, UInt8[0])
@test_throws ArgumentError read!(io,UInt8[0])
truncate(io, 0)
@test write(io,"boston\ncambridge\n") > 0
@test String(take!(io)) == "boston\ncambridge\n"
@test String(take!(io)) == ""
@test write(io, ComplexF64(0)) === 16
@test write(io, Rational{Int64}(1//2)) === 16
@test shutdown(io) === nothing
@test_throws ArgumentError write(io, UInt8[0])
close(io)
@test_throws ArgumentError write(io,UInt8[0])
@test_throws ArgumentError seek(io,0)
@test eof(io)
@test close(io) === nothing
@test_throws ArgumentError write(io, UInt8[0])
@test_throws ArgumentError seek(io, 0)
end

@testset "Read/write readonly IOBuffer" begin
@@ -239,7 +237,7 @@ end
@test isreadable(bstream)
@test iswritable(bstream)
@test bytesavailable(bstream) == 0
@test sprint(show, bstream) == "BufferStream(bytes waiting=$(bytesavailable(bstream.buffer)), isopen=true)"
@test sprint(show, bstream) == "BufferStream() bytes waiting:$(bytesavailable(bstream.buffer)), isopen:true"
a = rand(UInt8,10)
write(bstream,a)
@test !eof(bstream)
@@ -253,10 +251,9 @@ end
@test !eof(bstream)
read!(bstream,c)
@test c == a[3:10]
@test shutdown(bstream) === nothing
@test close(bstream) === nothing
@test eof(bstream)
@test bytesavailable(bstream) == 0
@test close(bstream) === nothing
flag = Ref{Bool}(false)
event = Base.Event()
bstream = Base.BufferStream()
8 changes: 0 additions & 8 deletions test/spawn.jl
Original file line number Diff line number Diff line change
@@ -765,15 +765,7 @@ let text = "input-test-text"
@test read(proc, String) == string(length(text), '\n')
@test success(proc)
@test String(take!(b)) == text

out = Base.BufferStream()
proc = run(catcmd, IOBuffer(text), out, wait=false)
@test proc.out === out
@test read(out, String) == text
@test success(proc)
end


@test repr(Base.CmdRedirect(``, devnull, 0, false)) == "pipeline(``, stdin>Base.DevNull())"
@test repr(Base.CmdRedirect(``, devnull, 1, true)) == "pipeline(``, stdout<Base.DevNull())"
@test repr(Base.CmdRedirect(``, devnull, 11, true)) == "pipeline(``, 11<Base.DevNull())"