[SPARK-48547][DEPLOY] Add opt-in flag to have SparkSubmit automatically call System.exit after user code main method exits #52091
cc @JoshRosen, the original author of this change. There were some discussions in the original PR #46889, but no actual objections. This functionality is useful for Spark on K8s cases, allowing Spark to exit even with non-daemon threads (aligning with Spark on YARN behavior). SPARK-34674 was proposed to address the Spark on K8s exit issue, but it calls

For other reviewers: in addition to the UT added in this change, I created a demo project to help you verify this functionality manually.
```scala
logInfo(
  log"Calling System.exit() with exit code ${MDC(LogKeys.EXIT_CODE, exitCode)} " +
  log"because main ${MDC(LogKeys.CONFIG, SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key)}=true")
exitFn(exitCode)
```
In one of our internal implementations, we start a timer that permits certain slow non-daemon threads to complete their tasks, such as updating external metrics and reporting state. Upon the timer's expiration, the system logs a message, records thread dumps of all active threads (useful for triaging the cause of the condition), and then exits. Could you extend this patch to do the same?
Something like this:

```scala
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._

// .....
    if (forceTerminateJVM) {
      createAndStartShutdownThread(timeToWaitInSeconds, exitCode)
    }
  }

  def createAndStartShutdownThread(timeToWaitInSeconds: Long, exitCode: Int): Unit = {
    logInfo("Starting the shutdown thread")
    val shutdownThread = new Thread(new Runnable {
      override def run(): Unit = {
        logInfo(s"Shutdown thread will wait for ${timeToWaitInSeconds}s before " +
          "exiting the JVM")
        Thread.sleep(TimeUnit.SECONDS.toMillis(timeToWaitInSeconds))
        logWarning("There are non-daemon threads preventing this JVM from shutting down")
        Thread.getAllStackTraces.keySet().asScala
          .filter(t => !t.isDaemon && t.isAlive)
          .foreach { t =>
            logWarning(s"== ${t.toString} ==")
            logWarning(t.getStackTrace.mkString("\n"))
          }
        logWarning(s"Stopping the JVM with System.exit($exitCode)")
        System.exit(exitCode)
      }
    }, "JVMShutdownThread")
    shutdownThread.setDaemon(true)
    shutdownThread.start()
  }
```
@itskals I think it's more appropriate to implement this in a shutdown hook. You actually want a graceful shutdown mechanism that's not coupled to either daemon or non-daemon threads. If users want to do some cleanup work or graceful shutdown logic before the JVM terminates, they need to register a shutdown hook rather than creating non-daemon threads.
BTW, you can use three backticks to quote the code block:

```
this is
a code block
```
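A minimal sketch of the shutdown-hook approach suggested above (the `CleanupOnExit` object and its method names are illustrative, not Spark APIs):

```scala
// Sketch: run graceful-shutdown logic in a JVM shutdown hook instead of
// relying on a non-daemon thread to keep the JVM alive. Names are hypothetical.
object CleanupOnExit {
  // Hypothetical cleanup work: flush metrics, report final state, close clients.
  def cleanup(): String = {
    // ... real cleanup work would go here ...
    "cleaned"
  }

  // Register the cleanup as a shutdown hook so it runs on System.exit()
  // or an external SIGTERM, regardless of which threads are still alive.
  def registerCleanupHook(): Thread = {
    val hook = new Thread(() => cleanup(), "cleanup-hook")
    Runtime.getRuntime.addShutdownHook(hook)
    hook
  }
}
```

This decouples the cleanup from thread daemon-ness: the hook fires on normal `System.exit()` as well as on external termination.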
While the usage of non-daemon threads is not desirable, it's hard to enforce in actual application development.
> I think it's more appropriate to implement this in a shutdown hook.

Also possible there.
LGTM. cc @zhouyejoe @mridulm since we added something very similar internally.
```scala
if (sparkConf.get(SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT)) {
  logInfo(
    log"Calling System.exit() with exit code ${MDC(LogKeys.EXIT_CODE, exitCode)} " +
    log"because main ${MDC(LogKeys.CONFIG, SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key)}=true")
```
`main` in this log line feels out of place.
Thank you for checking; updated.
What's the behavior for cluster mode?
@yaooqinn In YARN cluster mode, the AM always sends a kill signal to terminate the driver JVM after main exits, so there is no behavior change from the user's perspective regardless of whether this config is enabled. In K8s cluster mode, after the main method exits, non-daemon threads block the driver JVM from exiting.
In summary, from the user's perspective, this functionality helps K8s cluster mode for jobs that have non-daemon threads in the driver: they can enable `spark.submit.callSystemExitOnMainExit` so the driver JVM exits once the main method returns.
Since this is not a general config, can we make it K8s-specific? https://spark.apache.org/docs/latest/running-on-kubernetes.html#configuration
@yaooqinn Actually, compared to other resource managers, YARN is special: it does additional cleanup work when the AM main method exits. I tested Standalone cluster mode; it is similar to K8s, where non-daemon threads also block driver JVM exit. I don't have an environment to test Mesos, but from a quick look at the codebase, I think it is similar to K8s as well. For cases where the driver runs outside of the resource manager, e.g. local or client mode, non-daemon threads also block driver JVM exit. In summary, I think this feature is a general solution to tackle the non-daemon thread issues.
This PR is based on #46889, authored by @JoshRosen.
What changes were proposed in this pull request?
This PR adds a new SparkConf flag, `spark.submit.callSystemExitOnMainExit` (default `false`), which when `true` causes SparkSubmit to call `System.exit()` in the JVM once the user code's main method has exited (for Java / Scala jobs) or once the user's Python or R script has exited.

Why are the changes needed?
This is intended to address a longstanding issue where `spark-submit` runs might hang after user code has completed. According to Java's `java.lang.Runtime` docs, the JVM begins its shutdown sequence when (1) the number of live non-daemon threads drops to zero, (2) `Runtime.exit()` / `System.exit()` is invoked, or (3) the JVM is terminated externally (e.g. via an interrupt or kill signal).
For Python and R programs, SparkSubmit’s PythonRunner and RRunner will call System.exit() if the user program exits with a non-zero exit code (see python and R runner code).
But for Java and Scala programs, plus any successful R or Python programs, Spark will not automatically call System.exit.
In those situations, the JVM will only shut down when, via event (1), all non-daemon threads have exited (unless the job is cancelled and sent an external interrupt / kill signal, corresponding to event (3)). Thus, non-daemon threads might cause logically-completed spark-submit jobs to hang rather than completing.
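As a concrete illustration of the hang (a hypothetical user job, not Spark code): a thread started from `main` is non-daemon by default, so the JVM keeps running after `main` returns until that thread finishes or something calls `System.exit()`:

```scala
// Hypothetical user job: a background thread outlives the main method.
object NonDaemonDemo {
  def startBackgroundWork(sleepMillis: Long): Thread = {
    // Threads are non-daemon by default; the JVM waits for them before exiting.
    val t = new Thread(() => Thread.sleep(sleepMillis), "background-work")
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    startBackgroundWork(60000)
    // main returns here, but the JVM stays alive for up to another 60s,
    // unless something calls System.exit() explicitly.
  }
}
```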
The non-daemon threads are not always under Spark's own control and may not necessarily be cleaned up by `SparkContext.stop()`. Thus, it is useful to have opt-in functionality for SparkSubmit to automatically call `System.exit()` upon main method exit (which usually, but not always, corresponds to job completion): this option allows users and data platform operators to enforce `System.exit()` calls without having to modify individual jobs' code.

Does this PR introduce any user-facing change?
Yes, it adds a new user-facing configuration option for opting in to a behavior change.
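For illustration, opting in would look like this (the class and jar names are hypothetical):

```shell
# Hypothetical job submission: opt in so SparkSubmit calls System.exit()
# once the application's main method returns, even if non-daemon threads remain.
spark-submit \
  --conf spark.submit.callSystemExitOnMainExit=true \
  --class com.example.MyJob \
  my-job.jar
```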
How was this patch tested?
New tests in `SparkSubmitSuite`, including one which hangs (failing with a timeout) unless the new option is set to `true`.
No.