
Limitations of ETL Pipelines and Design Proposals for Improvement #1219

@youngmoneee

Description


Hi, guys ✋

The ETL pipeline refers to the process of extracting data from various sources, transforming it into a consistent format, and loading it into a destination. This entire process can naturally be represented as a stream.

However, the current pipeline is handled using List, which presents the following limitations:

  1. Using List requires loading all data into memory before processing. As the data volume increases, this can lead to out-of-memory issues.
  2. Relatedly, because data is processed synchronously only after it has been fully loaded into memory, this approach can be inefficient in terms of throughput.
  3. Since a List is processed in bulk only after all data is loaded, it is not suitable for unbounded sequences such as web scraping (Web scraping ETL #752) or real-time log processing (see the sketch after this list).
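For instance, with Reactor an unbounded source can be consumed with bounded memory. A generic sketch (vectorStore stands for any existing Consumer<List<Document>> writer; the interval source is just a stand-in for a live feed):

Flux.interval(Duration.ofMillis(100))        // an unbounded source, e.g. a live log tail
    .map(i -> new Document("log line " + i)) // transform one element at a time
    .buffer(100)                             // group into small batches for writing
    .subscribe(vectorStore::accept);         // memory stays bounded throughout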

To address these issues, I propose transitioning the data pipeline from List to Flux.

Current Behavior

  1. DocumentReader loads a List<Document>.
  2. DocumentTransformer transforms the documents.
  3. DocumentWriter stores the transformed data.

(diagram: etl-pipeline)
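In code, the three steps compose like this today (a sketch; reader, transformer, and writer stand for any implementations of the current interfaces):

List<Document> documents = reader.get();                   // 1. load everything into memory
List<Document> transformed = transformer.apply(documents); // 2. transform in bulk
writer.accept(transformed);                                // 3. write in bulk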

Expected Behavior

  1. DocumentReader will change from Supplier<List<Document>> to Supplier<Flux<Document>>.
  2. A new Transformer<T, U> interface, extending Function<Flux<T>, Flux<U>>, will process Flux objects.
  3. DocumentTransformer will change from Function<List<Document>, List<Document>> to Transformer<Document, Document>.
  4. DocumentWriter will change from Consumer<List<Document>> to a Transformer<Document, Document> that also extends Writable.
  • While I also considered using a Consumer object similar to the original interface, I think this approach is better suited to pipelines that need to write to multiple destinations (File, VectorStore, DB, etc.). (I would like to discuss this further.)

  • Ultimately, I think we can encapsulate the flow of this pipeline through an ETLPipeLine object that implements Runnable.
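A sketch of the proposed contracts (exact names and signatures are open for discussion; Writable is only a marker here):

import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;

interface DocumentReader extends Supplier<Flux<Document>> { }

interface Transformer<T, U> extends Function<Flux<T>, Flux<U>> { }

interface DocumentTransformer extends Transformer<Document, Document> { }

// marker for components that persist documents; the writer stays a pass-through
// Transformer so several writers can be chained in a single pipeline
interface Writable { }

interface DocumentWriter extends Transformer<Document, Document>, Writable { }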

The example usage after the changes is as follows.

class ExamplePipeLine implements Runnable {
  private final Flux<Document> stream;

  ExamplePipeLine(DocumentReader r, DocumentTransformer t, DocumentWriter w) {
    // assemble the stream: read -> transform -> write, each stage a Flux operator
    this.stream = r.get()
        .transform(t)
        .transform(w);
  }

  @Override
  public void run() {
    this.stream
        // ...plus error, retry, etc. handlers
        .subscribe();
  }
}
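Wiring it up is then just composing the three components and running the pipeline (instances are hypothetical):

Runnable pipeline = new ExamplePipeLine(reader, transformer, writer);
pipeline.run();  // subscribes and drives the whole stream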

Benefits of the Proposed Design

This interface change makes the data flow controllable and enables more flexible implementations using lambdas or method references. Additionally, by transitioning to Flux we can significantly improve memory efficiency, since data is processed in a streaming fashion without loading everything into memory at once.
This approach also improves performance by enabling asynchronous processing, allowing the pipeline to handle larger datasets and real-time data streams more effectively.
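For example, Reactor's operators give the pipeline backpressure, scheduling, and retry control without any further interface changes (a sketch; the limits are arbitrary):

reader.get()
    .onBackpressureBuffer(256)               // bound the number of in-flight documents
    .publishOn(Schedulers.boundedElastic())  // run transforms off the reading thread
    .transform(transformer)
    .transform(writer)
    .retry(3)                                // re-subscribe on transient failures
    .subscribe();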

Anticipated Challenges and Solutions

Interface Changes: While the ultimate goal is a more flexible and testable codebase, these interface changes will require modifications to existing implementations.

Example Wrappers:

// DocumentReader wrapper: adapts an existing synchronous reader
// (the current Supplier<List<Document>>-based interface) to the new contract
Supplier<Flux<Document>> documentReaderWrapper(DocumentReader documentReader) {
  return () -> Flux.fromIterable(documentReader.read());
}

By wrapping existing synchronous implementations directly, or by applying AOP, the change can be adopted with minimal disruption.
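The same adaptation works in the other direction, for example wrapping today's Consumer-based writer so it fits the proposed Transformer contract (a sketch; the batch size is arbitrary):

// DocumentWriter wrapper: adapts an existing Consumer<List<Document>> writer
Transformer<Document, Document> documentWriterWrapper(Consumer<List<Document>> writer) {
  return flux -> flux
      .buffer(100)                       // group documents into batches for the bulk API
      .doOnNext(writer)                  // delegate each batch to the synchronous writer
      .flatMapIterable(batch -> batch);  // pass the documents through to later stages
}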

Please let me know if there are any issues I might have overlooked, or if you have any better ideas for this proposal.

Thanks 🧑🏼‍💻
