16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
```diff
@@ -1022,11 +1022,19 @@ private[spark] class SparkSubmit extends Logging {
           e
       }

+    var exitCode: Int = 1
     try {
       app.start(childArgs.toArray, sparkConf)
+      exitCode = 0
     } catch {
       case t: Throwable =>
-        throw findCause(t)
+        val cause = findCause(t)
+        cause match {
+          case e: SparkUserAppException =>
+            exitCode = e.exitCode
+          case _ =>
+        }
+        throw cause
     } finally {
       if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
         !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) &&
@@ -1037,6 +1045,12 @@ private[spark] class SparkSubmit extends Logging {
           case e: Throwable => logError("Failed to close SparkContext", e)
         }
       }
+      if (sparkConf.get(SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT)) {
+        logInfo(
+          log"Calling System.exit() with exit code ${MDC(LogKeys.EXIT_CODE, exitCode)} " +
+            log"because ${MDC(LogKeys.CONFIG, SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key)}=true")
+        exitFn(exitCode)
```

(The review thread below is attached to the `exitFn(exitCode)` line; the hunk's closing braces follow it.)
In one of our internal implementations, we initiated a timer that permitted certain slow non-daemon threads to complete their tasks, such as updating external metrics and reporting state. Upon the timer's expiration, the system logs a message, records thread dumps of all live threads (useful for triaging the cause of the condition), and then exits. Can you extend this patch to do this?


Something like this:

```scala
// ...
    if (forceTerminateJVM) {
      createAndStartShutdownThread(timeToWaitInSeconds, exitCode)
    }
  }

  // Assumes: import java.util.concurrent.TimeUnit
  //          import scala.jdk.CollectionConverters._
  def createAndStartShutdownThread(timeToWaitInSeconds: Long, exitCode: Int): Unit = {
    logInfo("Starting the shutdown thread")
    val shutdownThread = new Thread(new Runnable {
      override def run(): Unit = {
        logInfo(s"Shutdown thread will wait for ${timeToWaitInSeconds}s before " +
          s"exiting the JVM")
        Thread.sleep(TimeUnit.SECONDS.toMillis(timeToWaitInSeconds))
        logWarning("There are non-daemon threads preventing this JVM from shutting down")
        // Dump the stacks of all live non-daemon threads to help triage
        // what is keeping the JVM alive.
        Thread.getAllStackTraces.keySet().asScala
          .filter(t => !t.isDaemon && t.isAlive)
          .foreach { t =>
            logWarning(s"== ${t.toString} ==")
            logWarning(t.getStackTrace.mkString("\n"))
          }
        logWarning(s"Stopping the JVM with System.exit($exitCode)")
        System.exit(exitCode)
      }
    }, "JVMShutdownThread")
    shutdownThread.setDaemon(true)
    shutdownThread.start()
  }
```

@pan3793 (Member, Author) commented on Aug 25, 2025:

@itskals I think it's more appropriate to implement this in a shutdown hook. You actually want a graceful shutdown mechanism that's not coupled to either daemon or non-daemon threads. If users want to do some cleanup work or graceful shutdown logic before the JVM terminates, they need to register a shutdown hook rather than creating non-daemon threads.
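For illustration, here is a minimal sketch of the shutdown-hook approach described above; the object name and cleanup body are hypothetical, not part of this patch:

```scala
object MyGracefulApp {
  def main(args: Array[String]): Unit = {
    // Register cleanup work as a JVM shutdown hook instead of keeping a
    // non-daemon thread alive. Shutdown hooks run when the JVM begins to
    // shut down, including when System.exit() is called.
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      // e.g. flush external metrics, report final state, close clients
      println("Running graceful-shutdown logic before the JVM terminates")
    }, "app-shutdown-hook"))

    // ... application logic ...
  }
}
```

Because shutdown hooks also run when `spark.submit.callSystemExitOnMainExit=true` triggers `System.exit()`, this decouples cleanup from whether any user thread is a daemon.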

BTW, you can use three backticks to quote the code block:

```
this is
  a code block
```


While the usage of non-daemon threads is not desirable, it's hard to enforce in actual application development.

> I think it's more appropriate to implement this in a shutdown hook.

Also possible there.

```diff
+      }
     }
   }
```
In `package object config` (Spark core's config definitions), the new entry:

```diff
@@ -2311,6 +2311,15 @@ package object config {
       .toSequence
       .createWithDefault(Nil)

+  private[spark] val SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT =
+    ConfigBuilder("spark.submit.callSystemExitOnMainExit")
+      .doc("If true, SparkSubmit will call System.exit() to initiate JVM shutdown once the " +
+        "user's main method has exited. This can be useful in cases where non-daemon JVM " +
+        "threads might otherwise prevent the JVM from shutting down on its own.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val SCHEDULER_ALLOCATION_FILE =
     ConfigBuilder("spark.scheduler.allocation.file")
       .version("0.8.1")
```
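As a usage illustration (not part of the patch), the new flag is an ordinary submit-time conf; here is a sketch using the `SparkLauncher` API, with the jar path and main class as placeholders:

```scala
import org.apache.spark.launcher.SparkLauncher

object LaunchWithExitOnMainExit {
  def main(args: Array[String]): Unit = {
    // Equivalent to: spark-submit --conf spark.submit.callSystemExitOnMainExit=true ...
    val sparkProcess = new SparkLauncher()
      .setAppResource("/path/to/app.jar")   // placeholder jar path
      .setMainClass("com.example.MyApp")    // placeholder main class
      .setMaster("local[*]")
      .setConf("spark.submit.callSystemExitOnMainExit", "true")
      .launch()
    val exitCode = sparkProcess.waitFor()
    println(s"spark-submit exited with code $exitCode")
  }
}
```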
66 changes: 66 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
```diff
@@ -1593,6 +1593,44 @@ class SparkSubmitSuite
       runSparkSubmit(argsSuccess, expectFailure = false))
   }

+  test("spark.submit.callSystemExitOnMainExit returns non-zero exit code on unclean main exit") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", MainThrowsUncaughtExceptionSparkApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--conf", s"${SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key}=true",
+      unusedJar.toString
+    )
+    assertResult(1)(runSparkSubmit(args, expectFailure = true))
+  }
+
+  test("spark.submit.callSystemExitOnMainExit calls system exit on clean main exit") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", NonDaemonThreadSparkApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--conf", s"${SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key}=true",
+      unusedJar.toString
+    )
+    // If SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT were false, the non-daemon thread would
+    // prevent the JVM from beginning shutdown and the runSparkSubmit call below would
+    // fail with a timeout:
+    assertResult(0)(runSparkSubmit(args))
+  }
+
+  test("spark.submit.callSystemExitOnMainExit with main that explicitly calls System.exit") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class",
+      MainExplicitlyCallsSystemExit3SparkApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--conf", s"${SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key}=true",
+      unusedJar.toString
+    )
+    // This main class explicitly exits with System.exit(3), hence this expected exit code:
+    assertResult(3)(runSparkSubmit(args, expectFailure = true))
+  }
+
   private def testRemoteResources(
       enableHttpFs: Boolean,
       forceDownloadSchemes: Seq[String] = Nil): Unit = {
@@ -1876,6 +1914,34 @@ object SimpleApplicationTest {
   }
 }

+object MainThrowsUncaughtExceptionSparkApplicationTest {
+  def main(args: Array[String]): Unit = {
+    throw new Exception("User exception")
+  }
+}
+
+object NonDaemonThreadSparkApplicationTest {
+  def main(args: Array[String]): Unit = {
+    val nonDaemonThread: Thread = new Thread {
+      override def run(): Unit = {
+        while (true) {
+          Thread.sleep(1000)
+        }
+      }
+    }
+    nonDaemonThread.setDaemon(false)
+    nonDaemonThread.setName("Non-Daemon-Thread")
+    nonDaemonThread.start()
+    // Fall off the end of the main method.
+  }
+}
+
+object MainExplicitlyCallsSystemExit3SparkApplicationTest {
+  def main(args: Array[String]): Unit = {
+    System.exit(3)
+  }
+}
+
 object UserClasspathFirstTest {
   def main(args: Array[String]): Unit = {
     val ccl = Thread.currentThread().getContextClassLoader()
```