From ee8f43f3e3dfb780021c0ea44a83559a45419cf7 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Sun, 12 Apr 2020 16:38:46 +0200 Subject: [PATCH 1/2] zstd: Reduce decoder allocations significantly --- zstd/blockdec.go | 12 +++++++-- zstd/decoder_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++++ zstd/framedec.go | 6 ++++- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/zstd/blockdec.go b/zstd/blockdec.go index 63062ffa66..ed47b0004b 100644 --- a/zstd/blockdec.go +++ b/zstd/blockdec.go @@ -131,17 +131,25 @@ func (b *blockDec) reset(br byteBuffer, windowSize uint64) error { b.Type = blockType((bh >> 1) & 3) // find size. cSize := int(bh >> 3) + maxSize := maxBlockSize switch b.Type { case blockTypeReserved: return ErrReservedBlockType case blockTypeRLE: b.RLESize = uint32(cSize) + if b.lowMem { + maxSize = cSize + } cSize = 1 case blockTypeCompressed: if debug { println("Data size on stream:", cSize) } b.RLESize = 0 + maxSize = maxCompressedBlockSize + if windowSize < maxCompressedBlockSize && b.lowMem { + maxSize = int(windowSize) + } if cSize > maxCompressedBlockSize || uint64(cSize) > b.WindowSize { if debug { printf("compressed block too big: csize:%d block: %+v\n", uint64(cSize), b) @@ -160,8 +168,8 @@ func (b *blockDec) reset(br byteBuffer, windowSize uint64) error { b.dataStorage = make([]byte, 0, maxBlockSize) } } - if cap(b.dst) <= maxBlockSize { - b.dst = make([]byte, 0, maxBlockSize+1) + if cap(b.dst) <= maxSize { + b.dst = make([]byte, 0, maxSize+1) } var err error b.data, err = br.readBig(cSize, b.dataStorage) diff --git a/zstd/decoder_test.go b/zstd/decoder_test.go index 7a095ac56c..7e886d3715 100644 --- a/zstd/decoder_test.go +++ b/zstd/decoder_test.go @@ -149,6 +149,68 @@ func TestNewDecoder(t *testing.T) { testDecoderDecodeAll(t, "testdata/decoder.zip", dec) } +func TestNewDecoderMemory(t *testing.T) { + defer timeout(60 * time.Second)() + var testdata bytes.Buffer + enc, err := NewWriter(&testdata, WithWindowSize(64<<10), 
WithSingleSegment(false)) + if err != nil { + t.Fatal(err) + } + // Write 256KB + for i := 0; i < 256; i++ { + tmp := strings.Repeat(string([]byte{byte(i)}), 1024) + _, err := enc.Write([]byte(tmp)) + if err != nil { + t.Fatal(err) + } + } + err = enc.Close() + if err != nil { + t.Fatal(err) + } + + var n = 5000 + if testing.Short() { + n = 200 + } + + var before, after runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&before) + + var decs = make([]*Decoder, n) + for i := range decs { + // Wrap in NopCloser to avoid shortcut. + input := ioutil.NopCloser(bytes.NewBuffer(testdata.Bytes())) + decs[i], err = NewReader(input, WithDecoderConcurrency(1), WithDecoderLowmem(true)) + if err != nil { + t.Fatal(err) + } + } + + // 128K buffer + var tmp [128 << 10]byte + for i := range decs { + _, err := io.ReadFull(decs[i], tmp[:]) + if err != nil { + t.Fatal(err) + } + } + + runtime.GC() + runtime.ReadMemStats(&after) + size := (after.HeapInuse - before.HeapInuse) / uint64(n) / 1024 + t.Log(size, "KiB per decoder") + // This is not exact science, but fail if we suddenly get more than 2x what we expect. 
+ if size > 221*2 && !testing.Short() { + t.Errorf("expected < 221KB per decoder, got %d", size) + } + + for _, dec := range decs { + dec.Close() + } +} + func TestNewDecoderGood(t *testing.T) { defer timeout(30 * time.Second)() testDecoderFile(t, "testdata/good.zip") diff --git a/zstd/framedec.go b/zstd/framedec.go index e38f34a9b4..2326744fde 100644 --- a/zstd/framedec.go +++ b/zstd/framedec.go @@ -233,7 +233,11 @@ func (d *frameDec) reset(br byteBuffer) error { return ErrWindowSizeTooSmall } d.history.windowSize = int(d.WindowSize) - d.history.maxSize = d.history.windowSize + maxBlockSize + if d.o.lowMem && d.history.windowSize < maxBlockSize { + d.history.maxSize = d.history.windowSize * 2 + } else { + d.history.maxSize = d.history.windowSize + maxBlockSize + } // history contains input - maybe we do something d.rawInput = br return nil From 3ad1c099ec3a16b498cce0881e4c8ae3be7503f2 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Sun, 12 Apr 2020 18:32:52 +0200 Subject: [PATCH 2/2] More strict window size check. --- zstd/blockdec.go | 7 +++++-- zstd/framedec.go | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/zstd/blockdec.go b/zstd/blockdec.go index ed47b0004b..c2f855e75b 100644 --- a/zstd/blockdec.go +++ b/zstd/blockdec.go @@ -687,8 +687,11 @@ func (b *blockDec) decodeCompressed(hist *history) error { println("initializing sequences:", err) return err } - - err = seqs.decode(nSeqs, br, hist.b) + hbytes := hist.b + if len(hbytes) > hist.windowSize { + hbytes = hbytes[len(hbytes)-hist.windowSize:] + } + err = seqs.decode(nSeqs, br, hbytes) if err != nil { return err } diff --git a/zstd/framedec.go b/zstd/framedec.go index 2326744fde..780880ebe4 100644 --- a/zstd/framedec.go +++ b/zstd/framedec.go @@ -324,8 +324,8 @@ func (d *frameDec) checkCRC() error { func (d *frameDec) initAsync() { if !d.o.lowMem && !d.SingleSegment { - // set max extra size history to 20MB. 
- d.history.maxSize = d.history.windowSize + maxBlockSize*10 + // set max extra size history to 10MB. + d.history.maxSize = d.history.windowSize + maxBlockSize*5 } // re-alloc if more than one extra block size. if d.o.lowMem && cap(d.history.b) > d.history.maxSize+maxBlockSize {