Skip to content

Package.Fetch: fix Git package fetching #18992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into the target branch on
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/std/compress/flate/inflate.zig
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,14 @@ pub fn Inflate(comptime container: Container, comptime ReaderType: type) type {
}
}

/// Number of bytes that have been pulled from the internal reader but
/// not yet consumed by the decompressor.
pub fn unreadBytes(self: Self) usize {
    // `divCeil` cannot fail here: the divisor 8 is nonzero, and the
    // ceiling adjustment cannot overflow an unsigned bit count.
    const pending_bits = self.bits.nbits;
    return std.math.divCeil(usize, pending_bits, 8) catch unreachable;
}

// Iterator interface

/// Can be used in iterator like loop without memcpy to another buffer:
Expand Down
179 changes: 155 additions & 24 deletions src/Package/Fetch/git.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,86 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype)
try index_writer.writeAll(&index_checksum);
}

/// A reader that records everything it reads in a growable internal
/// buffer. The read position can be rewound so previously read bytes are
/// replayed on subsequent reads.
fn AccumulatingReader(comptime ReaderType: type) type {
    return struct {
        child_reader: ReaderType,
        buffer: std.ArrayListUnmanaged(u8) = .{},
        /// Position in `buffer` from which the next read is served. When
        /// equal to `buffer.items.len`, no replay data remains and bytes
        /// come from `child_reader` instead.
        read_start: usize = 0,
        allocator: Allocator,

        const Self = @This();

        fn deinit(self: *Self) void {
            self.buffer.deinit(self.allocator);
            self.* = undefined;
        }

        const ReadError = ReaderType.Error || Allocator.Error;
        const Reader = std.io.Reader(*Self, ReadError, read);

        fn read(self: *Self, buf: []u8) ReadError!usize {
            const replay = self.buffer.items[self.read_start..];
            if (replay.len != 0) {
                // Serve previously buffered bytes before touching the
                // underlying reader again.
                const n = @min(replay.len, buf.len);
                @memcpy(buf[0..n], replay[0..n]);
                self.read_start += n;
                return n;
            }

            // No replay data left: read fresh bytes through the buffer so
            // they can be replayed later if the caller rewinds.
            try self.buffer.ensureUnusedCapacity(self.allocator, buf.len);
            const scratch = self.buffer.unusedCapacitySlice()[0..buf.len];
            const n = try self.child_reader.read(scratch);
            @memcpy(buf[0..n], scratch[0..n]);
            self.buffer.items.len += n;
            self.read_start += n;
            return n;
        }

        fn reader(self: *Self) Reader {
            return .{ .context = self };
        }

        /// Returns the buffered data that has already been read, minus the
        /// final `count_before_end` bytes.
        fn readDataExcept(self: Self, count_before_end: usize) []const u8 {
            assert(count_before_end <= self.read_start);
            return self.buffer.items[0 .. self.read_start - count_before_end];
        }

        /// Drops the first `count` bytes of buffered data, shifting the
        /// remainder (and the read position) toward the front.
        fn discard(self: *Self, count: usize) void {
            assert(count <= self.buffer.items.len);
            const remaining = self.buffer.items.len - count;
            mem.copyForwards(
                u8,
                self.buffer.items[0..remaining],
                self.buffer.items[count..][0..remaining],
            );
            self.buffer.items.len = remaining;
            self.read_start -= @min(self.read_start, count);
        }

        /// Moves the read position back to the start of the buffered data
        /// so it will be read again.
        fn rewind(self: *Self) void {
            self.read_start = 0;
        }
    };
}

/// Convenience constructor: wraps `reader` in an `AccumulatingReader`
/// that buffers read data using `allocator`.
fn accumulatingReader(
    allocator: Allocator,
    reader: anytype,
) AccumulatingReader(@TypeOf(reader)) {
    return AccumulatingReader(@TypeOf(reader)){
        .allocator = allocator,
        .child_reader = reader,
    };
}

/// Performs the first pass over the packfile data for index construction.
/// This will index all non-delta objects, queue delta objects for further
/// processing, and return the pack checksum (which is part of the index
Expand All @@ -1102,59 +1182,106 @@ fn indexPackFirstPass(
pending_deltas: *std.ArrayListUnmanaged(IndexEntry),
) ![Sha1.digest_length]u8 {
var pack_buffered_reader = std.io.bufferedReader(pack.reader());
var pack_counting_reader = std.io.countingReader(pack_buffered_reader.reader());
var pack_hashed_reader = std.compress.hashedReader(pack_counting_reader.reader(), Sha1.init(.{}));
const pack_reader = pack_hashed_reader.reader();
var pack_accumulating_reader = accumulatingReader(allocator, pack_buffered_reader.reader());
defer pack_accumulating_reader.deinit();
var pack_position: usize = 0;
var pack_hash = Sha1.init(.{});
const pack_reader = pack_accumulating_reader.reader();

const pack_header = try PackHeader.read(pack_reader);
const pack_header_bytes = pack_accumulating_reader.readDataExcept(0);
pack_position += pack_header_bytes.len;
pack_hash.update(pack_header_bytes);
pack_accumulating_reader.discard(pack_header_bytes.len);

var current_entry: u32 = 0;
while (current_entry < pack_header.total_objects) : (current_entry += 1) {
const entry_offset = pack_counting_reader.bytes_read;
var entry_crc32_reader = std.compress.hashedReader(pack_reader, std.hash.Crc32.init());
const entry_header = try EntryHeader.read(entry_crc32_reader.reader());
const entry_offset = pack_position;
var entry_crc32 = std.hash.Crc32.init();

const entry_header = try EntryHeader.read(pack_reader);
const entry_header_bytes = pack_accumulating_reader.readDataExcept(0);
pack_position += entry_header_bytes.len;
pack_hash.update(entry_header_bytes);
entry_crc32.update(entry_header_bytes);
pack_accumulating_reader.discard(entry_header_bytes.len);

switch (entry_header) {
inline .commit, .tree, .blob, .tag => |object, tag| {
var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
.commit, .tree, .blob, .tag => |object| {
var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
var entry_data_size: usize = 0;
var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{}));
const entry_writer = entry_hashed_writer.writer();

// The object header is not included in the pack data but is
// part of the object's ID
try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length });
var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
try fifo.pump(entry_counting_reader.reader(), entry_writer);
if (entry_counting_reader.bytes_read != object.uncompressed_length) {
try entry_writer.print("{s} {}\x00", .{ @tagName(entry_header), object.uncompressed_length });

while (try entry_decompress_stream.next()) |decompressed_data| {
entry_data_size += decompressed_data.len;
try entry_writer.writeAll(decompressed_data);

const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
pack_position += compressed_bytes.len;
pack_hash.update(compressed_bytes);
entry_crc32.update(compressed_bytes);
pack_accumulating_reader.discard(compressed_bytes.len);
}
const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
pack_position += footer_bytes.len;
pack_hash.update(footer_bytes);
entry_crc32.update(footer_bytes);
pack_accumulating_reader.discard(footer_bytes.len);
pack_accumulating_reader.rewind();

if (entry_data_size != object.uncompressed_length) {
return error.InvalidObject;
}

const oid = entry_hashed_writer.hasher.finalResult();
try index_entries.put(allocator, oid, .{
.offset = entry_offset,
.crc32 = entry_crc32_reader.hasher.final(),
.crc32 = entry_crc32.final(),
});
},
inline .ofs_delta, .ref_delta => |delta| {
var entry_decompress_stream = std.compress.zlib.decompressor(entry_crc32_reader.reader());
var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
try fifo.pump(entry_counting_reader.reader(), std.io.null_writer);
if (entry_counting_reader.bytes_read != delta.uncompressed_length) {
var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
var entry_data_size: usize = 0;

while (try entry_decompress_stream.next()) |decompressed_data| {
entry_data_size += decompressed_data.len;

const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
pack_position += compressed_bytes.len;
pack_hash.update(compressed_bytes);
entry_crc32.update(compressed_bytes);
pack_accumulating_reader.discard(compressed_bytes.len);
}
const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
pack_position += footer_bytes.len;
pack_hash.update(footer_bytes);
entry_crc32.update(footer_bytes);
pack_accumulating_reader.discard(footer_bytes.len);
pack_accumulating_reader.rewind();

if (entry_data_size != delta.uncompressed_length) {
return error.InvalidObject;
}

try pending_deltas.append(allocator, .{
.offset = entry_offset,
.crc32 = entry_crc32_reader.hasher.final(),
.crc32 = entry_crc32.final(),
});
},
}
}

const pack_checksum = pack_hashed_reader.hasher.finalResult();
const recorded_checksum = try pack_buffered_reader.reader().readBytesNoEof(Sha1.digest_length);
const pack_checksum = pack_hash.finalResult();
const recorded_checksum = try pack_reader.readBytesNoEof(Sha1.digest_length);
if (!mem.eql(u8, &pack_checksum, &recorded_checksum)) {
return error.CorruptedPack;
}
_ = pack_buffered_reader.reader().readByte() catch |e| switch (e) {
_ = pack_reader.readByte() catch |e| switch (e) {
error.EndOfStream => return pack_checksum,
else => |other| return other,
};
Expand Down Expand Up @@ -1385,7 +1512,11 @@ test "packfile indexing and checkout" {
defer worktree.cleanup();

const commit_id = try parseOid("dd582c0720819ab7130b103635bd7271b9fd4feb");
try repository.checkout(worktree.dir, commit_id);

var diagnostics: Diagnostics = .{ .allocator = testing.allocator };
defer diagnostics.deinit();
try repository.checkout(worktree.dir, commit_id, &diagnostics);
try testing.expect(diagnostics.errors.items.len == 0);

const expected_files: []const []const u8 = &.{
"dir/file",
Expand Down