
Conversation

@JoonPark1

…cordingly once engine submit timeout is reached - prevent subsequent kyuubi OOM

Why are the changes needed?

This PR addresses bug #7226. It updates the metadata store accordingly for batch jobs that have timed out while waiting for an available Spark driver engine. This prevents a subsequently restarted Kyuubi server from repeatedly polling the Spark application status of each and every such batch job, which can cause consecutive OOM errors when Kyuubi is deployed on a Kubernetes cluster.

How was this patch tested?

This patch was tested through an integration test added to the SparkOnKubernetesTestsSuite.scala test suite.

Was this patch authored or co-authored using generative AI tooling?

No!

…cordingly once engine submit timeout is reached - prevent subsequent kyuubi OOM
@codecov-commenter

codecov-commenter commented Oct 21, 2025

Codecov Report

❌ Patch coverage is 0% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 0.00%. Comparing base (3b205a3) to head (b952a15).
⚠️ Report is 3 commits behind head on master.

Files with missing lines Patch % Lines
...kyuubi/engine/KubernetesApplicationOperation.scala 0.00% 14 Missing ⚠️
Additional details and impacted files
@@          Coverage Diff           @@
##           master   #7227   +/-   ##
======================================
  Coverage    0.00%   0.00%           
======================================
  Files         696     696           
  Lines       43530   43543   +13     
  Branches     5883    5884    +1     
======================================
- Misses      43530   43543   +13     

☔ View full report in Codecov by Sentry.

@pan3793 pan3793 requested a review from turboFei October 24, 2025 08:56
@turboFei turboFei requested a review from Copilot October 24, 2025 19:56
Copilot AI left a comment

Pull Request Overview

This PR addresses issue #7226 by preventing Kyuubi OOM errors when multiple batch jobs time out waiting for Spark driver engines. When a batch job reaches the engine submit timeout, the metadata store is now properly updated with TIMEOUT state and NOT_FOUND engine state, preventing the restarted Kyuubi server from repeatedly polling these timed-out jobs.

Key Changes:

  • Updated timeout handling to persist batch job state when engine submission times out
  • Added metadata store update with proper error state and message on timeout
  • Added integration test to verify timeout behavior updates metadata correctly

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File | Description
KubernetesApplicationOperation.scala | Added metadata store update logic when driver pod is not found after submit timeout
SparkOnKubernetesTestsSuite.scala | Added integration test verifying timeout state is properly persisted to metadata store


assert(!failKillResponse._1)
}
test(
"If spark batch reach timeout, it should have associated Kyuubi Application Operation be " +
Copilot AI Oct 24, 2025

Grammatical error in test description. Should be 'reaches timeout' instead of 'reach timeout', and 'should have the associated' instead of 'should have associated'.

Suggested change
"If spark batch reach timeout, it should have associated Kyuubi Application Operation be " +
"If spark batch reaches timeout, it should have the associated Kyuubi Application Operation be " +

@turboFei
Member

turboFei commented Oct 24, 2025

Hi @JoonPark1
Thanks for the contribution.

For this issue, is there a chance to update the metadata in BatchJobSubmission?

private def updateBatchMetadata(): Unit = {
  val endTime = if (isTerminalState(state)) lastAccessTime else 0L
  if (isTerminalState(state) && _applicationInfo.isEmpty) {
    _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
  }
  _applicationInfo.foreach { appInfo =>
    val metadataToUpdate = Metadata(
      identifier = batchId,
      state = state.toString,
      engineOpenTime = appStartTime,
      engineId = appInfo.id,
      engineName = appInfo.name,
      engineUrl = appInfo.url.orNull,
      engineState = getAppState(state, appInfo.state).toString,
      engineError = appInfo.error,
      endTime = endTime)
    session.sessionManager.updateMetadata(metadataToUpdate)
  }
}

@JoonPark1
Author

Hey @turboFei. I believe the Spark driver engine state and Spark app state will be updated in the metadata store...

@turboFei
Member

turboFei commented Oct 24, 2025

Hi @JoonPark1
Before this PR, could it not update the metadata via BatchJobSubmission::updateBatchMetadata?

Could you provide more details?

@JoonPark1
Author

@turboFei Sure! Once the Kyuubi batch job times out because the elapsed time exceeds the configured submitTimeout property value (no Spark driver is instantiated and running to handle the submitted batch job), the metadata about the Spark application and the Spark driver engine state is updated via the updateMetadata method of the "org.apache.kyuubi.server.metadata.MetadataManager" class, which takes the new up-to-date Metadata object instance (an instance of "org.apache.kyuubi.server.metadata.api.Metadata"). Internally, the manager then calls the updateMetadata method of the "org.apache.kyuubi.server.metadata.MetadataStore" class, which keeps the state of each submitted Kyuubi batch job that uses the Spark compute engine in sync with Kyuubi's metadata store in the relational DB. As you can see, the whole flow does not need to invoke BatchJobSubmission::updateBatchMetadata to update Kyuubi's metadata store instance.
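For illustration, here is a minimal sketch of the kind of update described above (not the actual diff in this PR). The helper name and parameters are hypothetical; the Metadata field names mirror the construction quoted earlier from BatchJobSubmission::updateBatchMetadata, and the enum values are assumed to exist as referenced elsewhere in this thread.

import org.apache.kyuubi.engine.ApplicationState
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.Metadata

// Persist a terminal state for the timed-out batch so that a restarted Kyuubi
// server does not keep re-polling it. Method name and parameters are
// illustrative only.
def markBatchTimedOut(metadataManager: MetadataManager, batchId: String): Unit = {
  val timedOutMetadata = Metadata(
    identifier = batchId,
    state = OperationState.TIMEOUT.toString,
    engineState = ApplicationState.NOT_FOUND.toString,
    engineError = Some("Driver was not found before the engine submit timeout was reached"),
    endTime = System.currentTimeMillis())
  metadataManager.updateMetadata(timedOutMetadata)
}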

@turboFei
Member

Hi, I checked the call site.

getApplicationInfoByTag <- KyuubiApplicationManager::getApplicationInfo

KyuubiApplicationManager::getApplicationInfo <- BatchJobSubmission::currentApplicationInfo

And finally, BatchJobSubmission::updateBatchMetadata will update the metadata.

@JoonPark1
Author

@turboFei Hey Fei! You are right about the flow of the call site... basically, the polling for the status of a submitted batch job originates from BatchJobSubmission::currentApplicationInfo. However, my change at the level of the KubernetesApplicationOperation class does not call BatchJobSubmission::updateBatchMetadata. Do you suggest I refactor the change so that the update to Kyuubi's metadata happens through that internal BatchJobSubmission method? The alternative call-site flow would be BatchJobSubmission::updateBatchMetadata() -> KyuubiSessionManager::updateMetadata(). I think this also works as you proposed, because it operates on the relevant Metadata fields associated with the specific batch job submission on the Kyuubi side... Let me know what you think.

@turboFei
Member

turboFei commented Oct 26, 2025

Hi @JoonPark1
I checked the description in #7226

However, when there is heavy load, it can cause kyuubi to store records via MetadataManager about the state of each batch job as "PENDING" and repeated polling about each batch job's status until it runs out of memory.

I just wonder why the kyuubi server did not update the metadata via the call-site flow I mentioned above.

I mean before this PR.

@turboFei
Member

turboFei commented Oct 26, 2025

And for KubernetesApplicationOperation, it uses a SharedIndexInformer to get the Kubernetes application states.

val enginePodInformer = client.pods()
  .withLabel(LABEL_KYUUBI_UNIQUE_KEY)
  .inform(new SparkEnginePodEventHandler(kubernetesInfo))
info(s"[$kubernetesInfo] Start Kubernetes Client POD Informer.")
enginePodInformers.put(kubernetesInfo, enginePodInformer)
if (sparkAppUrlSource == KubernetesApplicationUrlSource.SVC) {
  info(s"[$kubernetesInfo] Start Kubernetes Client SVC Informer.")
  val engineSvcInformer = client.services()
    .inform(new SparkEngineSvcEventHandler(kubernetesInfo))
  engineSvcInformers.put(kubernetesInfo, engineSvcInformer)
}
client

Do you mean:

  1. it did not update the metadata inside BatchJobSubmission, and the batch state was PENDING
  2. after the Kyuubi server restarted, because the batch state was PENDING, it tried to resubmit the batch
  3. it failed to get the batch app state after the submit timeout, so the application state was NOT_FOUND
  4. then it still did not update the metadata inside BatchJobSubmission

@JoonPark1
Author

JoonPark1 commented Oct 27, 2025

@turboFei Hey Fei. Your train of thought is right on, except for point #3. Basically, once the Kyuubi server restarts, it tries to resubmit the batch and poll for the batch status, but once submitTimeout is reached, the metadata associated with that batch is not updated and remains "PENDING" with engineState "UNKNOWN". This does not stop the resubmission and re-polling for the batch on subsequent restarts of the Kyuubi server. That's why my change aims to mark the batch with one of the final/terminal states once the submit timeout is reached: "TIMEOUT" for the batch status, so that repeated polling stops because the Kyuubi batch manager will know the batch state is final and it timed out waiting for the compute driver engine.
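To make the reasoning concrete, here is a tiny hypothetical guard (not the actual recovery code) illustrating why a persisted terminal state stops the re-polling; the helper name is an assumption for this sketch.

import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.api.Metadata

// Hypothetical check: only batches whose persisted state is non-terminal
// (e.g. PENDING) need to be resubmitted and re-polled after a restart; a
// batch already marked TIMEOUT is skipped.
def needsRecovery(metadata: Metadata): Boolean =
  !OperationState.isTerminal(OperationState.withName(metadata.state))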

Reformat comment for better readability and fix scalastyle char-limit violations.
@turboFei
Member

but once submitTimeout is reached, the metadata associated for that batch is not updated and remains as "PENDING" with engineState being "UNKNOWN"

Is it possible to enhance BatchJobSubmission::updateBatchMetadata to cover this case?

@JoonPark1
Author

JoonPark1 commented Oct 27, 2025

Hey @turboFei. I'm now considering moving my metadata-updating logic directly into the BatchJobSubmission::updateBatchMetadata method as you suggested... Does the following look like reasonable logic for setting the state to TIMEOUT (which seems appropriate given the situation)? This block would be placed just before the Metadata object instantiation that is passed to the KyuubiSessionManager::updateMetadata() call.

// If the batch is still PENDING once the configured engine submit timeout has
// elapsed, mark it as timed out with a NOT_FOUND application. Assumes a
// KyuubiConf instance (`conf`) and a `submitTime()` accessor are in scope.
val curTime = System.currentTimeMillis()
val elapsed = curTime - submitTime()
if (elapsed > conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT) && state == OperationState.PENDING) {
  _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
  setState(OperationState.TIMEOUT)
}

Let me know if you think this refactoring, done directly within the updateBatchMetadata method, is more appropriate. It should update the batch metadata store with the same effect as making the change at the KubernetesApplicationOperation level.

@JoonPark1
Author

This issue is specific to kyuubi v1.9.2 @turboFei

@turboFei
Member

This issue is specific to kyuubi v1.9.2 @turboFei

could you try to use the latest kyuubi?

@turboFei
Member

I am using the latest kyuubi based on the master branch, and I do not encounter this issue.

@JoonPark1
Author

Hey @turboFei... I know you said the current latest version of Kyuubi does not face this issue. Have you tried replicating it and checking the state of the metadata store to see whether the metadata for a batch job that timed out is updated appropriately?

@turboFei
Member

Hey @turboFei... I know you said the current latest version of Kyuubi does not face this issue. Have you tried replicating it and checking the state of the metadata store to see whether the metadata for a batch job that timed out is updated appropriately?

I remember I fixed many batch issues; I'm not sure all of them were backported to 1.9.2.

Could you try to use the latest version? I think they are compatible.

@turboFei
Member

BTW, the affected version mentioned in #7226 is 1.10.2; could you correct it to avoid confusion?

@JoonPark1
Author

My bad @turboFei. The kyuubi version we are using that's being affected by the issue is v1.10.2!

@turboFei
Member

mysql> select count(*) from metadata where engine_state = 'NOT_FOUND';
+----------+
| count(*) |
+----------+
|        0 |
+----------+
1 row in set (2.44 sec)

mysql> select count(*) from metadata where engine_state != 'NOT_FOUND';
+----------+
| count(*) |
+----------+
|   613153 |
+----------+
1 row in set (2.29 sec)

mysql>

@JoonPark1
Author

JoonPark1 commented Oct 30, 2025

@turboFei Hey Fei! I saw the state of your metadata store... For this issue, when a Kyuubi batch job times out, the engine_state column is set to "UNKNOWN"; my fix corrects this to the more appropriate state ("NOT_FOUND"), since it indicates that the Spark driver responsible for handling the batch job is not up. Let me know whether you think the fix is appropriate given the circumstances. Basically, if the issue is already resolved in the latest version, the number of records corresponding to batch jobs with engine_state "UNKNOWN" should be 0 (could you verify that is indeed the case)?

@turboFei
Member

turboFei commented Nov 3, 2025

Some comments:

  1. I think you can try to use the latest master branch code
  2. If it is a bug that needs to be fixed, please update the metadata inside BatchJobSubmission

@turboFei
Member

turboFei commented Nov 6, 2025

I think we can limit the number of batches to recover at one time,

private[kyuubi] def recoverBatchSessions(): Unit = withBatchRecoveryLockRequired {

Do not recover all batches together.

For example, suppose there are 400 batches to recover on restart.

We can add a config to recover, say, 50 batches at a time, and wait for those batches' engineId to be ready (app submitted) or wait a maximum interval before the next round of recovery. What do you think?

@JoonPark1

@JoonPark1
Author

JoonPark1 commented Nov 10, 2025

@turboFei That does sound like a good alternative: impose a limit on the maximum number of batches to recover per recovery attempt. Is this already an available configuration in Kyuubi, or does it need to be added?

If it's not available, I think we can add it as an extra Kyuubi server-side config and pass it to an updated KyuubiSessionManager::getBatchSessionsToRecover method as an extra batchSize parameter, together with a derived pagination offset used to read from the MetadataStore instance. Additionally, KyuubiRestFrontendService::recoverBatchSessions() can call KyuubiSessionManager::getBatchSessionsToRecover repeatedly, tracking the pagination offset, to obtain the sequence of instantiated KyuubiBatchSessions corresponding to the batch records in the relational store, until there are no more batch metadata records to process; a rough sketch of the paging loop is below.
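For illustration, here is a self-contained sketch of the paging idea. All names here are hypothetical stand-ins, not existing Kyuubi APIs; the real change would page through the metadata store and open KyuubiBatchSessions instead of printing.

object BatchRecoveryPagingSketch {
  // Stand-in for the PENDING batch ids persisted in the metadata store.
  private val pendingBatches: Seq[String] = (1 to 120).map(i => s"batch-$i")

  // Hypothetical paged read, analogous to an offset/limit query against the store.
  private def fetchBatchesToRecover(offset: Int, limit: Int): Seq[String] =
    pendingBatches.slice(offset, offset + limit)

  // Recover in bounded pages instead of loading every batch at once, so a
  // restart with many PENDING batches cannot overwhelm the server.
  def recoverInPages(pageSize: Int): Unit = {
    var offset = 0
    var page = fetchBatchesToRecover(offset, pageSize)
    while (page.nonEmpty) {
      page.foreach(id => println(s"recovering $id")) // placeholder for opening the batch session
      offset += page.size
      page = fetchBatchesToRecover(offset, pageSize)
    }
  }

  def main(args: Array[String]): Unit = recoverInPages(pageSize = 50)
}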

@turboFei
Member

turboFei commented Nov 10, 2025

Is this already an available configuration in Kyuubi, or does it need to be added?

Not yet. Would you like to contribute it?

@JoonPark1
Author

@turboFei Sure. I would love to contribute it...

… on metadata store upon kyuubi server recovery - to prevent it from being overwhelmed and face OOM issue