Skip to content

[SPARK-53294][SS] Enable StateDataSource with state checkpoint v2 (only batchId option) #52047

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

dylanwong250
Copy link
Contributor

@dylanwong250 dylanwong250 commented Aug 15, 2025

What changes were proposed in this pull request?

This PR enables StateDataSource (https://spark.apache.org/docs/latest/streaming/structured-streaming-state-data-source.html) to work with state checkpoint v2 format ("spark.sql.streaming.stateStore.checkpointFormatVersion") when using the batchId option. This is done by retrieving the stateUniqueIds from the CommitLog and using the correct partition of these Ids to read from the state store.

A check is added to throw an error when users try to use the readChangeFeed or changeStartBatchId options when the CommitLog metadata contains stateUniqueIds.

Note that to read checkpoint v2 state data sources it is required to have "spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.

Why are the changes needed?

State checkpoint v2 ("spark.sql.streaming.stateStore.checkpointFormatVersion") introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format.

Does this PR introduce any user-facing change?

Yes.

STDS_INVALID_OPTION_VALUE will be thrown when readChangeFeed or changeStartBatchId options are used when the CommitLog contains stateUniqueIds. Previously an error related to the store not existing would be thrown.

State Data Source will work when checkpoint v2 is used and batchId is used.

How was this patch tested?

Adds a new test suite RocksDBWithCheckpointV2StateDataSourceReaderSuite that reuses the unit tests in StateDataSourceReadSuite and adds tests for the new error cases.

testOnly *RocksDBWithCheckpointV2StateDataSourceReaderSuite
[info] Total number of tests run: 16
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 16, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed

Was this patch authored or co-authored using generative AI tooling?

No

@@ -617,6 +617,64 @@ StateDataSourceReadSuite {
}
}

class RocksDBWithCheckpointV2StateDataSourceReaderSuite extends StateDataSourceReadSuite {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we add the operator specific tests ?

  • aggregations
  • dedup
  • join
  • transformWithState etc ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extending StateDataSourceReadSuite will run all the tests in StateDataSourceReadSuite with the config set in beforeAll. This pattern already exists in this suite (example). I also attached the result of running this command to the PR description:

testOnly *RocksDBWithCheckpointV2StateDataSourceReaderSuite

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future when changeStartBatchId is added we will have to generate the golden files for those tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you extend RocksDBStateDataSourceReadSuite here instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extending RocksDBStateDataSourceReadSuite would only add one test that tests for invalid options. It also sets:

spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
      "false")

I would prefer extending StateDataSourceReadSuite for now with plans to eventually extend RocksDBWithChangelogCheckpointStateDataSourceReaderSuite to include the tests with changeStartBatchId.

I added:

spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
      "true")

To the test suite I added.

"optionName" -> StateSourceOptions.READ_CHANGE_FEED,
"message" -> "Read change feed is currently not supported with checkpoint v2."))
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test that creates the checkpoint with checkpointv2 and tries to read from it with the config set to checkpoint v1 and vice-versa?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, this will error out when reading the commit log.

Should we do something like only checking this assertion if we are calling from the context of a streaming query? Or maybe just check for this assertion error?

cc @anishshri-db

Copy link
Contributor

@liviazhu liviazhu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops didn't mean to approve

}

if (commitMetadata.stateUniqueIds.isDefined) {
Some(commitMetadata.stateUniqueIds.get(operatorId))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This can be written in a more scala way, without if-else. Maybe with stateUniqueIds.map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to commitMetadata.stateUniqueIds.flatMap(_.get(operatorId))

/**
* Constants for store names used in Stream-Stream joins.
*/
object StatePartitionReaderStoreNames {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why define these names here? These are join specific and shouldn't live here. I think they should already be defined in the join code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed these and refactored a bit. I added more detail in the other comment in this file.

partition.sourceOptions.operatorStateUniqueIds,
useColumnFamiliesForJoins = false)

partition.sourceOptions.storeName match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to do this here. This can be done within the join call above and it will just return the id you need for the storeName, instead of returning the entire stateStoreCheckpointIds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a new method getStateStoreCheckpointId in SymmetricHashJoinStateManager which maps (storeName -> correct checkpoint id) done in one function call. Let me know if this makes more sense.

@@ -690,7 +690,7 @@ private[sql] class RocksDBStateStoreProvider

rocksDB.load(
version,
stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None,
stateStoreCkptId = uniqueId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove the conf check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the behavior was a bit confusing where uniqueId could be Some("") but would not be used to get the underlying store.

This also would need to be removed in the future if we wanted to enable reading checkpoint v2 stores when enableStateStoreCheckpointIds = false.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants