Skip to content

[SPARK-39650][SS] Fix incorrect value schema in streaming deduplication with backward compatibility #37041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from

Conversation

HeartSaVioR
Copy link
Contributor

What changes were proposed in this pull request?

This PR proposes to fix the incorrect value schema in streaming deduplication. It stores the empty row having a single column with null (using NullType), but the value schema is specified as all columns, which leads incorrect behavior from state store schema compatibility checker.

This PR proposes to set the schema of value as StructType(Array(StructField("__dummy__", NullType))) to fit with the empty row. With this change, the streaming queries creating the checkpoint after this fix would work smoothly.

To not break the existing streaming queries having incorrect value schema, this PR proposes to disable the check for value schema on streaming deduplication. Disabling the value check was there for the format validation (we have two different checkers for state store), but it has been missing for state store schema compatibility check. To avoid adding more config, this PR leverages the existing config "format validation" is using.

Why are the changes needed?

This is a bug fix. Suppose the streaming query below:

# df has the columns `a`, `b`, `c`
val df = spark.readStream.format("...").load()
val query = df.dropDuplicate("a").writeStream.format("...").start()

while the query is running, df can produce a different set of columns (e.g. a, b, c, d) from the same source due to schema evolution. Since we only deduplicate the rows with column a, the change of schema should not matter for streaming deduplication, but state store schema checker throws error saying "value schema is not compatible" before this fix.

Does this PR introduce any user-facing change?

No, this is basically a bug fix which end users wouldn't notice unless they encountered a bug.

How was this patch tested?

New tests.

@HeartSaVioR
Copy link
Contributor Author

cc. @zsxwing @viirya

@HeartSaVioR
Copy link
Contributor Author

This PR does not deal with overwriting incorrect value schema file. If we want to leverage the schema file for understanding/reading state, ideally we should make the schema file be up to date. But we don't also overwrite the schema file when the schema is compatible.

We can track the effort as separate JIRA.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one minor comment

@@ -515,7 +515,12 @@ object StateStore extends Logging {
val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
// regardless of configuration, we check compatibility to at least write schema file
// if necessary
val ret = Try(checker.check(keySchema, valueSchema)).toEither.fold(Some(_), _ => None)
// if the format validation for value schema is disabled, we also disable the schema
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add a code comment at formatValidationCheckValue in StateStoreConf?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I'll leave a comment that the config is in effect for both checkers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya
Copy link
Member

viirya commented Jul 1, 2022

This PR does not deal with overwriting incorrect value schema file. If we want to leverage the schema file for understanding/reading state, ideally we should make the schema file be up to date. But we don't also overwrite the schema file when the schema is compatible.

Is that important? The state value is already not matched with the value schema, isn't? Not sure if that the schema is up to date is important.

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Jul 1, 2022

It's not important with current features of Structured Streaming. It's something like "future-proof" - when we plan to build a feature like reading the state (actually I even had a PR which was forgotten...), keeping the schema up-to-date will give the ideal UX to the end users, otherwise they would see the outdated schema, or even fail to read the state.

@HeartSaVioR
Copy link
Contributor Author

missed to cc. @xuanyuanking as he authored the other checker.

@HeartSaVioR
Copy link
Contributor Author

I'll keep this PR around a day to seek for further reviews. If there is no outstanding one, I'll merge this in tomorrow.

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Jul 2, 2022

Thanks! Merging to master/3.3/3.2.

HeartSaVioR added a commit that referenced this pull request Jul 2, 2022
…on with backward compatibility

### What changes were proposed in this pull request?

This PR proposes to fix the incorrect value schema in streaming deduplication. It stores the empty row having a single column with null (using NullType), but the value schema is specified as all columns, which leads incorrect behavior from state store schema compatibility checker.

This PR proposes to set the schema of value as `StructType(Array(StructField("__dummy__", NullType)))` to fit with the empty row. With this change, the streaming queries creating the checkpoint after this fix would work smoothly.

To not break the existing streaming queries having incorrect value schema, this PR proposes to disable the check for value schema on streaming deduplication. Disabling the value check was there for the format validation (we have two different checkers for state store), but it has been missing for state store schema compatibility check. To avoid adding more config, this PR leverages the existing config "format validation" is using.

### Why are the changes needed?

This is a bug fix. Suppose the streaming query below:

```
# df has the columns `a`, `b`, `c`
val df = spark.readStream.format("...").load()
val query = df.dropDuplicate("a").writeStream.format("...").start()
```

while the query is running, df can produce a different set of columns (e.g. `a`, `b`, `c`, `d`) from the same source due to schema evolution. Since we only deduplicate the rows with column `a`, the change of schema should not matter for streaming deduplication, but state store schema checker throws error saying "value schema is not compatible" before this fix.

### Does this PR introduce _any_ user-facing change?

No, this is basically a bug fix which end users wouldn't notice unless they encountered a bug.

### How was this patch tested?

New tests.

Closes #37041 from HeartSaVioR/SPARK-39650.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit fe53603)
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR added a commit that referenced this pull request Jul 2, 2022
…on with backward compatibility

### What changes were proposed in this pull request?

This PR proposes to fix the incorrect value schema in streaming deduplication. It stores the empty row having a single column with null (using NullType), but the value schema is specified as all columns, which leads incorrect behavior from state store schema compatibility checker.

This PR proposes to set the schema of value as `StructType(Array(StructField("__dummy__", NullType)))` to fit with the empty row. With this change, the streaming queries creating the checkpoint after this fix would work smoothly.

To not break the existing streaming queries having incorrect value schema, this PR proposes to disable the check for value schema on streaming deduplication. Disabling the value check was there for the format validation (we have two different checkers for state store), but it has been missing for state store schema compatibility check. To avoid adding more config, this PR leverages the existing config "format validation" is using.

### Why are the changes needed?

This is a bug fix. Suppose the streaming query below:

```
# df has the columns `a`, `b`, `c`
val df = spark.readStream.format("...").load()
val query = df.dropDuplicate("a").writeStream.format("...").start()
```

while the query is running, df can produce a different set of columns (e.g. `a`, `b`, `c`, `d`) from the same source due to schema evolution. Since we only deduplicate the rows with column `a`, the change of schema should not matter for streaming deduplication, but state store schema checker throws error saying "value schema is not compatible" before this fix.

### Does this PR introduce _any_ user-facing change?

No, this is basically a bug fix which end users wouldn't notice unless they encountered a bug.

### How was this patch tested?

New tests.

Closes #37041 from HeartSaVioR/SPARK-39650.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit fe53603)
Signed-off-by: Jungtaek Lim <[email protected]>
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…on with backward compatibility

### What changes were proposed in this pull request?

This PR proposes to fix the incorrect value schema in streaming deduplication. It stores the empty row having a single column with null (using NullType), but the value schema is specified as all columns, which leads incorrect behavior from state store schema compatibility checker.

This PR proposes to set the schema of value as `StructType(Array(StructField("__dummy__", NullType)))` to fit with the empty row. With this change, the streaming queries creating the checkpoint after this fix would work smoothly.

To not break the existing streaming queries having incorrect value schema, this PR proposes to disable the check for value schema on streaming deduplication. Disabling the value check was there for the format validation (we have two different checkers for state store), but it has been missing for state store schema compatibility check. To avoid adding more config, this PR leverages the existing config "format validation" is using.

### Why are the changes needed?

This is a bug fix. Suppose the streaming query below:

```
# df has the columns `a`, `b`, `c`
val df = spark.readStream.format("...").load()
val query = df.dropDuplicate("a").writeStream.format("...").start()
```

while the query is running, df can produce a different set of columns (e.g. `a`, `b`, `c`, `d`) from the same source due to schema evolution. Since we only deduplicate the rows with column `a`, the change of schema should not matter for streaming deduplication, but state store schema checker throws error saying "value schema is not compatible" before this fix.

### Does this PR introduce _any_ user-facing change?

No, this is basically a bug fix which end users wouldn't notice unless they encountered a bug.

### How was this patch tested?

New tests.

Closes apache#37041 from HeartSaVioR/SPARK-39650.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit fe53603)
Signed-off-by: Jungtaek Lim <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants