Skip to content

Commit dff951e

Browse files
committed
Give BufferedReader and BufferedWriter a real usage try.
1 parent 2660391 commit dff951e

File tree

3 files changed

+70
-247
lines changed

3 files changed

+70
-247
lines changed

lib/std/http/Client.zig

Lines changed: 21 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,7 @@ pub const Connection = struct {
160160
proxied: bool = false,
161161
closing: bool = false,
162162

163-
read_start: u16 = 0,
164-
read_end: u16 = 0,
165-
read_buf: [buffer_size]u8 = undefined,
163+
buffered: std.io.BufferedReaderWriter(std.crypto.tls.max_ciphertext_record_len, 4096, *Connection),
166164

167165
pub fn rawReadAtLeast(conn: *Connection, buffer: []u8, len: usize) ReadError!usize {
168166
return switch (conn.protocol) {
@@ -181,61 +179,8 @@ pub const Connection = struct {
181179
};
182180
}
183181

184-
pub fn fill(conn: *Connection) ReadError!void {
185-
if (conn.read_end != conn.read_start) return;
186-
187-
const nread = try conn.rawReadAtLeast(conn.read_buf[0..], 1);
188-
if (nread == 0) return error.EndOfStream;
189-
conn.read_start = 0;
190-
conn.read_end = @as(u16, @intCast(nread));
191-
}
192-
193-
pub fn peek(conn: *Connection) []const u8 {
194-
return conn.read_buf[conn.read_start..conn.read_end];
195-
}
196-
197-
pub fn drop(conn: *Connection, num: u16) void {
198-
conn.read_start += num;
199-
}
200-
201-
pub fn readAtLeast(conn: *Connection, buffer: []u8, len: usize) ReadError!usize {
202-
assert(len <= buffer.len);
203-
204-
var out_index: u16 = 0;
205-
while (out_index < len) {
206-
const available_read = conn.read_end - conn.read_start;
207-
const available_buffer = buffer.len - out_index;
208-
209-
if (available_read > available_buffer) { // partially read buffered data
210-
@memcpy(buffer[out_index..], conn.read_buf[conn.read_start..conn.read_end][0..available_buffer]);
211-
out_index += @as(u16, @intCast(available_buffer));
212-
conn.read_start += @as(u16, @intCast(available_buffer));
213-
214-
break;
215-
} else if (available_read > 0) { // fully read buffered data
216-
@memcpy(buffer[out_index..][0..available_read], conn.read_buf[conn.read_start..conn.read_end]);
217-
out_index += available_read;
218-
conn.read_start += available_read;
219-
220-
if (out_index >= len) break;
221-
}
222-
223-
const leftover_buffer = available_buffer - available_read;
224-
const leftover_len = len - out_index;
225-
226-
if (leftover_buffer > conn.read_buf.len) {
227-
// skip the buffer if the output is large enough
228-
return conn.rawReadAtLeast(buffer[out_index..], leftover_len);
229-
}
230-
231-
try conn.fill();
232-
}
233-
234-
return out_index;
235-
}
236-
237182
pub fn read(conn: *Connection, buffer: []u8) ReadError!usize {
238-
return conn.readAtLeast(buffer, 1);
183+
return conn.rawReadAtLeast(buffer, 1);
239184
}
240185

241186
pub const ReadError = error{
@@ -250,7 +195,7 @@ pub const Connection = struct {
250195
pub const Reader = std.io.Reader(*Connection, ReadError, read);
251196

252197
pub fn reader(conn: *Connection) Reader {
253-
return Reader{ .context = conn };
198+
return .{ .context = conn };
254199
}
255200

256201
pub fn writeAll(conn: *Connection, buffer: []const u8) !void {
@@ -537,8 +482,7 @@ pub const Request = struct {
537482

538483
/// Send the request to the server.
539484
pub fn start(req: *Request) StartError!void {
540-
var buffered = std.io.bufferedWriter(req.connection.?.data.writer());
541-
const w = buffered.writer();
485+
const w = req.connection.?.data.buffered.writer();
542486

543487
try w.writeAll(@tagName(req.method));
544488
try w.writeByte(' ');
@@ -612,7 +556,7 @@ pub const Request = struct {
612556

613557
try w.writeAll("\r\n");
614558

615-
try buffered.flush();
559+
try req.connection.?.data.buffered.flush();
616560
}
617561

618562
pub const TransferReadError = Connection.ReadError || proto.HeadersParser.ReadError;
@@ -628,7 +572,7 @@ pub const Request = struct {
628572

629573
var index: usize = 0;
630574
while (index == 0) {
631-
const amt = try req.response.parser.read(&req.connection.?.data, buf[index..], req.response.skip);
575+
const amt = try req.response.parser.read(&req.connection.?.data.buffered, buf[index..], req.response.skip);
632576
if (amt == 0 and req.response.parser.done) break;
633577
index += amt;
634578
}
@@ -646,10 +590,11 @@ pub const Request = struct {
646590
pub fn wait(req: *Request) WaitError!void {
647591
while (true) { // handle redirects
648592
while (true) { // read headers
649-
try req.connection.?.data.fill();
593+
const rest = req.connection.?.data.buffered.peek(0);
594+
if (rest.len == 0) return error.EndOfStream;
650595

651-
const nchecked = try req.response.parser.checkCompleteHead(req.client.allocator, req.connection.?.data.peek());
652-
req.connection.?.data.drop(@as(u16, @intCast(nchecked)));
596+
const nchecked = try req.response.parser.checkCompleteHead(req.client.allocator, rest);
597+
try req.connection.?.data.buffered.discard(@intCast(nchecked));
653598

654599
if (req.response.parser.state.isContent()) break;
655600
}
@@ -765,10 +710,12 @@ pub const Request = struct {
765710
const has_trail = !req.response.parser.state.isContent();
766711

767712
while (!req.response.parser.state.isContent()) { // read trailing headers
768-
try req.connection.?.data.fill();
713+
const rest = req.connection.?.data.buffered.peek(0);
714+
if (rest.len == 0) return error.EndOfStream;
715+
716+
const nchecked = try req.response.parser.checkCompleteHead(req.client.allocator, rest);
769717

770-
const nchecked = try req.response.parser.checkCompleteHead(req.client.allocator, req.connection.?.data.peek());
771-
req.connection.?.data.drop(@as(u16, @intCast(nchecked)));
718+
try req.connection.?.data.buffered.discard(@intCast(nchecked));
772719
}
773720

774721
if (has_trail) {
@@ -898,6 +845,7 @@ pub fn connectUnproxied(client: *Client, host: []const u8, port: u16, protocol:
898845
.stream = stream,
899846
.tls_client = undefined,
900847
.protocol = protocol,
848+
.buffered = undefined,
901849

902850
.host = try client.allocator.dupe(u8, host),
903851
.port = port,
@@ -916,6 +864,11 @@ pub fn connectUnproxied(client: *Client, host: []const u8, port: u16, protocol:
916864
conn.data.tls_client.allow_truncation_attacks = true;
917865
},
918866
}
867+
conn.data.buffered = .{ .buffered_reader = .{
868+
.unbuffered_reader = &conn.data,
869+
}, .buffered_writer = .{
870+
.unbuffered_writer = &conn.data,
871+
} };
919872

920873
client.connection_pool.addUsed(conn);
921874

lib/std/http/Server.zig

Lines changed: 23 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ pub const Connection = struct {
2323
protocol: Protocol,
2424

2525
closing: bool = true,
26-
27-
read_buf: [buffer_size]u8 = undefined,
28-
read_start: u16 = 0,
29-
read_end: u16 = 0,
26+
buffered: std.io.BufferedReaderWriter(std.crypto.tls.max_ciphertext_record_len, 4096, *Connection),
3027

3128
pub fn rawReadAtLeast(conn: *Connection, buffer: []u8, len: usize) ReadError!usize {
3229
return switch (conn.protocol) {
@@ -40,61 +37,8 @@ pub const Connection = struct {
4037
};
4138
}
4239

43-
pub fn fill(conn: *Connection) ReadError!void {
44-
if (conn.read_end != conn.read_start) return;
45-
46-
const nread = try conn.rawReadAtLeast(conn.read_buf[0..], 1);
47-
if (nread == 0) return error.EndOfStream;
48-
conn.read_start = 0;
49-
conn.read_end = @as(u16, @intCast(nread));
50-
}
51-
52-
pub fn peek(conn: *Connection) []const u8 {
53-
return conn.read_buf[conn.read_start..conn.read_end];
54-
}
55-
56-
pub fn drop(conn: *Connection, num: u16) void {
57-
conn.read_start += num;
58-
}
59-
60-
pub fn readAtLeast(conn: *Connection, buffer: []u8, len: usize) ReadError!usize {
61-
assert(len <= buffer.len);
62-
63-
var out_index: u16 = 0;
64-
while (out_index < len) {
65-
const available_read = conn.read_end - conn.read_start;
66-
const available_buffer = buffer.len - out_index;
67-
68-
if (available_read > available_buffer) { // partially read buffered data
69-
@memcpy(buffer[out_index..], conn.read_buf[conn.read_start..conn.read_end][0..available_buffer]);
70-
out_index += @as(u16, @intCast(available_buffer));
71-
conn.read_start += @as(u16, @intCast(available_buffer));
72-
73-
break;
74-
} else if (available_read > 0) { // fully read buffered data
75-
@memcpy(buffer[out_index..][0..available_read], conn.read_buf[conn.read_start..conn.read_end]);
76-
out_index += available_read;
77-
conn.read_start += available_read;
78-
79-
if (out_index >= len) break;
80-
}
81-
82-
const leftover_buffer = available_buffer - available_read;
83-
const leftover_len = len - out_index;
84-
85-
if (leftover_buffer > conn.read_buf.len) {
86-
// skip the buffer if the output is large enough
87-
return conn.rawReadAtLeast(buffer[out_index..], leftover_len);
88-
}
89-
90-
try conn.fill();
91-
}
92-
93-
return out_index;
94-
}
95-
9640
pub fn read(conn: *Connection, buffer: []u8) ReadError!usize {
97-
return conn.readAtLeast(buffer, 1);
41+
return conn.rawReadAtLeast(buffer, 1);
9842
}
9943

10044
pub const ReadError = error{
@@ -107,7 +51,7 @@ pub const Connection = struct {
10751
pub const Reader = std.io.Reader(*Connection, ReadError, read);
10852

10953
pub fn reader(conn: *Connection) Reader {
110-
return Reader{ .context = conn };
54+
return .{ .context = conn };
11155
}
11256

11357
pub fn writeAll(conn: *Connection, buffer: []const u8) WriteError!void {
@@ -397,8 +341,7 @@ pub const Response = struct {
397341
.first, .start, .responded, .finished => unreachable,
398342
}
399343

400-
var buffered = std.io.bufferedWriter(res.connection.writer());
401-
const w = buffered.writer();
344+
const w = res.connection.buffered.writer();
402345

403346
try w.writeAll(@tagName(res.version));
404347
try w.writeByte(' ');
@@ -455,8 +398,7 @@ pub const Response = struct {
455398
try w.print("{}", .{res.headers});
456399

457400
try w.writeAll("\r\n");
458-
459-
try buffered.flush();
401+
try res.connection.buffered.flush();
460402
}
461403

462404
pub const TransferReadError = Connection.ReadError || proto.HeadersParser.ReadError;
@@ -472,7 +414,7 @@ pub const Response = struct {
472414

473415
var index: usize = 0;
474416
while (index == 0) {
475-
const amt = try res.request.parser.read(&res.connection, buf[index..], false);
417+
const amt = try res.request.parser.read(&res.connection.buffered, buf[index..], false);
476418
if (amt == 0 and res.request.parser.done) break;
477419
index += amt;
478420
}
@@ -490,10 +432,11 @@ pub const Response = struct {
490432
}
491433

492434
while (true) {
493-
try res.connection.fill();
435+
var rest = res.connection.buffered.peek(0);
436+
if (rest.len == 0) return error.EndOfStream;
494437

495-
const nchecked = try res.request.parser.checkCompleteHead(res.allocator, res.connection.peek());
496-
res.connection.drop(@as(u16, @intCast(nchecked)));
438+
const nchecked = try res.request.parser.checkCompleteHead(res.allocator, rest);
439+
try res.connection.buffered.discard(@intCast(nchecked));
497440

498441
if (res.request.parser.state.isContent()) break;
499442
}
@@ -557,10 +500,11 @@ pub const Response = struct {
557500
const has_trail = !res.request.parser.state.isContent();
558501

559502
while (!res.request.parser.state.isContent()) { // read trailing headers
560-
try res.connection.fill();
503+
var rest = res.connection.buffered.peek(0);
504+
if (rest.len == 0) return error.EndOfStream;
561505

562-
const nchecked = try res.request.parser.checkCompleteHead(res.allocator, res.connection.peek());
563-
res.connection.drop(@as(u16, @intCast(nchecked)));
506+
const nchecked = try res.request.parser.checkCompleteHead(res.allocator, rest);
507+
try res.connection.buffered.discard(@intCast(nchecked));
564508
}
565509

566510
if (has_trail) {
@@ -685,12 +629,13 @@ pub const AcceptOptions = struct {
685629
pub fn accept(server: *Server, options: AcceptOptions) AcceptError!Response {
686630
const in = try server.socket.accept();
687631

688-
return Response{
632+
var resp = Response{
689633
.allocator = options.allocator,
690634
.address = in.address,
691635
.connection = .{
692636
.stream = in.stream,
693637
.protocol = .plain,
638+
.buffered = undefined,
694639
},
695640
.headers = .{ .allocator = options.allocator },
696641
.request = .{
@@ -704,6 +649,13 @@ pub fn accept(server: *Server, options: AcceptOptions) AcceptError!Response {
704649
},
705650
},
706651
};
652+
653+
resp.connection.buffered = .{ .buffered_reader = .{
654+
.unbuffered_reader = &resp.connection,
655+
}, .buffered_writer = .{
656+
.unbuffered_writer = &resp.connection,
657+
} };
658+
return resp;
707659
}
708660

709661
test "HTTP server handles a chunked transfer coding request" {

0 commit comments

Comments
 (0)