Skip to content
This repository was archived by the owner on Jul 10, 2025. It is now read-only.

Conversation

ruoyu90
Copy link
Contributor

@ruoyu90 ruoyu90 commented Jun 5, 2020

This RFC will be open for comment until Thursday, June 18, 2020.

TFX Advanced DSL Semantics

Status Proposed
RFC # 253
Author(s) Ruoyu Li ([email protected]), Konstantin Shtoyk ([email protected]), Mitch Trott ([email protected]), Zhitao Li ([email protected])
Sponsor Konstantinos Katsiapis ([email protected])
Updated 2020-06-04

Goal

The existing TFX DSL mainly focuses on one-shot pipelines with static execution
plan. While it is good enough for many use cases, there are some scenarios that
the current DSL fails to support. Some of those scenarios are becoming more and
more crucial to modern ML production pipelines. This document will go
through some scenarios and proposes async pipeline semantics, to address these requirements.

The proposal seeks to address the following scenarios:

  • Components with different schedules
  • Different input sources
  • 'Best effort' components
  • Limited computation resource, too much data
  • Synchronization barrier

| Status | Proposed |
| :------------ | :---------------------------------------------------------- |
| **Author(s)** | Ruoyu Liu ([email protected]), Konstantin Shtoyk |
: : ([email protected]), Mitch Trott ([email protected]), Zhitao :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, GitHub's markdown rendering doesn't seem to support multi-line row in the table using ':'. Rendering is broken.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ucdmkt !

categorize ML pipelines. Understanding these concepts is critical to evaluate
the rest of this RFC.

#### Synchronous execution & Asynchronous execution
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not totally clear to me how this will work with orchestrators like Airflow and KubeFlow. Do these natively support asynchronous execution? Or is the idea that Synchronous pipelines should use an orchestrator, and Async pipelines are managed as a cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the question @cweill ! There are multiple ways to run an Async execution pipeline:

  • A pub-sub system will work
  • A centralized 'orchestrator' / 'controller' will also work
  • A collection of long-running jobs in a cluster will also work

The goal of this RFC is to propose the semantics and does not intend to propose / recommend an implementation pattern.

It's not totally clear to me how this will work with orchestrators like Airflow and KubeFlow

These two platforms are good for synchronous execution pipelines but might not be the best platforms to run asynchronous execution pipelines. As the two patterns (sync vs async) are quite different, we expect that people use different platforms for them.

Hope this answers your question :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, is there any platform you have in mind regarding async scheduler? Beam/Dataflow?

Main worry with Airflow/Kubeflow is UI (as you could potentially schedule sensor tasks to run every 5 minutes and check wether they need to run or not). But UI will be difficult to decipher since the plarforms do not visualize well dependencies across scheduled runs (at least Airflow, Kubeflow I don't know as much).

Also wondering, it this may be solved w a unified TFX UI that does support those (just thinking aloud)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most simple way to run asynchronous execution pipelines will be just deploying it to a cluster through a controller. No specific platform like Airflow or Kubeflow is needed.
Pub-sub systems are natural fits for this style of pipelines on the other hand. Beam / Dataflow is definitely another option using its support for unbounded source.

Also wondering, it this may be solved w a unified TFX UI that does support those (just thinking aloud)

A unified UI is definitely something we always keep in mind and is in our list. @zhitaoli

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea to provide some sort of orchestrator/runner for this on the long run? (aka a custom controller that can be deployed on GCP for async pipelines?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in the long run we will provide something (an orchestrator or a controller) that can run asynchronous execution pipelines in cloud / OSS environment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, kinda related to this topic. Airflow community is talking to introduce a similar concept (which would be great as that would mean that Airflow could implement the Async pipeline execution). Thought, I would bring it up here. Curious on your thoughts on this. Note the current state is dicussion so it's still under the process of deciding what the implementation will look like. https://lists.apache.org/thread.html/re1a7e5cfcb1e9f4a0bfac41998da2d88ffb26d4f597036c772a4c86e%40%3Cdev.airflow.apache.org%3E

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruoyu90 Can we forward this RFC to the Airflow community and seek for comment too? I imagine there is a possibility to implement the async style semantic on top of the future Airflow implementation in the future.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most simple way to run asynchronous execution pipelines will be just deploying it to a cluster through a controller.

Do you mean deploying the pipeline components as a collection of standalone always running nodes?

Copy link
Contributor Author

@ruoyu90 ruoyu90 Jun 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paveldournov yes exactly.

/cc @hongye-sun
/cc @jkim1014

@theadactyl theadactyl added the RFC: Proposed RFC Design Document label Jun 5, 2020
@theadactyl theadactyl changed the title RFC for advanced TFX DSL semantics [RFC]TFX Advanced DSL semantics Jun 5, 2020
@theadactyl theadactyl changed the title [RFC]TFX Advanced DSL semantics RFC: TFX Advanced DSL semantics Jun 5, 2020
categorize ML pipelines. Understanding these concepts is critical to evaluate
the rest of this RFC.

#### Synchronous execution & Asynchronous execution
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, is there any platform you have in mind regarding async scheduler? Beam/Dataflow?

Main worry with Airflow/Kubeflow is UI (as you could potentially schedule sensor tasks to run every 5 minutes and check wether they need to run or not). But UI will be difficult to decipher since the plarforms do not visualize well dependencies across scheduled runs (at least Airflow, Kubeflow I don't know as much).

Also wondering, it this may be solved w a unified TFX UI that does support those (just thinking aloud)

original pipelines is broken which might result in unintended behaviors such as
consuming partially ready data. Thus, it is important to be able to author a
pipeline that consists of components with different schedules and we want such
experience to be well defined and intuitive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An additional comment here: Note that sometimes input data may be "delayed" from its scheduled time of arrival (due to issues on the collection pipeline or issues on data replication if working across several DCs). So not only it needs to support async pipeline components w different schedules, but sometimes even "Sensor" components that run whenever X data is fully there.

I think this is explained later on as async data is explain, but may be worth adding to background for clarity?

This is mostly for ExampleGen components.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I describe the use case you mentioned as 'need a way to define trigger condition of a component'?

/cc @1025KB

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. I would more specifically say 'need a way to trigger components depending on external artifact availability/status' (similar to how components are triggered internally when upstream artifact is ready).

Aka, I want to trigger my an exampleGen whenever the external artifacts that serve as inputs are ready and complete. And this may happen outside of a defined schedule (usually adapts to a schedule but may be delayed).

CC @newtonle

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're thinking of standardizing a 'trigger policy' concept to model scenarios like that. This will also be useful for future features like conditional.

Comment on lines +285 to +286
3. Each node inside the sub-pipeline can be configured to run with synchronous
data mode or asynchronous data mode.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about this. Do you refer to resolver nodes here? Or something deeper?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. 'asynchronous data mode' in synchronous execution pipelines is achieved through resolver nodes.

Comment on lines +376 to +378
b = tfx.experimental.SubpipelineInputs(
inputs={'examples': eg.outputs['examples']},
async_inputs={'embedding': eb.outputs['embedding']})
Copy link
Member

@casassg casassg Jun 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if the input/output config could be wrapped into a decorator for the subpipeline function instead? May make it more readable. (maybe decorator for a future proposal as it may also be worth doing a similar thing for pipelines?)

Something like this:

@subpipeline
def create_subpipeline(subpipeline: SubPipeline, examples: SyncInput[Examples], embeddings: AsyncInput[Embeddings]) -> SubpipelineOutput(async=['model'], sync=['model', 'validation_result']):

  tx = tfx.Transform(
      examples=examples)
  tr = tfx.Trainer(
      examples=examples,
      embedding=embeddings,
      transform_graph=tx.outputs['transform_graph'])
  iv = tfx.InfraValidator(model=tr.outputs['model'])
   
  # This could be avoided if we used automatic component registration 
  subpipeline.add_components([tx, tr, iv])

  return {'model': tr.outputs['model'], 'validation_result': iv.outputs['validation_result']}

eg = tfx.ExampleGen(...)          # Irrelevant parts omitted
eb = tfx.EmbeddingGenerator(...)  # Irrelevant parts omitted
# Function signature modified by decorator?
sp = create_subpipeline(eg.outputs['example'], eb.outputs['embedding'])
p = tfx.Pusher(
    model=sp.outputs['model'],
    validation_result=sp.outputs['validation_result'])
lt = tfx.TFLiteConverter(model=sp.async_outputs['model'])
return pipeline.Pipeline(
    components=[eg, eb, sp, p, lt], execution_mode=ASYNC)

Mostly an idea, just leaving here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @casassg ! This is definitely an improvement on usability. Leaving it open as an AI on us.

/cc @charlesccychen
/cc @zhitaoli

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to the suggested usability improvement. Leaving this to @charlesccychen to also acknowedge.

@ruoyu90
Copy link
Contributor Author

ruoyu90 commented Jun 9, 2020

@jkim1014

Copy link
Contributor

@zhitaoli zhitaoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ematejska
Copy link

This has been accepted since there are no unresolved comments.

Copy link

@ematejska ematejska left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been accepted.

@ematejska ematejska merged commit cd9164b into tensorflow:master Jun 22, 2020
@ematejska ematejska added RFC: Accepted RFC Design Document: Accepted by Review and removed RFC: Proposed RFC Design Document labels Jun 22, 2020
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
cla: yes RFC: Accepted RFC Design Document: Accepted by Review
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants