Skip to content

Commit 7007e1c

Browse files
Dylan WongHyukjinKwon
authored andcommitted
[SPARK-53345][SS][TESTS] Use withTempDir for consistent directory across restarts in streaming test
### What changes were proposed in this pull request? Use withTempDir for consistent directory across restarts in test. ### Why are the changes needed? Test was flaky due to using an unpredictable temp directory for a streaming query that tests against restarts. This test was introduced in SPARK-53069. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test only change. ### Was this patch authored or co-authored using generative AI tooling? No Closes #52088 from dylanwong250/SPARK-53345. Authored-by: Dylan Wong <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent f0e8999 commit 7007e1c

File tree

1 file changed

+34
-29
lines changed

1 file changed

+34
-29
lines changed

sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -750,42 +750,47 @@ class TransformWithListStateTTLSuite extends TransformWithStateTTLTest
750750
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
751751
classOf[RocksDBStateStoreProvider].getName,
752752
SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
753-
val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(10))
754-
val inputStream = MemoryStream[String]
755-
val result = inputStream.toDS()
756-
.groupByKey(x => x)
757-
.transformWithState(
758-
new MultiStatefulVariableTTLProcessor(ttlConfig),
759-
TimeMode.ProcessingTime(),
760-
OutputMode.Append())
761-
val clock = new StreamManualClock
753+
withTempDir { checkpointLocation =>
754+
val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(10))
755+
val inputStream = MemoryStream[String]
756+
val result = inputStream.toDS()
757+
.groupByKey(x => x)
758+
.transformWithState(
759+
new MultiStatefulVariableTTLProcessor(ttlConfig),
760+
TimeMode.ProcessingTime(),
761+
OutputMode.Append())
762+
val clock = new StreamManualClock
762763

763-
testStream(result)(
764-
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
764+
testStream(result)(
765+
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock,
766+
checkpointLocation = checkpointLocation.getAbsolutePath),
765767

766-
AddData(inputStream, "k1"),
767-
AdvanceManualClock(1 * 1000),
768-
CheckNewAnswer(("k1", 1)),
769-
assertNumStateRows(total = 3, updated = 3),
768+
AddData(inputStream, "k1"),
769+
AdvanceManualClock(1 * 1000),
770+
CheckNewAnswer(("k1", 1)),
771+
assertNumStateRows(total = 3, updated = 3),
770772

771-
StopStream,
772-
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
773+
StopStream,
774+
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock,
775+
checkpointLocation = checkpointLocation.getAbsolutePath),
773776

774-
AddData(inputStream, "k1"),
775-
AdvanceManualClock(1 * 1000),
776-
CheckNewAnswer(("k1", 2)),
777-
assertNumStateRows(total = 4, updated = 3),
777+
AddData(inputStream, "k1"),
778+
AdvanceManualClock(1 * 1000),
779+
CheckNewAnswer(("k1", 2)),
780+
assertNumStateRows(total = 4, updated = 3),
778781

779-
StopStream,
780-
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
782+
StopStream,
783+
StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock,
784+
checkpointLocation = checkpointLocation.getAbsolutePath),
781785

782-
AddData(inputStream, "k1"),
783-
AdvanceManualClock(1 * 1000),
784-
CheckNewAnswer(("k1", 3)),
785-
assertNumStateRows(total = 5, updated = 3),
786+
AddData(inputStream, "k1"),
787+
AdvanceManualClock(1 * 1000),
788+
CheckNewAnswer(("k1", 3)),
789+
assertNumStateRows(total = 5, updated = 3),
786790

787-
StopStream
788-
)
791+
StopStream
792+
)
793+
}
789794
}
790795
}
791796
}

0 commit comments

Comments
 (0)