Skip to content

Commit cecc90a

Browse files
committed
Package.Fetch: fix Git package fetching
This commit works around ziglang#18967 by adding an `AccumulatingReader`, which accumulates data read from the underlying packfile, and by keeping track of the position in the packfile and hash/checksum information separately rather than using reader composition. That is, the packfile position and hashes/checksums are updated with the accumulated read history data only after we can determine what data has actually been used by the decompressor rather than merely being buffered. The only addition to the standard library APIs to support this change is the `unreadBytes` function in `std.compress.flate.Inflate`, which allows the user to determine how many bytes have been read only for buffering and not used as part of compressed data. These changes can be reverted if ziglang#18967 is resolved with a decompressor that reads precisely only the number of bytes needed for decompression.
1 parent f3bd177 commit cecc90a

File tree

2 files changed

+161
-22
lines changed

2 files changed

+161
-22
lines changed

lib/std/compress/flate/inflate.zig

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,14 @@ pub fn Inflate(comptime container: Container, comptime ReaderType: type) type {
288288
}
289289
}
290290

291+
/// Number of bytes pulled from the internal reader that are currently
/// buffered by the decompressor but not yet consumed as compressed data.
pub fn unreadBytes(self: Self) usize {
    // Widen first so the round-up addition is done in usize arithmetic;
    // nbits is at most 64, so `+ 7` cannot overflow.
    const buffered_bits: usize = self.bits.nbits;
    return (buffered_bits + 7) / 8;
}
298+
291299
// Iterator interface
292300

293301
/// Can be used in iterator like loop without memcpy to another buffer:

src/Package/Fetch/git.zig

Lines changed: 153 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,86 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype)
10911091
try index_writer.writeAll(&index_checksum);
10921092
}
10931093

1094+
/// A reader that stores read data in a growable internal buffer. The read
/// position can be rewound to allow previously read data to be read again.
fn AccumulatingReader(comptime ReaderType: type) type {
    return struct {
        child_reader: ReaderType,
        buffer: std.ArrayListUnmanaged(u8) = .{},
        /// The position in `buffer` from which reads should start, returning
        /// buffered data. If this is `buffer.items.len`, data will be read from
        /// `child_reader` instead.
        read_start: usize = 0,
        allocator: Allocator,

        const Self = @This();

        /// Frees the accumulation buffer and invalidates the struct.
        fn deinit(self: *Self) void {
            self.buffer.deinit(self.allocator);
            self.* = undefined;
        }

        const ReadError = ReaderType.Error || Allocator.Error;
        const Reader = std.io.Reader(*Self, ReadError, read);

        fn read(self: *Self, buf: []u8) ReadError!usize {
            const replayable = self.buffer.items.len - self.read_start;
            if (replayable != 0) {
                // Serve previously accumulated bytes before touching the
                // underlying reader again.
                const n = @min(replayable, buf.len);
                @memcpy(buf[0..n], self.buffer.items[self.read_start..][0..n]);
                self.read_start += n;
                return n;
            }

            // Nothing left to replay: pull fresh bytes from the child reader
            // straight into the accumulation buffer, then hand a copy to the
            // caller. Everything read this way stays buffered for rewinding.
            try self.buffer.ensureUnusedCapacity(self.allocator, buf.len);
            const scratch = self.buffer.unusedCapacitySlice()[0..buf.len];
            const n = try self.child_reader.read(scratch);
            self.buffer.items.len += n;
            self.read_start += n;
            @memcpy(buf[0..n], scratch[0..n]);
            return n;
        }

        fn reader(self: *Self) Reader {
            return .{ .context = self };
        }

        /// Returns a slice of the buffered data that has already been read,
        /// except the last `count_before_end` bytes.
        fn readDataExcept(self: Self, count_before_end: usize) []const u8 {
            assert(count_before_end <= self.read_start);
            const end = self.read_start - count_before_end;
            return self.buffer.items[0..end];
        }

        /// Discards the first `count` bytes of buffered data.
        fn discard(self: *Self, count: usize) void {
            assert(count <= self.buffer.items.len);
            const kept = self.buffer.items.len - count;
            // Source and destination overlap, so a forward copy is required.
            mem.copyForwards(
                u8,
                self.buffer.items[0..kept],
                self.buffer.items[count..][0..kept],
            );
            self.buffer.items.len = kept;
            // Saturating: the read position never moves before the start.
            self.read_start -|= count;
        }

        /// Rewinds the read position to the beginning of buffered data.
        fn rewind(self: *Self) void {
            self.read_start = 0;
        }
    };
}
1166+
1167+
/// Convenience constructor: wraps `reader` in an `AccumulatingReader` whose
/// internal buffer grows using `allocator`. Call `deinit` to release it.
fn accumulatingReader(
    allocator: Allocator,
    reader: anytype,
) AccumulatingReader(@TypeOf(reader)) {
    return .{
        .allocator = allocator,
        .child_reader = reader,
    };
}
1173+
10941174
/// Performs the first pass over the packfile data for index construction.
10951175
/// This will index all non-delta objects, queue delta objects for further
10961176
/// processing, and return the pack checksum (which is part of the index
@@ -1102,59 +1182,106 @@ fn indexPackFirstPass(
11021182
pending_deltas: *std.ArrayListUnmanaged(IndexEntry),
11031183
) ![Sha1.digest_length]u8 {
11041184
var pack_buffered_reader = std.io.bufferedReader(pack.reader());
1105-
var pack_counting_reader = std.io.countingReader(pack_buffered_reader.reader());
1106-
var pack_hashed_reader = std.compress.hashedReader(pack_counting_reader.reader(), Sha1.init(.{}));
1107-
const pack_reader = pack_hashed_reader.reader();
1185+
var pack_accumulating_reader = accumulatingReader(allocator, pack_buffered_reader.reader());
1186+
defer pack_accumulating_reader.deinit();
1187+
var pack_position: usize = 0;
1188+
var pack_hash = Sha1.init(.{});
1189+
const pack_reader = pack_accumulating_reader.reader();
11081190

11091191
const pack_header = try PackHeader.read(pack_reader);
1192+
const pack_header_bytes = pack_accumulating_reader.readDataExcept(0);
1193+
pack_position += pack_header_bytes.len;
1194+
pack_hash.update(pack_header_bytes);
1195+
pack_accumulating_reader.discard(pack_header_bytes.len);
11101196

11111197
var current_entry: u32 = 0;
11121198
while (current_entry < pack_header.total_objects) : (current_entry += 1) {
1113-
const entry_offset = pack_counting_reader.bytes_read;
1114-
var entry_crc32_reader = std.compress.hashedReader(pack_reader, std.hash.Crc32.init());
1115-
const entry_header = try EntryHeader.read(entry_crc32_reader.reader());
1199+
const entry_offset = pack_position;
1200+
var entry_crc32 = std.hash.Crc32.init();
1201+
1202+
const entry_header = try EntryHeader.read(pack_reader);
1203+
const entry_header_bytes = pack_accumulating_reader.readDataExcept(0);
1204+
pack_position += entry_header_bytes.len;
1205+
pack_hash.update(entry_header_bytes);
1206+
entry_crc32.update(entry_header_bytes);
1207+
pack_accumulating_reader.discard(entry_header_bytes.len);
1208+
11161209
switch (entry_header) {
11171210
inline .commit, .tree, .blob, .tag => |object, tag| {
1118-
var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
1119-
var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
1211+
var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
1212+
var entry_data_size: usize = 0;
11201213
var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{}));
11211214
const entry_writer = entry_hashed_writer.writer();
1215+
11221216
// The object header is not included in the pack data but is
11231217
// part of the object's ID
11241218
try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length });
1125-
var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
1126-
try fifo.pump(entry_counting_reader.reader(), entry_writer);
1127-
if (entry_counting_reader.bytes_read != object.uncompressed_length) {
1219+
1220+
while (try entry_decompress_stream.next()) |decompressed_data| {
1221+
entry_data_size += decompressed_data.len;
1222+
try entry_writer.writeAll(decompressed_data);
1223+
1224+
const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
1225+
pack_position += compressed_bytes.len;
1226+
pack_hash.update(compressed_bytes);
1227+
entry_crc32.update(compressed_bytes);
1228+
pack_accumulating_reader.discard(compressed_bytes.len);
1229+
}
1230+
const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
1231+
pack_position += footer_bytes.len;
1232+
pack_hash.update(footer_bytes);
1233+
entry_crc32.update(footer_bytes);
1234+
pack_accumulating_reader.discard(footer_bytes.len);
1235+
pack_accumulating_reader.rewind();
1236+
1237+
if (entry_data_size != object.uncompressed_length) {
11281238
return error.InvalidObject;
11291239
}
1240+
11301241
const oid = entry_hashed_writer.hasher.finalResult();
11311242
try index_entries.put(allocator, oid, .{
11321243
.offset = entry_offset,
1133-
.crc32 = entry_crc32_reader.hasher.final(),
1244+
.crc32 = entry_crc32.final(),
11341245
});
11351246
},
11361247
inline .ofs_delta, .ref_delta => |delta| {
1137-
var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
1138-
var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
1139-
var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
1140-
try fifo.pump(entry_counting_reader.reader(), std.io.null_writer);
1141-
if (entry_counting_reader.bytes_read != delta.uncompressed_length) {
1248+
var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
1249+
var entry_data_size: usize = 0;
1250+
1251+
while (try entry_decompress_stream.next()) |decompressed_data| {
1252+
entry_data_size += decompressed_data.len;
1253+
1254+
const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
1255+
pack_position += compressed_bytes.len;
1256+
pack_hash.update(compressed_bytes);
1257+
entry_crc32.update(compressed_bytes);
1258+
pack_accumulating_reader.discard(compressed_bytes.len);
1259+
}
1260+
const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
1261+
pack_position += footer_bytes.len;
1262+
pack_hash.update(footer_bytes);
1263+
entry_crc32.update(footer_bytes);
1264+
pack_accumulating_reader.discard(footer_bytes.len);
1265+
pack_accumulating_reader.rewind();
1266+
1267+
if (entry_data_size != delta.uncompressed_length) {
11421268
return error.InvalidObject;
11431269
}
1270+
11441271
try pending_deltas.append(allocator, .{
11451272
.offset = entry_offset,
1146-
.crc32 = entry_crc32_reader.hasher.final(),
1273+
.crc32 = entry_crc32.final(),
11471274
});
11481275
},
11491276
}
11501277
}
11511278

1152-
const pack_checksum = pack_hashed_reader.hasher.finalResult();
1153-
const recorded_checksum = try pack_buffered_reader.reader().readBytesNoEof(Sha1.digest_length);
1279+
const pack_checksum = pack_hash.finalResult();
1280+
const recorded_checksum = try pack_reader.readBytesNoEof(Sha1.digest_length);
11541281
if (!mem.eql(u8, &pack_checksum, &recorded_checksum)) {
11551282
return error.CorruptedPack;
11561283
}
1157-
_ = pack_buffered_reader.reader().readByte() catch |e| switch (e) {
1284+
_ = pack_reader.readByte() catch |e| switch (e) {
11581285
error.EndOfStream => return pack_checksum,
11591286
else => |other| return other,
11601287
};
@@ -1385,7 +1512,11 @@ test "packfile indexing and checkout" {
13851512
defer worktree.cleanup();
13861513

13871514
const commit_id = try parseOid("dd582c0720819ab7130b103635bd7271b9fd4feb");
1388-
try repository.checkout(worktree.dir, commit_id);
1515+
1516+
var diagnostics: Diagnostics = .{ .allocator = testing.allocator };
1517+
defer diagnostics.deinit();
1518+
try repository.checkout(worktree.dir, commit_id, &diagnostics);
1519+
try testing.expect(diagnostics.errors.items.len == 0);
13891520

13901521
const expected_files: []const []const u8 = &.{
13911522
"dir/file",

0 commit comments

Comments (0)