Commit 9adfc3a

[SPARK-39650][SS] Fix incorrect value schema in streaming deduplication with backward compatibility
### What changes were proposed in this pull request?

This PR proposes to fix the incorrect value schema in streaming deduplication. The operator stores an empty row having a single column with null (using NullType), but the value schema is specified as all input columns, which leads to incorrect behavior from the state store schema compatibility checker.

This PR proposes to set the value schema to `StructType(Array(StructField("__dummy__", NullType)))` so that it matches the empty row. With this change, streaming queries that create their checkpoint after this fix will work smoothly.

To avoid breaking existing streaming queries that already have the incorrect value schema, this PR also disables the value schema check for streaming deduplication. Disabling the value check already existed for the format validation (we have two different checkers for the state store), but it was missing from the state store schema compatibility check. To avoid adding another config, this PR leverages the existing config used by the format validation.

### Why are the changes needed?

This is a bug fix. Suppose the streaming query below:

```
// df has the columns `a`, `b`, `c`
val df = spark.readStream.format("...").load()
val query = df.dropDuplicates("a").writeStream.format("...").start()
```

While the query is running, df can produce a different set of columns (e.g. `a`, `b`, `c`, `d`) from the same source due to schema evolution. Since we only deduplicate the rows on column `a`, the schema change should not matter for streaming deduplication, but before this fix the state store schema checker throws an error saying "value schema is not compatible".

### Does this PR introduce _any_ user-facing change?

No, this is basically a bug fix which end users wouldn't notice unless they had encountered the bug.

### How was this patch tested?

New tests.

Closes #37041 from HeartSaVioR/SPARK-39650.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit fe53603)
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 1387af7 commit 9adfc3a
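Before the per-file diffs, a minimal sketch of the schema change the commit message describes (illustrative only; the column names follow the `a`, `b`, `c` example above, and nothing here is quoted verbatim from the patch):

```scala
import org.apache.spark.sql.types._

// Input schema of the example query above; columns a, b, c are illustrative.
val inputSchema = StructType(Seq(
  StructField("a", StringType),
  StructField("b", IntegerType),
  StructField("c", StringType)))

// Before the fix: all input columns were registered as the state value schema,
// even though the stored value is just an empty row.
val valueSchemaBefore = inputSchema

// After the fix: a single NullType column, matching the empty row actually stored.
val valueSchemaAfter = StructType(Array(StructField("__dummy__", NullType)))
```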

File tree

37 files changed: +152 -22 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala

Lines changed: 20 additions & 6 deletions
@@ -41,20 +41,34 @@ class StateSchemaCompatibilityChecker(
   fm.mkdirs(schemaFileLocation.getParent)
 
   def check(keySchema: StructType, valueSchema: StructType): Unit = {
+    check(keySchema, valueSchema, ignoreValueSchema = false)
+  }
+
+  def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: Boolean): Unit = {
     if (fm.exists(schemaFileLocation)) {
       logDebug(s"Schema file for provider $providerId exists. Comparing with provided schema.")
       val (storedKeySchema, storedValueSchema) = readSchemaFile()
-      if (storedKeySchema.equals(keySchema) && storedValueSchema.equals(valueSchema)) {
+      if (storedKeySchema.equals(keySchema) &&
+        (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
         // schema is exactly same
       } else if (!schemasCompatible(storedKeySchema, keySchema) ||
-          !schemasCompatible(storedValueSchema, valueSchema)) {
+        (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema))) {
+        val errorMsgForKeySchema = s"- Provided key schema: $keySchema\n" +
+          s"- Existing key schema: $storedKeySchema\n"
+
+        // If it is requested to skip checking the value schema, we also don't expose the value
+        // schema information to the error message.
+        val errorMsgForValueSchema = if (!ignoreValueSchema) {
+          s"- Provided value schema: $valueSchema\n" +
+            s"- Existing value schema: $storedValueSchema\n"
+        } else {
+          ""
+        }
         val errorMsg = "Provided schema doesn't match to the schema for existing state! " +
           "Please note that Spark allow difference of field name: check count of fields " +
           "and data type of each field.\n" +
-          s"- Provided key schema: $keySchema\n" +
-          s"- Provided value schema: $valueSchema\n" +
-          s"- Existing key schema: $storedKeySchema\n" +
-          s"- Existing value schema: $storedValueSchema\n" +
+          errorMsgForKeySchema +
+          errorMsgForValueSchema +
           s"If you want to force running query without schema validation, please set " +
           s"${SQLConf.STATE_SCHEMA_CHECK_ENABLED.key} to false.\n" +
           "Please note running query with incompatible schema could cause indeterministic" +

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 6 additions & 1 deletion
@@ -511,7 +511,12 @@ object StateStore extends Logging {
     val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
     // regardless of configuration, we check compatibility to at least write schema file
     // if necessary
-    val ret = Try(checker.check(keySchema, valueSchema)).toEither.fold(Some(_), _ => None)
+    // if the format validation for value schema is disabled, we also disable the schema
+    // compatibility checker for value schema as well.
+    val ret = Try(
+      checker.check(keySchema, valueSchema,
+        ignoreValueSchema = !storeConf.formatValidationCheckValue)
+    ).toEither.fold(Some(_), _ => None)
     if (storeConf.stateSchemaCheckEnabled) {
       ret
     } else {
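As a side note on the retained `Try(...).toEither.fold(Some(_), _ => None)` idiom: it converts a thrown check failure into `Some(exception)` and a successful check into `None`. A small self-contained sketch (the `capture` helper is hypothetical, used only to demonstrate the idiom):

```scala
import scala.util.Try

// Run a side-effecting check and capture its failure (if any) instead of propagating it.
def capture(check: => Unit): Option[Throwable] =
  Try(check).toEither.fold(Some(_), _ => None)

assert(capture(()) == None)                                              // success -> None
assert(capture(throw new IllegalStateException("schema mismatch")).isDefined) // failure -> Some(exception)
```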

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 6 additions & 1 deletion
@@ -48,7 +48,12 @@ class StateStoreConf(
   /** Whether validate the underlying format or not. */
   val formatValidationEnabled: Boolean = sqlConf.stateStoreFormatValidationEnabled
 
-  /** Whether validate the value format when the format invalidation enabled. */
+  /**
+   * Whether to validate the value side. This config is applied to both validators as below:
+   *
+   * - whether to validate the value format when the format validation is enabled.
+   * - whether to validate the value schema when the state schema check is enabled.
+   */
   val formatValidationCheckValue: Boolean =
     extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, "true") == "true"
 
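A minimal sketch of the doubled-up meaning of this flag (assumptions: the `StateStoreConf(sqlConf, extraOptions)` constructor shape and access to the internal `FORMAT_VALIDATION_CHECK_VALUE_CONFIG` constant from the call site):

```scala
import org.apache.spark.sql.execution.streaming.state.StateStoreConf
import org.apache.spark.sql.internal.SQLConf

val sqlConf = new SQLConf()

// A stateful operator that stores no meaningful value (e.g. streaming deduplication)
// can pass this extra option; after this patch it disables both the value format
// validation and the value schema compatibility check.
val extraOptions = Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")
val storeConf = new StateStoreConf(sqlConf, extraOptions)

assert(!storeConf.formatValidationCheckValue)
```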
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

Lines changed: 3 additions & 1 deletion
@@ -750,13 +750,15 @@ case class StreamingDeduplicateExec(
       keyExpressions, getStateInfo, conf) :: Nil
   }
 
+  private val schemaForEmptyRow: StructType = StructType(Array(StructField("__dummy__", NullType)))
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
     child.execute().mapPartitionsWithStateStore(
       getStateInfo,
       keyExpressions.toStructType,
-      child.output.toStructType,
+      schemaForEmptyRow,
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator),
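An illustrative sketch of why a single NullType column fits what the operator actually persists: the value stored per key is an empty row with one null field, so `schemaForEmptyRow` describes it exactly, whereas `child.output.toStructType` did not (the row construction below is paraphrased, not quoted from the operator):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

val schemaForEmptyRow = StructType(Array(StructField("__dummy__", NullType)))

// A single-field row holding null, comparable to the "empty row" stored per deduplication key.
val emptyRow = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow(null))

assert(emptyRow.numFields == schemaForEmptyRow.length)  // both describe exactly one (null) column
```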
New test resource files (the Spark 3.3.0 streaming-deduplication checkpoint used by the recovery test below):

@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}

@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}

@@ -0,0 +1 @@
+{"id":"33e8de33-00b8-4b60-8246-df2f433257ff"}

@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1656644489789,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}}
+0

@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1656644492462,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5"}}
+1

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala

Lines changed: 36 additions & 13 deletions
@@ -231,6 +231,16 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     assert((resultKeySchema, resultValueSchema) === (keySchema, valueSchema))
   }
 
+  test("SPARK-39650: ignore value schema on compatibility check") {
+    val typeChangedValueSchema = StructType(valueSchema.map(_.copy(dataType = TimestampType)))
+    verifySuccess(keySchema, valueSchema, keySchema, typeChangedValueSchema,
+      ignoreValueSchema = true)
+
+    val typeChangedKeySchema = StructType(keySchema.map(_.copy(dataType = TimestampType)))
+    verifyException(keySchema, valueSchema, typeChangedKeySchema, valueSchema,
+      ignoreValueSchema = true)
+  }
+
   private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = {
     applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3")
   }
@@ -257,44 +267,57 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
       dir: String,
       queryId: UUID,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean): Unit = {
     // in fact, Spark doesn't support online state schema change, so need to check
     // schema only once for each running of JVM
     val providerId = StateStoreProviderId(
      StateStoreId(dir, opId, partitionId), queryId)
 
    new StateSchemaCompatibilityChecker(providerId, hadoopConf)
-      .check(newKeySchema, newValueSchema)
+      .check(newKeySchema, newValueSchema, ignoreValueSchema = ignoreValueSchema)
   }
 
   private def verifyException(
       oldKeySchema: StructType,
       oldValueSchema: StructType,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean = false): Unit = {
     val dir = newDir()
     val queryId = UUID.randomUUID()
-    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema)
+    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
 
     val e = intercept[StateSchemaNotCompatible] {
-      runSchemaChecker(dir, queryId, newKeySchema, newValueSchema)
+      runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
+        ignoreValueSchema = ignoreValueSchema)
     }
 
-    e.getMessage.contains("Provided schema doesn't match to the schema for existing state!")
-    e.getMessage.contains(newKeySchema.json)
-    e.getMessage.contains(newValueSchema.json)
-    e.getMessage.contains(oldKeySchema.json)
-    e.getMessage.contains(oldValueSchema.json)
+    assert(e.getMessage.contains("Provided schema doesn't match to the schema for existing state!"))
+    assert(e.getMessage.contains(newKeySchema.toString()))
+    assert(e.getMessage.contains(oldKeySchema.toString()))
+
+    if (ignoreValueSchema) {
+      assert(!e.getMessage.contains(newValueSchema.toString()))
+      assert(!e.getMessage.contains(oldValueSchema.toString()))
+    } else {
+      assert(e.getMessage.contains(newValueSchema.toString()))
+      assert(e.getMessage.contains(oldValueSchema.toString()))
+    }
   }
 
   private def verifySuccess(
       oldKeySchema: StructType,
       oldValueSchema: StructType,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean = false): Unit = {
     val dir = newDir()
     val queryId = UUID.randomUUID()
-    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema)
-    runSchemaChecker(dir, queryId, newKeySchema, newValueSchema)
+    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
+    runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
   }
 }
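A small illustrative check (not from the diff) of why the updated assertions compare against `toString()` rather than `.json`: the error message interpolates the schemas via string interpolation, which uses `StructType.toString`, so the JSON form never appears in the message.

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("_1", StringType)))
val errorFragment = s"- Provided key schema: $schema\n"

assert(errorFragment.contains(schema.toString()))  // matches the interpolated form
assert(!errorFragment.contains(schema.json))       // the JSON form is never embedded
```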

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala

Lines changed: 70 additions & 0 deletions
@@ -17,11 +17,16 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class StreamingDeduplicationSuite extends StateStoreMetricsTest {
 
@@ -413,4 +418,69 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1)
     )
   }
+
+  test("SPARK-39650: duplicate with specific keys should allow input to change schema") {
+    withTempDir { checkpoint =>
+      val dedupeInputData = MemoryStream[(String, Int)]
+      val dedupe = dedupeInputData.toDS().dropDuplicates("_1")
+
+      testStream(dedupe, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+
+        AddData(dedupeInputData, "a" -> 1),
+        CheckLastBatch("a" -> 1),
+
+        AddData(dedupeInputData, "a" -> 2, "b" -> 3),
+        CheckLastBatch("b" -> 3)
+      )
+
+      val dedupeInputData2 = MemoryStream[(String, Int, String)]
+      val dedupe2 = dedupeInputData2.toDS().dropDuplicates("_1")
+
+      // initialize new memory stream with previously executed batches
+      dedupeInputData2.addData(("a", 1, "dummy"))
+      dedupeInputData2.addData(Seq(("a", 2, "dummy"), ("b", 3, "dummy")))
+
+      testStream(dedupe2, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+
+        AddData(dedupeInputData2, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+        CheckLastBatch(("c", 9, "c"))
+      )
+    }
+  }
+
+  test("SPARK-39650: recovery from checkpoint having all columns as value schema") {
+    // NOTE: We are also changing the schema of input compared to the checkpoint. In the checkpoint
+    // we define the input schema as (String, Int).
+    val inputData = MemoryStream[(String, Int, String)]
+    val dedupe = inputData.toDS().dropDuplicates("_1")
+
+    // The fix will land after Spark 3.3.0, hence we can check backward compatibility with
+    // checkpoint being built from Spark 3.3.0.
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    inputData.addData(("a", 1, "dummy"))
+    inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
+
+    testStream(dedupe, Append)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+      /*
+        Note: The checkpoint was generated using the following input in Spark version 3.3.0
+        AddData(inputData, ("a", 1)),
+        CheckLastBatch(("a", 1)),
+        AddData(inputData, ("a", 2), ("b", 3)),
+        CheckLastBatch(("b", 3))
+       */
+
+      AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+      CheckLastBatch(("c", 9, "c"))
+    )
+  }
 }
