Skip to content

Commit 23ccff9

Browse files
committed
std.http.Server: collapse BufferedConnection into Connection
1 parent 0e5e6cb commit 23ccff9

File tree

3 files changed

+88
-141
lines changed

3 files changed

+88
-141
lines changed

lib/std/http/Client.zig

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ pub const Connection = struct {
184184
pub fn fill(conn: *Connection) ReadError!void {
185185
if (conn.read_end != conn.read_start) return;
186186

187-
const nread = try conn.read(conn.read_buf[0..]);
187+
const nread = try conn.rawReadAtLeast(conn.read_buf[0..], 1);
188188
if (nread == 0) return error.EndOfStream;
189189
conn.read_start = 0;
190190
conn.read_end = @intCast(u16, nread);
@@ -207,13 +207,13 @@ pub const Connection = struct {
207207
const available_buffer = buffer.len - out_index;
208208

209209
if (available_read > available_buffer) { // partially read buffered data
210-
@memcpy(buffer[out_index..], conn.read_buf[conn.read_start..][0..available_buffer]);
210+
@memcpy(buffer[out_index..], conn.read_buf[conn.read_start..conn.read_end][0..available_buffer]);
211211
out_index += @intCast(u16, available_buffer);
212212
conn.read_start += @intCast(u16, available_buffer);
213213

214214
break;
215215
} else if (available_read > 0) { // fully read buffered data
216-
@memcpy(buffer[out_index..][0..available_read], conn.read_buf[conn.read_start..]);
216+
@memcpy(buffer[out_index..][0..available_read], conn.read_buf[conn.read_start..conn.read_end]);
217217
out_index += available_read;
218218
conn.read_start += available_read;
219219

@@ -608,6 +608,8 @@ pub const Request = struct {
608608
try w.print("{}", .{req.headers});
609609

610610
try w.writeAll("\r\n");
611+
612+
try buffered.flush();
611613
}
612614

613615
pub const TransferReadError = Connection.ReadError || proto.HeadersParser.ReadError;

lib/std/http/Server.zig

Lines changed: 83 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,92 @@ socket: net.StreamServer,
1616

1717
/// An interface to either a plain or TLS connection.
1818
pub const Connection = struct {
19+
pub const buffer_size = std.crypto.tls.max_ciphertext_record_len;
20+
pub const Protocol = enum { plain };
21+
1922
stream: net.Stream,
2023
protocol: Protocol,
2124

2225
closing: bool = true,
2326

24-
pub const Protocol = enum { plain };
27+
read_buf: [buffer_size]u8 = undefined,
28+
read_start: u16 = 0,
29+
read_end: u16 = 0,
2530

26-
pub fn read(conn: *Connection, buffer: []u8) ReadError!usize {
31+
pub fn rawReadAtLeast(conn: *Connection, buffer: []u8, len: usize) ReadError!usize {
2732
return switch (conn.protocol) {
28-
.plain => conn.stream.read(buffer),
29-
// .tls => return conn.tls_client.read(conn.stream, buffer),
30-
} catch |err| switch (err) {
31-
error.ConnectionTimedOut => return error.ConnectionTimedOut,
32-
error.ConnectionResetByPeer, error.BrokenPipe => return error.ConnectionResetByPeer,
33-
else => return error.UnexpectedReadFailure,
33+
.plain => conn.stream.readAtLeast(buffer, len),
34+
// .tls => conn.tls_client.readAtLeast(conn.stream, buffer, len),
35+
} catch |err| {
36+
switch (err) {
37+
error.ConnectionResetByPeer, error.BrokenPipe => return error.ConnectionResetByPeer,
38+
else => return error.UnexpectedReadFailure,
39+
}
3440
};
3541
}
3642

43+
pub fn fill(conn: *Connection) ReadError!void {
44+
if (conn.read_end != conn.read_start) return;
45+
46+
const nread = try conn.rawReadAtLeast(conn.read_buf[0..], 1);
47+
if (nread == 0) return error.EndOfStream;
48+
conn.read_start = 0;
49+
conn.read_end = @intCast(u16, nread);
50+
}
51+
52+
pub fn peek(conn: *Connection) []const u8 {
53+
return conn.read_buf[conn.read_start..conn.read_end];
54+
}
55+
56+
pub fn drop(conn: *Connection, num: u16) void {
57+
conn.read_start += num;
58+
}
59+
3760
pub fn readAtLeast(conn: *Connection, buffer: []u8, len: usize) ReadError!usize {
38-
return switch (conn.protocol) {
39-
.plain => conn.stream.readAtLeast(buffer, len),
40-
// .tls => return conn.tls_client.readAtLeast(conn.stream, buffer, len),
41-
} catch |err| switch (err) {
42-
error.ConnectionTimedOut => return error.ConnectionTimedOut,
43-
error.ConnectionResetByPeer, error.BrokenPipe => return error.ConnectionResetByPeer,
44-
else => return error.UnexpectedReadFailure,
45-
};
61+
assert(len <= buffer.len);
62+
63+
var out_index: u16 = 0;
64+
while (out_index < len) {
65+
const available_read = conn.read_end - conn.read_start;
66+
const available_buffer = buffer.len - out_index;
67+
68+
if (available_read > available_buffer) { // partially read buffered data
69+
@memcpy(buffer[out_index..], conn.read_buf[conn.read_start..conn.read_end][0..available_buffer]);
70+
out_index += @intCast(u16, available_buffer);
71+
conn.read_start += @intCast(u16, available_buffer);
72+
73+
break;
74+
} else if (available_read > 0) { // fully read buffered data
75+
@memcpy(buffer[out_index..][0..available_read], conn.read_buf[conn.read_start..conn.read_end]);
76+
out_index += available_read;
77+
conn.read_start += available_read;
78+
79+
if (out_index >= len) break;
80+
}
81+
82+
const leftover_buffer = available_buffer - available_read;
83+
const leftover_len = len - out_index;
84+
85+
if (leftover_buffer > conn.read_buf.len) {
86+
// skip the buffer if the output is large enough
87+
return conn.rawReadAtLeast(buffer[out_index..], leftover_len);
88+
}
89+
90+
try conn.fill();
91+
}
92+
93+
return out_index;
94+
}
95+
96+
pub fn read(conn: *Connection, buffer: []u8) ReadError!usize {
97+
return conn.readAtLeast(buffer, 1);
4698
}
4799

48100
pub const ReadError = error{
49101
ConnectionTimedOut,
50102
ConnectionResetByPeer,
51103
UnexpectedReadFailure,
104+
EndOfStream,
52105
};
53106

54107
pub const Reader = std.io.Reader(*Connection, ReadError, read);
@@ -93,112 +146,6 @@ pub const Connection = struct {
93146
}
94147
};
95148

96-
/// A buffered (and peekable) Connection.
97-
pub const BufferedConnection = struct {
98-
pub const buffer_size = std.crypto.tls.max_ciphertext_record_len;
99-
100-
conn: Connection,
101-
read_buf: [buffer_size]u8 = undefined,
102-
read_start: u16 = 0,
103-
read_end: u16 = 0,
104-
105-
write_buf: [buffer_size]u8 = undefined,
106-
write_end: u16 = 0,
107-
108-
pub fn fill(bconn: *BufferedConnection) ReadError!void {
109-
if (bconn.read_end != bconn.read_start) return;
110-
111-
const nread = try bconn.conn.read(bconn.read_buf[0..]);
112-
if (nread == 0) return error.EndOfStream;
113-
bconn.read_start = 0;
114-
bconn.read_end = @intCast(u16, nread);
115-
}
116-
117-
pub fn peek(bconn: *BufferedConnection) []const u8 {
118-
return bconn.read_buf[bconn.read_start..bconn.read_end];
119-
}
120-
121-
pub fn drop(bconn: *BufferedConnection, num: u16) void {
122-
bconn.read_start += num;
123-
}
124-
125-
pub fn readAtLeast(bconn: *BufferedConnection, buffer: []u8, len: usize) ReadError!usize {
126-
var out_index: u16 = 0;
127-
while (out_index < len) {
128-
const available = bconn.read_end - bconn.read_start;
129-
const left = buffer.len - out_index;
130-
131-
if (available > 0) {
132-
const can_read = @intCast(u16, @min(available, left));
133-
134-
@memcpy(buffer[out_index..][0..can_read], bconn.read_buf[bconn.read_start..][0..can_read]);
135-
out_index += can_read;
136-
bconn.read_start += can_read;
137-
138-
continue;
139-
}
140-
141-
if (left > bconn.read_buf.len) {
142-
// skip the buffer if the output is large enough
143-
return bconn.conn.read(buffer[out_index..]);
144-
}
145-
146-
try bconn.fill();
147-
}
148-
149-
return out_index;
150-
}
151-
152-
pub fn read(bconn: *BufferedConnection, buffer: []u8) ReadError!usize {
153-
return bconn.readAtLeast(buffer, 1);
154-
}
155-
156-
pub const ReadError = Connection.ReadError || error{EndOfStream};
157-
pub const Reader = std.io.Reader(*BufferedConnection, ReadError, read);
158-
159-
pub fn reader(bconn: *BufferedConnection) Reader {
160-
return Reader{ .context = bconn };
161-
}
162-
163-
pub fn writeAll(bconn: *BufferedConnection, buffer: []const u8) WriteError!void {
164-
if (bconn.write_buf.len - bconn.write_end >= buffer.len) {
165-
@memcpy(bconn.write_buf[bconn.write_end..][0..buffer.len], buffer);
166-
bconn.write_end += @intCast(u16, buffer.len);
167-
} else {
168-
try bconn.flush();
169-
try bconn.conn.writeAll(buffer);
170-
}
171-
}
172-
173-
pub fn write(bconn: *BufferedConnection, buffer: []const u8) WriteError!usize {
174-
if (bconn.write_buf.len - bconn.write_end >= buffer.len) {
175-
@memcpy(bconn.write_buf[bconn.write_end..][0..buffer.len], buffer);
176-
bconn.write_end += @intCast(u16, buffer.len);
177-
178-
return buffer.len;
179-
} else {
180-
try bconn.flush();
181-
return try bconn.conn.write(buffer);
182-
}
183-
}
184-
185-
pub fn flush(bconn: *BufferedConnection) WriteError!void {
186-
defer bconn.write_end = 0;
187-
return bconn.conn.writeAll(bconn.write_buf[0..bconn.write_end]);
188-
}
189-
190-
pub const WriteError = Connection.WriteError;
191-
pub const Writer = std.io.Writer(*BufferedConnection, WriteError, write);
192-
193-
pub fn writer(bconn: *BufferedConnection) Writer {
194-
return Writer{ .context = bconn };
195-
}
196-
197-
pub fn close(bconn: *BufferedConnection) void {
198-
bconn.conn.close();
199-
}
200-
};
201-
202149
/// The mode of transport for responses.
203150
pub const ResponseTransfer = union(enum) {
204151
content_length: u64,
@@ -351,7 +298,7 @@ pub const Response = struct {
351298

352299
allocator: Allocator,
353300
address: net.Address,
354-
connection: BufferedConnection,
301+
connection: Connection,
355302

356303
headers: http.Headers,
357304
request: Request,
@@ -388,7 +335,7 @@ pub const Response = struct {
388335

389336
if (!res.request.parser.done) {
390337
// If the response wasn't fully read, then we need to close the connection.
391-
res.connection.conn.closing = true;
338+
res.connection.closing = true;
392339
return .closing;
393340
}
394341

@@ -402,9 +349,9 @@ pub const Response = struct {
402349
const req_connection = res.request.headers.getFirstValue("connection");
403350
const req_keepalive = req_connection != null and !std.ascii.eqlIgnoreCase("close", req_connection.?);
404351
if (req_keepalive and (res_keepalive or res_connection == null)) {
405-
res.connection.conn.closing = false;
352+
res.connection.closing = false;
406353
} else {
407-
res.connection.conn.closing = true;
354+
res.connection.closing = true;
408355
}
409356

410357
switch (res.request.compression) {
@@ -434,14 +381,14 @@ pub const Response = struct {
434381
.parser = res.request.parser,
435382
};
436383

437-
if (res.connection.conn.closing) {
384+
if (res.connection.closing) {
438385
return .closing;
439386
} else {
440387
return .reset;
441388
}
442389
}
443390

444-
pub const DoError = BufferedConnection.WriteError || error{ UnsupportedTransferEncoding, InvalidContentLength };
391+
pub const DoError = Connection.WriteError || error{ UnsupportedTransferEncoding, InvalidContentLength };
445392

446393
/// Send the response headers.
447394
pub fn do(res: *Response) !void {
@@ -450,7 +397,8 @@ pub const Response = struct {
450397
.first, .start, .responded, .finished => unreachable,
451398
}
452399

453-
const w = res.connection.writer();
400+
var buffered = std.io.bufferedWriter(res.connection.writer());
401+
const w = buffered.writer();
454402

455403
try w.writeAll(@tagName(res.version));
456404
try w.writeByte(' ');
@@ -508,10 +456,10 @@ pub const Response = struct {
508456

509457
try w.writeAll("\r\n");
510458

511-
try res.connection.flush();
459+
try buffered.flush();
512460
}
513461

514-
pub const TransferReadError = BufferedConnection.ReadError || proto.HeadersParser.ReadError;
462+
pub const TransferReadError = Connection.ReadError || proto.HeadersParser.ReadError;
515463

516464
pub const TransferReader = std.io.Reader(*Response, TransferReadError, transferRead);
517465

@@ -532,7 +480,7 @@ pub const Response = struct {
532480
return index;
533481
}
534482

535-
pub const WaitError = BufferedConnection.ReadError || proto.HeadersParser.CheckCompleteHeadError || Request.ParseError || error{ CompressionInitializationFailed, CompressionNotSupported };
483+
pub const WaitError = Connection.ReadError || proto.HeadersParser.CheckCompleteHeadError || Request.ParseError || error{ CompressionInitializationFailed, CompressionNotSupported };
536484

537485
/// Wait for the client to send a complete request head.
538486
pub fn wait(res: *Response) WaitError!void {
@@ -637,7 +585,7 @@ pub const Response = struct {
637585
return index;
638586
}
639587

640-
pub const WriteError = BufferedConnection.WriteError || error{ NotWriteable, MessageTooLong };
588+
pub const WriteError = Connection.WriteError || error{ NotWriteable, MessageTooLong };
641589

642590
pub const Writer = std.io.Writer(*Response, WriteError, write);
643591

@@ -692,8 +640,6 @@ pub const Response = struct {
692640
.content_length => |len| if (len != 0) return error.MessageNotCompleted,
693641
.none => {},
694642
}
695-
696-
try res.connection.flush();
697643
}
698644
};
699645

@@ -742,10 +688,10 @@ pub fn accept(server: *Server, options: AcceptOptions) AcceptError!Response {
742688
return Response{
743689
.allocator = options.allocator,
744690
.address = in.address,
745-
.connection = .{ .conn = .{
691+
.connection = .{
746692
.stream = in.stream,
747693
.protocol = .plain,
748-
} },
694+
},
749695
.headers = .{ .allocator = options.allocator },
750696
.request = .{
751697
.version = undefined,

test/standalone/http.zig

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ fn handleRequest(res: *Server.Response) !void {
8686
try res.writeAll("World!\n");
8787
// try res.finish();
8888
try res.connection.writeAll("0\r\nX-Checksum: aaaa\r\n\r\n");
89-
try res.connection.flush();
9089
} else if (mem.eql(u8, res.request.target, "/redirect/1")) {
9190
res.transfer_encoding = .chunked;
9291

0 commit comments

Comments
 (0)