Writer Design #34

Closed · ZENOTME opened this issue Aug 18, 2023 · 19 comments

Comments

ZENOTME (Contributor) commented Aug 18, 2023

This issue proposes a writer design to solve:

arrow: Writing unpartitioned data into iceberg from arrow record batches
arrow: Writing partitioned data into iceberg from arrow record batches

The design is based on what we do in icelake and is inspired by Java Iceberg; feel free to make any suggestions:

Class Design

SpecificFormatWriter

At the bottom level, we have several specific format writers, each responsible for writing record batches into a file of a specific format, such as:

struct ParquetWriter {
    ...
}

struct AvroWriter {
    ...
}

struct OrcWriter {
    ...
}

/// Implemented by each of the format writers above
trait SpecificWriter {
    fn write(&mut self, batch: &RecordBatch) -> Result<()>;
}

1. Discussion: which formats do we plan to support in v0.2? I guess only parquet?

DataFileWriter

One level higher is the data file writer. It uses a SpecificWriter and splits the record batches into multiple files according to config such as file_size_limit. It looks like:

struct DataFileWriter {
    current_specific_writer: SpecificWriter
}
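For illustration, the rolling logic could look roughly like this; close_current_file and new_specific_writer are hypothetical helpers, and Arrow's get_array_memory_size is used as a rough stand-in for the real on-disk size:

struct DataFileWriter {
    current_specific_writer: SpecificWriter, // enum or boxed trait object, per discussion 2 below
    written_size: usize,
    file_size_limit: usize,
    completed_files: Vec<DataFile>,
}

impl DataFileWriter {
    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.current_specific_writer.write(batch)?;
        // In-memory size as a rough proxy for bytes written to the file.
        self.written_size += batch.get_array_memory_size();
        if self.written_size >= self.file_size_limit {
            // Seal the current file and start a new one (hypothetical helpers).
            let data_file = self.close_current_file().await?;
            self.completed_files.push(data_file);
            self.current_specific_writer = self.new_specific_writer().await?;
            self.written_size = 0;
        }
        Ok(())
    }
}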

2. Discussion: how do we treat the type SpecificWriter: dispatch through an enum, or use a generic parameter?

PartitionWriter and UnpartitionWriter

The top level is PartitionWriter and UnpartitionWriter. The UnpartitionWriter is just similar to the DataFileWriter. The PartitionWriter needs to split the record batch into different groups according to the partition, and each group of record batches is written by the DataFileWriter responsible for that partition. It looks like:

struct PartitionWriter {
    writers: HashMap<Partition, DataFileWriter>,
}
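For illustration, the dispatch could look roughly like this, where split_by_partition and new_data_file_writer are hypothetical helpers (evaluating the partition spec per row and opening a new DataFileWriter, respectively):

use std::collections::HashMap;

struct PartitionWriter {
    writers: HashMap<Partition, DataFileWriter>,
}

impl PartitionWriter {
    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        // `split_by_partition` is a hypothetical helper that evaluates the
        // partition spec against the batch and returns one sub-batch per
        // partition value.
        for (partition, sub_batch) in split_by_partition(batch)? {
            let writer = self
                .writers
                .entry(partition)
                .or_insert_with(new_data_file_writer); // hypothetical constructor
            writer.write(&sub_batch).await?;
        }
        Ok(())
    }
}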
liurenjie1024 (Contributor) commented:

Thanks @ZENOTME for raising this discussion.

For SpecificWriter, it's similar to FileAppender in Java. Since we don't have many formats, I would suggest using an enum rather than a trait.

Others LGTM.

JanKaul (Collaborator) commented Aug 18, 2023

I think making the writing process async would be of great value because uploading the data to an object store is not CPU bound.

For parquet there is actually a great AsyncArrowWriter.

ZENOTME (Author) commented Aug 18, 2023

For parquet there is actually a great AsyncArrowWriter.

Good suggestion. I investigated the metadata returned by the parquet writer and found that it can fulfill most of the DataFile needs, except for the following fields:

  1. file_size_in_bytes
    The metadata only records each row group's size, but the file size also includes the metadata size, so we can't compute it from that alone.
    The solution we use in icelake is to wrap the IO writer with a tracker (see the sketch below).
  2. nan_value_counts
    If I understand correctly, NaN values only occur in types like float and double, so we may need to track this from the record batch manually.
  3. split_offsets

As described in the spec: "Split offsets for the data file. For example, all row group offsets in a Parquet file. Must be sorted ascending."

The metadata only records the offsets of the column chunks. I'm not sure whether we can use the first column chunk's offset as the row group offset.
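For reference, the tracking-wrapper idea looks roughly like the following; this is only a sketch (shown for a blocking std::io::Write for brevity; the async variant wraps an AsyncWrite the same way), not the actual icelake code:

use std::io::{Result, Write};

/// Counts every byte that passes through the inner writer, so the final
/// file size (row groups + footer metadata) can be reported as
/// `file_size_in_bytes`.
struct TrackWriter<W: Write> {
    inner: W,
    bytes_written: u64,
}

impl<W: Write> Write for TrackWriter<W> {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        let n = self.inner.write(buf)?;
        self.bytes_written += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> Result<()> {
        self.inner.flush()
    }
}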

JanKaul (Collaborator) commented Aug 18, 2023

I'm not an expert in parquet but maybe we can calculate the file_size_in_bytes from each rowgroup size.

And the split_offsets should also be related to the rowgroup size.

ZENOTME (Author) commented Aug 18, 2023

I'm not an expert in parquet but maybe we can calculate the file_size_in_bytes from each rowgroup size.

According to the parquet format, a parquet file consists of:

4-byte magic number "PAR1" +
multiple row groups +
file metadata +
4-byte length in bytes of the file metadata +
4-byte magic number "PAR1"

So I think accumulating the row group sizes won't work. 🤔

JanKaul (Collaborator) commented Aug 18, 2023

So the missing piece is the size of the metadata, right?

It's a bit of a weird approach, but we could write the FileMetadata to a buffer and check its length. Like here

ZENOTME (Author) commented Aug 18, 2023

It's a bit of a weird approach, but we could write the FileMetadata to a buffer and check its length. Like here

It's a bit weird, but more efficient than the tracking wrapper, I think. 🤣

ZENOTME (Author) commented Aug 19, 2023

We can expose a trait similar to FileIO and keep opendal under the hood as an implementation.

I think the next thing is to decide the FileIo interface, and then we can work on our SpecificFormatWriter (or call it FileWriter / FileAppender).

Here is some info that may be useful for the FileIo interface design:

  1. For the parquet writer, what we need to pass in is an AsyncWriter.

  2. The avro writer only supports a sync writer.
    In icelake, to support async writes, we write to a Vec<u8> first and then write this Vec<u8> using another async write method in opendal::Operator.

As a suggestion, the interface may need functions that look like:

trait FileIo {
    fn writer() -> impl FileWriter;
}

trait FileWriter {
    // This interface can be used by the parquet writer.
    fn async_writer() -> impl AsyncWriter;

    // These interfaces can be used by the avro writer.
    async fn write();
    async fn close();
}

cc @Xuanwo

Xuanwo (Member) commented Aug 19, 2023

Most of the writing tasks are handled by @liurenjie1024 and @ZENOTME, so I may not have sufficient experience on this particular topic. However, I will do my best to provide input from the OpenDAL perspective to assist with the design.


When we talk about writers, we are talking about two different things: the IO writer and the value writer.

  • The IO Writer handles the underlying IO operations, i.e. write(&[u8]) into storage.
  • The Value Writer handles the upper-level value operations, i.e. write(record_batches).

For IO Writer, there are three kinds of APIs we can use:

  • PutObject: write all content at once, i.e. op.write(path, bs) or op.writer_with(path).content_length(size)
  • MultipartUpload: allows writing content in multiple parts; we need to buffer some content in memory and then write it to storage, i.e. op.writer(path).
  • AppendObject: allows appending to an existing object (few storage services support this), i.e. op.writer(path).append(true).

PutObject

  • Good: fast and cheap, only one API call
  • Bad:
    • needs all content in memory, not suitable for large content.
    • needs to know the actual size before uploading.

MultipartUpload

  • Good: requires at most 8MiB in memory.
  • Bad: needs (size/8MiB) + 2 API calls; slower and more expensive.

AppendObject

  • Good: requires at most 8MiB in memory, no extra API call.
  • Bad: only a few services support this, and AWS S3 doesn't.

I suggest that:

  1. We can support two kinds of IO Writer first: PutObject and MultipartUpload. Both of them can be used as an AsyncWriter via opendal::Writer (see the sketch below). This IO writer should be injected into the SpecificWriter so that it handles that logic internally.
  2. We should not handle them differently based on format. Maybe we can add an internal buffer inside the avro writer so that it can accept the same writer as the parquet writer does?
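For illustration, the two call shapes look roughly like this with opendal (using the in-memory service as a stand-in; exact method signatures may differ slightly between opendal versions):

use opendal::{services, Operator};

async fn demo() -> opendal::Result<()> {
    let op = Operator::new(services::Memory::default())?.finish();

    // PutObject-style: the whole content is in memory and written in one call.
    op.write("data/file-1.parquet", vec![0u8; 1024]).await?;

    // MultipartUpload-style: a streaming writer that accepts chunks.
    let mut w = op.writer("data/file-2.parquet").await?;
    w.write(vec![0u8; 1024]).await?;
    w.write(vec![0u8; 1024]).await?;
    w.close().await?;

    Ok(())
}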

ZENOTME (Author) commented Aug 19, 2023

Thanks for your suggestion!

We should not handle them differently based on format. Maybe we can add a internal buffer inside avro writer so that it can accept the same writer as parquet writer does?

Agreed. We can use an internal buffer that the avro writer writes into first, and then write it out through the AsyncWriter via https://docs.rs/tokio/1.29.1/tokio/io/trait.AsyncWriteExt.html.
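A rough sketch of that buffering approach, where encode_batch_to_avro is a hypothetical stand-in for the sync avro encoding step; the buffer is flushed to the injected async writer with tokio's AsyncWriteExt:

use tokio::io::{AsyncWrite, AsyncWriteExt};

struct AvroFileWriter<W: AsyncWrite + Unpin> {
    buffer: Vec<u8>, // the sync avro writer writes into this Vec<u8>
    output: W,       // the same kind of async writer the parquet writer gets
}

impl<W: AsyncWrite + Unpin> AvroFileWriter<W> {
    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        // `encode_batch_to_avro` is a hypothetical helper wrapping the sync
        // avro writer; it appends the encoded bytes to the in-memory buffer.
        encode_batch_to_avro(batch, &mut self.buffer)?;
        Ok(())
    }

    async fn close(mut self) -> Result<()> {
        // Flush the whole buffer through the async writer.
        self.output.write_all(&self.buffer).await?;
        self.output.shutdown().await?;
        Ok(())
    }
}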

ZENOTME (Author) commented Aug 19, 2023

More detailed SpecificFormatWriter interface design

I think there is enough info now to propose a SpecificFormatWriter interface design (maybe we can call it FileWriter). If the following interface looks good, I will open a PR to make it work once we have DataFile.

enum WriterConfig {
    Parquet(ParquetWriterConfig),
    Avro(AvroWriterConfig),
}

enum FileWriter {
    Parquet(ParquetFileWriter),
    Avro(AvroWriter),
}

impl FileWriter {
    /// Create the writer according to the config
    fn try_new(writer: impl AsyncWriter, schema: ArrowSchemaRef, config: WriterConfig) -> Result<Self>;

    async fn write(&mut self, record_batch: &RecordBatch) -> Result<()>;

    async fn close(self) -> Result<DataFile>;
}
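Usage would then look roughly like this (all names are still the hypothetical ones from the sketch above):

// Hypothetical usage of the interface above.
let mut writer = FileWriter::try_new(
    async_writer,                          // e.g. an opendal-backed async writer
    arrow_schema.clone(),
    WriterConfig::Parquet(parquet_config),
)?;
writer.write(&record_batch).await?;
let data_file = writer.close().await?;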

JanKaul (Collaborator) commented Aug 20, 2023

Thanks @Xuanwo for the write-up. For me it would be okay to just start with MultipartUpload, as it is the most general option and works with all memory requirements and file sizes. We could then later add the optimization of using PutObject for small files.

And I strongly agree with your point about using the same interface for the parquet and the avro writer, even if it means we need to implement an async writer with an internal buffer for avro. Looking at the parquet AsyncArrowWriter implementation, it doesn't look too bad.

@ZENOTME the interface looks great to me. Thanks for the effort.

liurenjie1024 (Contributor) commented Aug 21, 2023

Thanks everyone for this nice discussion.

For the parquet file metadata, I've submitted a PR to fix it, so we will have file_size_in_bytes and split_offsets in the parquet file metadata.

About metrics such as nan_values, I think we can follow java's approach: https://github.com/apache/iceberg/blob/aa1c1ef49eb0cd969c18676495983f5b6a231a5c/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java#L140

In iceberg-java's design, there are several components:

  1. FileIO

It provides several methods for manipulating files in the underlying storage, such as creating a new file for writing or deleting a file. I think we can provide a similar data structure, which can be implemented as a wrapper over an underlying library (opendal, etc.).

impl FileIO {
  fn new_file_writer(&self, path: &str) -> impl Writer;
  fn new_async_file_writer(&self, path: &str) -> impl AsyncWriter;
}
  2. FileAppender

A file appender focuses on one file and the format of that file. We can have different file appenders for different formats, such as parquet, orc, and avro.

  3. FileWriter

A FileWriter focuses on the content of a file; for example, we can have a data file writer, a position deletion file writer, and an equality deletion file writer. Usually a file writer cares about one partition's data.

  4. TaskWriter

A task writer is used by a task in a distributed computing framework, such as Spark or Flink. A task writer takes care of assigning input data to different partitions and calls a FileWriter to append data. For deletions it should call a deletion file writer.

I think the above design is quite elegant, and I would suggest similar components.

ZENOTME (Author) commented Aug 21, 2023

I found that the parquet writer will cast types automatically, which means that the following code can work:

// A writer with schema {a: timestamp with time zone}
let schema = Schema::new(Fields::from(vec![Field::new("a", DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond,Some("+08:00".into())), false)])).into();
let w = op.writer("test").await?;
let mut pw = ParquetWriterBuilder::new(w, schema).build()?;

// We can insert it using i64 array.
let col = Arc::new(Int64Array::from_iter_values(vec![1])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("a", col)]).unwrap();
pw.write(&to_write).await?;

// We can insert it using f32 array.
let col = Arc::new(Float32Array::from_iter_values(vec![1.0])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("a", col)]).unwrap();
pw.write(&to_write).await?;

The schema of the writer is timestamp with time zone, but we can insert into it using an int64 or float array. The cast logic casts the physical representation directly rather than doing a logical cast. In the above example, the physical representation of timestamp is i64, so it just casts the i64 / f32 values into i64.

I'm not sure whether this behaviour will cause potential bugs in the future.

So I want to discuss:

  1. Should we support a schema safety check? (Personally I think we should; see the sketch below.)
  2. If we want to support it, how strict should it be? Should we do the auto cast sometimes? BTW, if we want to do this, we shouldn't let the parquet writer do it; we should do it with a manual logical cast in our writer.
    e.g. the schema of the table is the following, and the input record is a timestamp without time zone:
table {
  t: timestamp with time zone
}
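A minimal sketch of option 1, assuming the writer keeps the ArrowSchemaRef it was created with and has a hypothetical inner_write; the error constructor is a placeholder:

impl FileWriter {
    async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        // Reject batches whose Arrow schema differs from the schema the writer
        // was created with, instead of letting parquet silently reinterpret
        // the physical representation.
        if batch.schema() != self.schema {
            return Err(schema_mismatch_error(batch.schema(), &self.schema)); // hypothetical error constructor
        }
        self.inner_write(batch).await // hypothetical inner write without checks
    }
}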

liurenjie1024 (Contributor) commented Aug 22, 2023

Thanks @ZENOTME for this interesting finding; I didn't even notice this behavior before. The schema of the parquet writer should be determined by the table schema, i.e. the iceberg schema, and the input record batch's schema should match the table's schema. As for whether we should do a runtime check during writing: I think it should be configurable, since it has a huge performance impact on the writer. It would be useful during the debugging and development phases, but should be turned off in production.

lukekim commented Feb 13, 2024

@ZENOTME are you intending on implementing writer support until completion yourself?

ZENOTME (Author) commented Feb 13, 2024

@ZENOTME are you intending on implementing writer support until completion yourself?

I have implemented writer support in icelake, so I'm happy to migrate it into iceberg-rust. I'm willing to take on most of the work, but it doesn't have to be completed by me alone; any good suggestions and contributions are welcome.
For now, I have drafted an interface design in #135, and I hope it can be a starting point for discussing the interface. Once the writer interface design is settled, I will migrate the writers from icelake to iceberg-rust, but it's also fine to open these as issues for other contributors.

lukekim commented Feb 23, 2024

@ZENOTME got it, thanks. We may be able to help here if you create some discrete sub-issues for the conversion.

ZENOTME (Author) commented Apr 23, 2024

We have finished the initial writer framework! We can close this issue now; the next step is to implement more writers. I will create issues and track them separately.

ZENOTME closed this as completed Apr 23, 2024