
Commit a48fade

[Spark] Add assertion to verify CDC commit atomicity in streaming
Add a deltaAssert to ensure that CDC commits are processed atomically in streaming: either all files from a commit are admitted into a batch, or none are. This prevents update_preimage and update_postimage rows from being split across separate batches, which would break CDC semantics. Also add a ProcessAllAvailable() call to prevent a race condition in the schema change tests.
1 parent 185c226 commit a48fade
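
To make the all-or-nothing admission rule concrete, here is a minimal sketch. This is not the actual DeltaSource admission control; AdmissionLimits and its fields are simplified assumptions, illustrating only the invariant the new assertion guards.

// Simplified sketch (not the real DeltaSource code): a rate limiter that
// admits a commit's CDC files together or not at all, so paired
// update_preimage/update_postimage rows never land in different batches.
class AdmissionLimits(private var filesToTake: Long) {
  def admit(files: Seq[String]): Boolean = {
    // Empty commits are always admissible; otherwise require capacity for
    // every file in the commit before taking any of them.
    val accept = files.isEmpty || files.size <= filesToTake
    if (accept) filesToTake -= files.size
    accept
  }
}

val limits = new AdmissionLimits(filesToTake = 2)
assert(limits.admit(Seq("pre1", "post1")))  // whole commit fits: admitted
assert(!limits.admit(Seq("pre2", "post2"))) // no capacity left: nothing admitted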

File tree

2 files changed: +22, -8 lines

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala

Lines changed: 5 additions & 0 deletions
@@ -142,6 +142,11 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
         // This is to avoid returning `update_preimage` and `update_postimage` in separate
         // batches.
         if (admissionControl.admit(filteredFiles)) {
+          deltaAssert(
+            check = filteredFiles.isEmpty || cdcFiles.size == filteredFiles.size,
+            name = "streaming.cdc.processWholeCommitOnly",
+            msg = s"CDC files were filtered from ${cdcFiles.size} to ${filteredFiles.size}. " +
+              "This violates the all-or-nothing guarantee for CDC commits.")
           filteredFiles.toIterator
         } else {
           Iterator()
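
Read as a predicate, the new check states that after filtering, either none of the commit's CDC files remain or all of them do. A standalone restatement (the helper name is illustrative, not from Delta):

// The invariant guarded by the new deltaAssert: a commit's CDC files are
// admitted all-or-nothing. wholeCommitOnly is an illustrative name only.
def wholeCommitOnly[T](cdcFiles: Seq[T], filteredFiles: Seq[T]): Boolean =
  filteredFiles.isEmpty || cdcFiles.size == filteredFiles.size

assert(wholeCommitOnly(Seq(1, 2, 3), Seq()))         // none admitted: holds
assert(wholeCommitOnly(Seq(1, 2, 3), Seq(1, 2, 3)))  // all admitted: holds
assert(!wholeCommitOnly(Seq(1, 2, 3), Seq(1)))       // split commit: violated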

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala

Lines changed: 17 additions & 8 deletions
@@ -44,8 +44,13 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
   import testImplicits._
   import io.delta.implicits._
 
+  /**
+   * Returns the DeltaConfig used to enable CDC on the tables under test.
+   */
+  protected def cdcConfig: DeltaConfig[Boolean] = DeltaConfigs.CHANGE_DATA_FEED
+
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")
+    .set(cdcConfig.defaultTablePropertyKey, "true")
 
   /**
    * Create two tests for maxFilesPerTrigger and maxBytesPerTrigger
@@ -64,7 +69,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
 
   testQuietly("no startingVersion should result fetch the entire snapshot") {
     withTempDir { inputDir =>
-      withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "false") {
+      withSQLConf(cdcConfig.defaultTablePropertyKey -> "false") {
         // version 0
         Seq(1, 9).toDF("value").write.format("delta").save(inputDir.getAbsolutePath)
 
@@ -79,30 +84,31 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
       }
       // enable cdc - version 3
       sql(s"ALTER TABLE delta.`${inputDir.getAbsolutePath}` SET TBLPROPERTIES " +
-        s"(${DeltaConfigs.CHANGE_DATA_FEED.key}=true)")
+        s"(${cdcConfig.key}=true)")
 
       val df = spark.readStream
         .option(DeltaOptions.CDC_READ_OPTION, "true")
        .format("delta")
        .load(inputDir.getCanonicalPath)
        .drop(CDCReader.CDC_COMMIT_TIMESTAMP)
 
+      val version = 3
       val deltaTable = io.delta.tables.DeltaTable.forPath(inputDir.getAbsolutePath)
       testStream(df) (
         ProcessAllAvailable(),
-        CheckAnswer((1, "insert", 3), (2, "insert", 3)),
+        CheckAnswer((1, "insert", version), (2, "insert", version)),
         Execute { _ =>
           deltaTable.delete("value = 1") // version 4
         },
         ProcessAllAvailable(),
-        CheckAnswer((1, "insert", 3), (2, "insert", 3), (1, "delete", 4))
+        CheckAnswer((1, "insert", version), (2, "insert", version), (1, "delete", version + 1))
       )
     }
   }
 
   testQuietly("CDC initial snapshot should end at base index of next version") {
     withTempDir { inputDir =>
-      withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") {
+      withSQLConf(cdcConfig.defaultTablePropertyKey -> "true") {
         // version 0
         Seq(5, 6).toDF("value").write.format("delta").save(inputDir.getAbsolutePath)
@@ -384,7 +390,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
 
   test("cdc streams with noop merge") {
     withSQLConf(
-      DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true"
+      cdcConfig.defaultTablePropertyKey -> "true"
     ) {
       withTempDirs { (srcDir, targetDir, checkpointDir) =>
         // write source table
@@ -441,7 +447,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
   Seq(true, false).foreach { readChangeFeed =>
     test(s"streams updating latest offset with readChangeFeed=$readChangeFeed") {
       withTempDirs { (inputDir, checkpointDir, outputDir) =>
-        withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") {
+        withSQLConf(cdcConfig.defaultTablePropertyKey -> "true") {
 
           sql(s"CREATE TABLE delta.`$inputDir` (id BIGINT, value STRING) USING DELTA")
           // save some rows to input table.
@@ -946,6 +952,9 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
         withMetadata(deltaLog, StructType.fromDDL("value int"))
         true
       },
+      // Force processing of the stream to prevent a race condition between
+      // DeltaSource.getBatch and DeltaSource.checkReadIncompatibleSchemaChanges.
+      ProcessAllAvailable(),
       AssertOnQuery { _ =>
         withMetadata(deltaLog, StructType.fromDDL("id int, value string"))
         true
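
One note on the cdcConfig refactor above: the hard-coded references to DeltaConfigs.CHANGE_DATA_FEED are replaced with an overridable hook, so a subclass can re-run the entire suite against a different CDC table property. A hypothetical sketch follows; the subclass name and ALTERNATIVE_CDC_CONFIG are assumptions for illustration, not names from this commit.

// Hypothetical subclass: rebinding cdcConfig reruns every test in the suite
// with a different CDC-enabling table property. ALTERNATIVE_CDC_CONFIG is a
// placeholder, not a real DeltaConfigs member.
class DeltaCDCStreamAltConfigSuite extends DeltaCDCStreamSuiteBase {
  override protected def cdcConfig: DeltaConfig[Boolean] = ALTERNATIVE_CDC_CONFIG
}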
