Skip to content

Commit 4e7c09c

Browse files
committed
draft(ipc/writer): try to add compression to file writer
1 parent 0487c44 commit 4e7c09c

File tree

2 files changed

+29
-7
lines changed

2 files changed

+29
-7
lines changed

src/ipc/writer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,8 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
283283
._writeBodyBuffers(buffers);
284284
}
285285

286-
protected _assembleRecordBatch(batch: RecordBatch<T>) {
287-
let { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch);
286+
protected _assembleRecordBatch<T extends Vector | RecordBatch>(...args: (T | T[])[]) {
287+
let { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(...args);
288288
if (this._compression != null) {
289289
({ byteLength, bufferRegions, buffers } = this._compressBodyBuffers(buffers));
290290
}
@@ -337,7 +337,7 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
337337
}
338338

339339
protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
340-
const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary]));
340+
const { byteLength, nodes, bufferRegions, buffers } = this._assembleRecordBatch(new Vector([dictionary]));
341341
const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions, this._compression);
342342
const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta);
343343
const message = Message.from(dictionaryBatch, byteLength);

test/unit/ipc/writer/file-writer-tests.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
import { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer.js';
1819
import {
1920
generateDictionaryTables, generateRandomTables
2021
} from '../../../data/tables.js';
@@ -23,6 +24,7 @@ import { validateRecordBatchIterator } from '../validate.js';
2324

2425
import {
2526
builderThroughIterable,
27+
CompressionType,
2628
Dictionary,
2729
Int32,
2830
RecordBatch,
@@ -32,6 +34,8 @@ import {
3234
Uint32,
3335
Vector
3436
} from 'apache-arrow';
37+
import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
38+
import * as lz4js from 'lz4js';
3539

3640
describe('RecordBatchFileWriter', () => {
3741
for (const table of generateRandomTables([10, 20, 30])) {
@@ -41,6 +45,24 @@ describe('RecordBatchFileWriter', () => {
4145
testFileWriter(table, `${table.schema.fields[0]}`);
4246
}
4347

48+
const compressionTypes = [CompressionType.LZ4_FRAME/*, CompressionType.ZSTD*/];
49+
50+
const lz4Codec: Codec = {
51+
encode(data: Uint8Array): Uint8Array {
52+
return lz4js.compress(data);
53+
},
54+
decode(data: Uint8Array): Uint8Array {
55+
return lz4js.decompress(data);
56+
}
57+
};
58+
compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec);
59+
60+
const table = generate.table([10, 20, 30]).table;
61+
for (const compressionType of compressionTypes) {
62+
const testName = `[${table.schema.fields.join(', ')}] - ${CompressionType[compressionType]} compressed`;
63+
testFileWriter(table, testName, { compressionType });
64+
}
65+
4466
it('should throw if attempting to write replacement dictionary batches', async () => {
4567
const type = new Dictionary<Uint32, Int32>(new Uint32, new Int32, 0);
4668
const writer = new RecordBatchFileWriter();
@@ -91,14 +113,14 @@ describe('RecordBatchFileWriter', () => {
91113
});
92114
});
93115

94-
function testFileWriter(table: Table, name: string) {
116+
function testFileWriter(table: Table, name: string, options?: RecordBatchStreamWriterOptions) {
95117
describe(`should write the Arrow IPC file format (${name})`, () => {
96-
test(`Table`, validateTable.bind(0, table));
118+
test(`Table`, validateTable.bind(0, table, options));
97119
});
98120
}
99121

100-
async function validateTable(source: Table) {
101-
const writer = RecordBatchFileWriter.writeAll(source);
122+
async function validateTable(source: Table, options?: RecordBatchStreamWriterOptions) {
123+
const writer = RecordBatchFileWriter.writeAll(source, options);
102124
const result = new Table(RecordBatchReader.from(await writer.toUint8Array()));
103125
validateRecordBatchIterator(3, source.batches);
104126
expect(result).toEqualTable(source);

0 commit comments

Comments (0)