diff --git a/lib/std/compress/flate/inflate.zig b/lib/std/compress/flate/inflate.zig
index ec8c3a408283..5e286d2af396 100644
--- a/lib/std/compress/flate/inflate.zig
+++ b/lib/std/compress/flate/inflate.zig
@@ -288,6 +288,14 @@ pub fn Inflate(comptime container: Container, comptime ReaderType: type) type {
             }
         }
 
+        /// Returns the number of bytes that have been read from the internal
+        /// reader but not yet consumed by the decompressor.
+        pub fn unreadBytes(self: Self) usize {
+            // There can be no error here: the denominator is not zero, and
+            // overflow is not possible since the type is unsigned.
+            return std.math.divCeil(usize, self.bits.nbits, 8) catch unreachable;
+        }
+
         // Iterator interface
 
         /// Can be used in iterator like loop without memcpy to another buffer:
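A note on the new API: the inflate bit reader buffers ahead of what it has actually decoded, so a caller that needs byte-exact accounting of the compressed stream (as the packfile indexer below does) has to subtract `unreadBytes()` from the bytes it has fed in. A minimal standalone sketch of that accounting, using the iterator interface mentioned above; the test name is illustrative and the hard-coded bytes are the zlib stream for "hello":

    const std = @import("std");

    test "account for unconsumed bytes after each decompression step" {
        // zlib stream for "hello": 2-byte header, deflate data, Adler-32.
        const compressed = [_]u8{
            0x78, 0x9c, 0xcb, 0x48, 0xcd, 0xc9, 0xc9, 0x07,
            0x00, 0x06, 0x2c, 0x02, 0x15,
        };
        var stream = std.io.fixedBufferStream(&compressed);
        var decompressor = std.compress.zlib.decompressor(stream.reader());

        var total_out: usize = 0;
        while (try decompressor.next()) |chunk| {
            total_out += chunk.len;
            // `stream.pos` counts bytes handed to the decompressor; the bytes
            // it has actually consumed so far exclude `unreadBytes()`.
            const consumed = stream.pos - decompressor.unreadBytes();
            try std.testing.expect(consumed <= stream.pos);
        }
        try std.testing.expectEqual(@as(usize, 5), total_out);
    }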
diff --git a/src/Package/Fetch/git.zig b/src/Package/Fetch/git.zig
index abbb03194825..db50ddfab7c1 100644
--- a/src/Package/Fetch/git.zig
+++ b/src/Package/Fetch/git.zig
@@ -1091,6 +1091,86 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype)
     try index_writer.writeAll(&index_checksum);
 }
 
+/// A reader that stores read data in a growable internal buffer. The read
+/// position can be rewound to allow previously read data to be read again.
+fn AccumulatingReader(comptime ReaderType: type) type {
+    return struct {
+        child_reader: ReaderType,
+        buffer: std.ArrayListUnmanaged(u8) = .{},
+        /// The position in `buffer` from which reads should start, returning
+        /// buffered data. If this is `buffer.items.len`, data will be read from
+        /// `child_reader` instead.
+        read_start: usize = 0,
+        allocator: Allocator,
+
+        const Self = @This();
+
+        fn deinit(self: *Self) void {
+            self.buffer.deinit(self.allocator);
+            self.* = undefined;
+        }
+
+        const ReadError = ReaderType.Error || Allocator.Error;
+        const Reader = std.io.Reader(*Self, ReadError, read);
+
+        fn read(self: *Self, buf: []u8) ReadError!usize {
+            if (self.read_start < self.buffer.items.len) {
+                // Previously buffered data is available and should be used
+                // before reading more data from the underlying reader.
+                const available = self.buffer.items.len - self.read_start;
+                const count = @min(available, buf.len);
+                @memcpy(buf[0..count], self.buffer.items[self.read_start..][0..count]);
+                self.read_start += count;
+                return count;
+            }
+
+            try self.buffer.ensureUnusedCapacity(self.allocator, buf.len);
+            const read_buffer = self.buffer.unusedCapacitySlice();
+            const count = try self.child_reader.read(read_buffer[0..buf.len]);
+            @memcpy(buf[0..count], read_buffer[0..count]);
+            self.buffer.items.len += count;
+            self.read_start += count;
+            return count;
+        }
+
+        fn reader(self: *Self) Reader {
+            return .{ .context = self };
+        }
+
+        /// Returns a slice of the buffered data that has already been read,
+        /// except the last `count_before_end` bytes.
+        fn readDataExcept(self: Self, count_before_end: usize) []const u8 {
+            assert(count_before_end <= self.read_start);
+            return self.buffer.items[0 .. self.read_start - count_before_end];
+        }
+
+        /// Discards the first `count` bytes of buffered data.
+        fn discard(self: *Self, count: usize) void {
+            assert(count <= self.buffer.items.len);
+            const retain = self.buffer.items.len - count;
+            mem.copyForwards(
+                u8,
+                self.buffer.items[0..retain],
+                self.buffer.items[count..][0..retain],
+            );
+            self.buffer.items.len = retain;
+            self.read_start -= @min(self.read_start, count);
+        }
+
+        /// Rewinds the read position to the beginning of buffered data.
+        fn rewind(self: *Self) void {
+            self.read_start = 0;
+        }
+    };
+}
+
+fn accumulatingReader(
+    allocator: Allocator,
+    reader: anytype,
+) AccumulatingReader(@TypeOf(reader)) {
+    return .{ .child_reader = reader, .allocator = allocator };
+}
+
 /// Performs the first pass over the packfile data for index construction.
 /// This will index all non-delta objects, queue delta objects for further
 /// processing, and return the pack checksum (which is part of the index
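To make the semantics of the new reader concrete, a small test sketch follows. It assumes it sits in git.zig next to the declarations above, since `accumulatingReader` is file-private; the test name and data are illustrative:

    test "AccumulatingReader: read, rewind, re-read, discard" {
        const allocator = std.testing.allocator;
        var stream = std.io.fixedBufferStream("hello, world");
        var acc = accumulatingReader(allocator, stream.reader());
        defer acc.deinit();

        var buf: [5]u8 = undefined;
        // The first read pulls from the child reader and is recorded in the
        // internal buffer.
        try acc.reader().readNoEof(&buf);
        try std.testing.expectEqualStrings("hello", &buf);

        // After a rewind, the same bytes are served again from the buffer.
        acc.rewind();
        try acc.reader().readNoEof(&buf);
        try std.testing.expectEqualStrings("hello", &buf);

        // Everything read so far is visible for hashing, then can be dropped.
        try std.testing.expectEqualStrings("hello", acc.readDataExcept(0));
        acc.discard(5);
    }

The design keeps every byte that passed through the reader until the caller explicitly discards it, which is what lets the first pass below hash exactly the bytes each entry consumed.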
@@ -1102,59 +1182,106 @@ fn indexPackFirstPass(
     allocator: Allocator,
     pack: std.fs.File,
     index_entries: *std.AutoHashMapUnmanaged(Oid, IndexEntry),
     pending_deltas: *std.ArrayListUnmanaged(IndexEntry),
 ) ![Sha1.digest_length]u8 {
     var pack_buffered_reader = std.io.bufferedReader(pack.reader());
-    var pack_counting_reader = std.io.countingReader(pack_buffered_reader.reader());
-    var pack_hashed_reader = std.compress.hashedReader(pack_counting_reader.reader(), Sha1.init(.{}));
-    const pack_reader = pack_hashed_reader.reader();
+    var pack_accumulating_reader = accumulatingReader(allocator, pack_buffered_reader.reader());
+    defer pack_accumulating_reader.deinit();
+    var pack_position: usize = 0;
+    var pack_hash = Sha1.init(.{});
+    const pack_reader = pack_accumulating_reader.reader();
     const pack_header = try PackHeader.read(pack_reader);
+    const pack_header_bytes = pack_accumulating_reader.readDataExcept(0);
+    pack_position += pack_header_bytes.len;
+    pack_hash.update(pack_header_bytes);
+    pack_accumulating_reader.discard(pack_header_bytes.len);
     var current_entry: u32 = 0;
     while (current_entry < pack_header.total_objects) : (current_entry += 1) {
-        const entry_offset = pack_counting_reader.bytes_read;
-        var entry_crc32_reader = std.compress.hashedReader(pack_reader, std.hash.Crc32.init());
-        const entry_header = try EntryHeader.read(entry_crc32_reader.reader());
+        const entry_offset = pack_position;
+        var entry_crc32 = std.hash.Crc32.init();
+
+        const entry_header = try EntryHeader.read(pack_reader);
+        const entry_header_bytes = pack_accumulating_reader.readDataExcept(0);
+        pack_position += entry_header_bytes.len;
+        pack_hash.update(entry_header_bytes);
+        entry_crc32.update(entry_header_bytes);
+        pack_accumulating_reader.discard(entry_header_bytes.len);
+
         switch (entry_header) {
-            inline .commit, .tree, .blob, .tag => |object, tag| {
-                var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
-                var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
+            .commit, .tree, .blob, .tag => |object| {
+                var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
+                var entry_data_size: usize = 0;
                 var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{}));
                 const entry_writer = entry_hashed_writer.writer();
+
                 // The object header is not included in the pack data but is
                 // part of the object's ID
-                try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length });
-                var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
-                try fifo.pump(entry_counting_reader.reader(), entry_writer);
-                if (entry_counting_reader.bytes_read != object.uncompressed_length) {
+                try entry_writer.print("{s} {}\x00", .{ @tagName(entry_header), object.uncompressed_length });
+
+                while (try entry_decompress_stream.next()) |decompressed_data| {
+                    entry_data_size += decompressed_data.len;
+                    try entry_writer.writeAll(decompressed_data);
+
+                    const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
+                    pack_position += compressed_bytes.len;
+                    pack_hash.update(compressed_bytes);
+                    entry_crc32.update(compressed_bytes);
+                    pack_accumulating_reader.discard(compressed_bytes.len);
+                }
+                const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
+                pack_position += footer_bytes.len;
+                pack_hash.update(footer_bytes);
+                entry_crc32.update(footer_bytes);
+                pack_accumulating_reader.discard(footer_bytes.len);
+                pack_accumulating_reader.rewind();
+
+                if (entry_data_size != object.uncompressed_length) {
                     return error.InvalidObject;
                 }
+
                 const oid = entry_hashed_writer.hasher.finalResult();
                 try index_entries.put(allocator, oid, .{
                     .offset = entry_offset,
-                    .crc32 = entry_crc32_reader.hasher.final(),
+                    .crc32 = entry_crc32.final(),
                 });
             },
             inline .ofs_delta, .ref_delta => |delta| {
-                var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
-                var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
-                var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
-                try fifo.pump(entry_counting_reader.reader(), std.io.null_writer);
-                if (entry_counting_reader.bytes_read != delta.uncompressed_length) {
+                var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
+                var entry_data_size: usize = 0;
+
+                while (try entry_decompress_stream.next()) |decompressed_data| {
+                    entry_data_size += decompressed_data.len;
+
+                    const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
+                    pack_position += compressed_bytes.len;
+                    pack_hash.update(compressed_bytes);
+                    entry_crc32.update(compressed_bytes);
+                    pack_accumulating_reader.discard(compressed_bytes.len);
+                }
+                const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
+                pack_position += footer_bytes.len;
+                pack_hash.update(footer_bytes);
+                entry_crc32.update(footer_bytes);
+                pack_accumulating_reader.discard(footer_bytes.len);
+                pack_accumulating_reader.rewind();
+
+                if (entry_data_size != delta.uncompressed_length) {
                     return error.InvalidObject;
                 }
+
                 try pending_deltas.append(allocator, .{
                     .offset = entry_offset,
-                    .crc32 = entry_crc32_reader.hasher.final(),
+                    .crc32 = entry_crc32.final(),
                 });
             },
         }
     }
-    const pack_checksum = pack_hashed_reader.hasher.finalResult();
-    const recorded_checksum = try pack_buffered_reader.reader().readBytesNoEof(Sha1.digest_length);
+    const pack_checksum = pack_hash.finalResult();
+    const recorded_checksum = try pack_reader.readBytesNoEof(Sha1.digest_length);
     if (!mem.eql(u8, &pack_checksum, &recorded_checksum)) {
         return error.CorruptedPack;
     }
-    _ = pack_buffered_reader.reader().readByte() catch |e| switch (e) {
+    _ = pack_reader.readByte() catch |e| switch (e) {
         error.EndOfStream => return pack_checksum,
         else => |other| return other,
     };
@@ -1385,7 +1512,11 @@ test "packfile indexing and checkout" {
     defer worktree.cleanup();
 
     const commit_id = try parseOid("dd582c0720819ab7130b103635bd7271b9fd4feb");
-    try repository.checkout(worktree.dir, commit_id);
+
+    var diagnostics: Diagnostics = .{ .allocator = testing.allocator };
+    defer diagnostics.deinit();
+    try repository.checkout(worktree.dir, commit_id, &diagnostics);
+    try testing.expect(diagnostics.errors.items.len == 0);
 
     const expected_files: []const []const u8 = &.{
         "dir/file",
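The reason `rewind` exists is worth spelling out: the zlib decompressor reads ahead, so when an entry's stream ends, bytes belonging to the next entry's header may already have been pulled from the pack. The loop above therefore hashes and discards only `readDataExcept(unreadBytes())` and then rewinds, letting the next `EntryHeader.read` see the over-read bytes again. A condensed sketch of that pattern, under the same assumption that it lives next to the declarations in git.zig; the trailing "NEXT" bytes stand in for the next entry's header:

    test "over-read bytes are re-served after rewind" {
        const allocator = std.testing.allocator;
        // zlib stream for "hello", followed by the bytes of the next "entry".
        const data = [_]u8{
            0x78, 0x9c, 0xcb, 0x48, 0xcd, 0xc9, 0xc9, 0x07,
            0x00, 0x06, 0x2c, 0x02, 0x15,
        } ++ [_]u8{ 'N', 'E', 'X', 'T' };
        var stream = std.io.fixedBufferStream(&data);
        var acc = accumulatingReader(allocator, stream.reader());
        defer acc.deinit();

        var decompressor = std.compress.zlib.decompressor(acc.reader());
        while (try decompressor.next()) |chunk| {
            _ = chunk;
            const consumed = acc.readDataExcept(decompressor.unreadBytes());
            // Hashing of `consumed` would happen here, as in the loop above.
            acc.discard(consumed.len);
        }
        const footer = acc.readDataExcept(decompressor.unreadBytes());
        acc.discard(footer.len);
        // Whatever the decompressor read past the end of the zlib stream is
        // still buffered; rewind so the next read sees it again.
        acc.rewind();

        var rest: [4]u8 = undefined;
        try acc.reader().readNoEof(&rest);
        try std.testing.expectEqualStrings("NEXT", &rest);
    }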