Skip to content

Commit 56080c4

Browse files
committed
Implement clone() for TCP/UDP/Unix sockets
This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
1 parent ef53b7a commit 56080c4

File tree

18 files changed

+812
-82
lines changed

18 files changed

+812
-82
lines changed

src/libnative/io/file.rs

+31-16
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
//! Blocking posix-based file I/O
1212
13+
use std::sync::arc::UnsafeArc;
1314
use std::c_str::CString;
1415
use std::io::IoError;
1516
use std::io;
@@ -55,9 +56,13 @@ pub fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 {
5556

5657
pub type fd_t = libc::c_int;
5758

59+
struct Inner {
60+
fd: fd_t,
61+
close_on_drop: bool,
62+
}
63+
5864
pub struct FileDesc {
59-
priv fd: fd_t,
60-
priv close_on_drop: bool,
65+
priv inner: UnsafeArc<Inner>
6166
}
6267

6368
impl FileDesc {
@@ -70,7 +75,10 @@ impl FileDesc {
7075
/// Note that all I/O operations done on this object will be *blocking*, but
7176
/// they do not require the runtime to be active.
7277
pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc {
73-
FileDesc { fd: fd, close_on_drop: close_on_drop }
78+
FileDesc { inner: UnsafeArc::new(Inner {
79+
fd: fd,
80+
close_on_drop: close_on_drop
81+
}) }
7482
}
7583

7684
// FIXME(#10465) these functions should not be public, but anything in
@@ -80,7 +88,7 @@ impl FileDesc {
8088
#[cfg(windows)] type rlen = libc::c_uint;
8189
#[cfg(not(windows))] type rlen = libc::size_t;
8290
let ret = retry(|| unsafe {
83-
libc::read(self.fd,
91+
libc::read(self.fd(),
8492
buf.as_ptr() as *mut libc::c_void,
8593
buf.len() as rlen) as libc::c_int
8694
});
@@ -97,7 +105,7 @@ impl FileDesc {
97105
#[cfg(not(windows))] type wlen = libc::size_t;
98106
let ret = keep_going(buf, |buf, len| {
99107
unsafe {
100-
libc::write(self.fd, buf as *libc::c_void, len as wlen) as i64
108+
libc::write(self.fd(), buf as *libc::c_void, len as wlen) as i64
101109
}
102110
});
103111
if ret < 0 {
@@ -107,7 +115,11 @@ impl FileDesc {
107115
}
108116
}
109117

110-
pub fn fd(&self) -> fd_t { self.fd }
118+
pub fn fd(&self) -> fd_t {
119+
// This unsafety is fine because we're just reading off the file
120+
// descriptor, no one is modifying this.
121+
unsafe { (*self.inner.get()).fd }
122+
}
111123
}
112124

113125
impl io::Reader for FileDesc {
@@ -130,7 +142,7 @@ impl rtio::RtioFileStream for FileDesc {
130142
self.inner_write(buf)
131143
}
132144
fn pread(&mut self, buf: &mut [u8], offset: u64) -> Result<int, IoError> {
133-
return os_pread(self.fd, buf.as_ptr(), buf.len(), offset);
145+
return os_pread(self.fd(), buf.as_ptr(), buf.len(), offset);
134146

135147
#[cfg(windows)]
136148
fn os_pread(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<int> {
@@ -162,7 +174,7 @@ impl rtio::RtioFileStream for FileDesc {
162174
}
163175
}
164176
fn pwrite(&mut self, buf: &[u8], offset: u64) -> Result<(), IoError> {
165-
return os_pwrite(self.fd, buf.as_ptr(), buf.len(), offset);
177+
return os_pwrite(self.fd(), buf.as_ptr(), buf.len(), offset);
166178

167179
#[cfg(windows)]
168180
fn os_pwrite(fd: c_int, buf: *u8, amt: uint, offset: u64) -> IoResult<()> {
@@ -197,7 +209,7 @@ impl rtio::RtioFileStream for FileDesc {
197209
io::SeekCur => libc::FILE_CURRENT,
198210
};
199211
unsafe {
200-
let handle = libc::get_osfhandle(self.fd) as libc::HANDLE;
212+
let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE;
201213
let mut newpos = 0;
202214
match libc::SetFilePointerEx(handle, pos, &mut newpos, whence) {
203215
0 => Err(super::last_error()),
@@ -212,23 +224,23 @@ impl rtio::RtioFileStream for FileDesc {
212224
io::SeekEnd => libc::SEEK_END,
213225
io::SeekCur => libc::SEEK_CUR,
214226
};
215-
let n = unsafe { libc::lseek(self.fd, pos as libc::off_t, whence) };
227+
let n = unsafe { libc::lseek(self.fd(), pos as libc::off_t, whence) };
216228
if n < 0 {
217229
Err(super::last_error())
218230
} else {
219231
Ok(n as u64)
220232
}
221233
}
222234
fn tell(&self) -> Result<u64, IoError> {
223-
let n = unsafe { libc::lseek(self.fd, 0, libc::SEEK_CUR) };
235+
let n = unsafe { libc::lseek(self.fd(), 0, libc::SEEK_CUR) };
224236
if n < 0 {
225237
Err(super::last_error())
226238
} else {
227239
Ok(n as u64)
228240
}
229241
}
230242
fn fsync(&mut self) -> Result<(), IoError> {
231-
return os_fsync(self.fd);
243+
return os_fsync(self.fd());
232244

233245
#[cfg(windows)]
234246
fn os_fsync(fd: c_int) -> IoResult<()> {
@@ -247,7 +259,7 @@ impl rtio::RtioFileStream for FileDesc {
247259

248260
#[cfg(not(windows))]
249261
fn datasync(&mut self) -> Result<(), IoError> {
250-
return super::mkerr_libc(os_datasync(self.fd));
262+
return super::mkerr_libc(os_datasync(self.fd()));
251263

252264
#[cfg(target_os = "macos")]
253265
fn os_datasync(fd: c_int) -> c_int {
@@ -270,7 +282,7 @@ impl rtio::RtioFileStream for FileDesc {
270282
Ok(_) => {}, Err(e) => return Err(e),
271283
};
272284
let ret = unsafe {
273-
let handle = libc::get_osfhandle(self.fd) as libc::HANDLE;
285+
let handle = libc::get_osfhandle(self.fd()) as libc::HANDLE;
274286
match libc::SetEndOfFile(handle) {
275287
0 => Err(super::last_error()),
276288
_ => Ok(())
@@ -282,7 +294,7 @@ impl rtio::RtioFileStream for FileDesc {
282294
#[cfg(unix)]
283295
fn truncate(&mut self, offset: i64) -> Result<(), IoError> {
284296
super::mkerr_libc(retry(|| unsafe {
285-
libc::ftruncate(self.fd, offset as libc::off_t)
297+
libc::ftruncate(self.fd(), offset as libc::off_t)
286298
}))
287299
}
288300
}
@@ -294,6 +306,9 @@ impl rtio::RtioPipe for FileDesc {
294306
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
295307
self.inner_write(buf)
296308
}
309+
fn clone(&self) -> ~rtio::RtioPipe {
310+
~FileDesc { inner: self.inner.clone() } as ~rtio::RtioPipe
311+
}
297312
}
298313

299314
impl rtio::RtioTTY for FileDesc {
@@ -312,7 +327,7 @@ impl rtio::RtioTTY for FileDesc {
312327
fn isatty(&self) -> bool { false }
313328
}
314329

315-
impl Drop for FileDesc {
330+
impl Drop for Inner {
316331
fn drop(&mut self) {
317332
// closing stdio file handles makes no sense, so never do it. Also, note
318333
// that errors are ignored when closing a file descriptor. The reason

0 commit comments

Comments
 (0)