Skip to content

Commit 60b8723

Browse files
Hendrik Huebner (HendrikHuebner)
authored and committed
Improve exception handling when adding artifacts
1 parent 77413d4 commit 60b8723

File tree

4 files changed: 180 additions and 29 deletions.

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala

Lines changed: 24 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ import io.grpc.stub.StreamObserver
2929
import org.apache.spark.connect.proto
3030
import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse}
3131
import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
32+
import org.apache.spark.SparkRuntimeException
3233
import org.apache.spark.sql.artifact.ArtifactManager
3334
import org.apache.spark.sql.connect.utils.ErrorUtils
3435
import org.apache.spark.sql.util.ArtifactUtils
@@ -112,19 +113,34 @@ class SparkConnectAddArtifactsHandler(val responseObserver: StreamObserver[AddAr
112113
* @return
113114
*/
114115
protected def flushStagedArtifacts(): Seq[ArtifactSummary] = {
116+
val failedArtifactExceptions = mutable.ListBuffer[SparkRuntimeException]()
117+
115118
// Non-lazy transformation when using Buffer.
116-
stagedArtifacts.map { artifact =>
117-
// We do not store artifacts that fail the CRC. The failure is reported in the artifact
118-
// summary and it is up to the client to decide whether to retry sending the artifact.
119-
if (artifact.getCrcStatus.contains(true)) {
120-
if (artifact.path.startsWith(ArtifactManager.forwardToFSPrefix + File.separator)) {
121-
holder.artifactManager.uploadArtifactToFs(artifact.path, artifact.stagedPath)
122-
} else {
123-
addStagedArtifactToArtifactManager(artifact)
119+
val summaries = stagedArtifacts.map { artifact =>
120+
try {
121+
// We do not store artifacts that fail the CRC. The failure is reported in the artifact
122+
// summary and it is up to the client to decide whether to retry sending the artifact.
123+
if (artifact.getCrcStatus.contains(true)) {
124+
if (artifact.path.startsWith(ArtifactManager.forwardToFSPrefix + File.separator)) {
125+
holder.artifactManager.uploadArtifactToFs(artifact.path, artifact.stagedPath)
126+
} else {
127+
addStagedArtifactToArtifactManager(artifact)
128+
}
124129
}
130+
} catch {
131+
case e: SparkRuntimeException if e.getCondition == "ARTIFACT_ALREADY_EXISTS" =>
132+
failedArtifactExceptions += e
125133
}
126134
artifact.summary()
127135
}.toSeq
136+
137+
if (failedArtifactExceptions.nonEmpty) {
138+
val exception = failedArtifactExceptions.head
139+
failedArtifactExceptions.drop(1).foreach(exception.addSuppressed(_))
140+
throw exception
141+
}
142+
143+
summaries
128144
}
129145

130146
protected def cleanUpStagedArtifacts(): Unit = Utils.deleteRecursively(stagingDir.toFile)

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala

Lines changed: 56 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@ import io.grpc.stub.StreamObserver
3434

3535
import org.apache.spark.connect.proto
3636
import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse}
37+
import org.apache.spark.SparkRuntimeException
3738
import org.apache.spark.sql.connect.ResourceHelper
3839
import org.apache.spark.sql.test.SharedSparkSession
3940
import org.apache.spark.util.ThreadUtils
@@ -51,17 +52,31 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper {
5152
override def onCompleted(): Unit = {}
5253
}
5354

54-
class TestAddArtifactsHandler(responseObserver: StreamObserver[AddArtifactsResponse])
55+
class TestAddArtifactsHandler(responseObserver: StreamObserver[AddArtifactsResponse],
56+
throwIfArtifactExists: Boolean = false)
5557
extends SparkConnectAddArtifactsHandler(responseObserver) {
5658

5759
// Stop the staged artifacts from being automatically deleted
5860
override protected def cleanUpStagedArtifacts(): Unit = {}
5961

6062
private val finalArtifacts = mutable.Buffer.empty[String]
63+
private val artifactChecksums: mutable.Map[String, Long] = mutable.Map.empty
6164

6265
// Record the artifacts that are sent out for final processing.
6366
override protected def addStagedArtifactToArtifactManager(artifact: StagedArtifact): Unit = {
67+
// Throw if artifact already exists and has different checksum
68+
// This mocks the behavior of ArtifactManager.addArtifact without comparing the entire file
69+
if (throwIfArtifactExists
70+
&& finalArtifacts.contains(artifact.name)
71+
&& artifact.getCrc != artifactChecksums(artifact.name)) {
72+
throw new SparkRuntimeException(
73+
"ARTIFACT_ALREADY_EXISTS",
74+
Map("normalizedRemoteRelativePath" -> artifact.name)
75+
)
76+
}
77+
6478
finalArtifacts.append(artifact.name)
79+
artifactChecksums += (artifact.name -> artifact.getCrc)
6580
}
6681

6782
def getFinalArtifacts: Seq[String] = finalArtifacts.toSeq
@@ -418,4 +433,44 @@ class AddArtifactsHandlerSuite extends SharedSparkSession with ResourceHelper {
418433
}
419434
}
420435

436+
test("All artifacts are added, even if some fail") {
437+
val promise = Promise[AddArtifactsResponse]()
438+
val handler = new TestAddArtifactsHandler(new DummyAddArtifactsStreamObserver(promise),
439+
throwIfArtifactExists = true)
440+
try {
441+
val name1 = "jars/dummy1.jar"
442+
val name2 = "jars/dummy2.jar"
443+
val name3 = "jars/dummy3.jar"
444+
445+
val artifactPath1 = inputFilePath.resolve("smallClassFile.class")
446+
val artifactPath2 = inputFilePath.resolve("smallJar.jar")
447+
448+
assume(artifactPath1.toFile.exists)
449+
addSingleChunkArtifact(handler, sessionKey, name1, artifactPath1)
450+
addSingleChunkArtifact(handler, sessionKey, name3, artifactPath1)
451+
452+
val e = intercept[StatusRuntimeException] {
453+
addSingleChunkArtifact(handler, sessionKey, name1, artifactPath2)
454+
addSingleChunkArtifact(handler, sessionKey, name2, artifactPath1)
455+
addSingleChunkArtifact(handler, sessionKey, name3, artifactPath2)
456+
handler.onCompleted()
457+
}
458+
459+
// All three artifacts should be present in the final list, despite the exception
460+
assert(handler.getFinalArtifacts.contains(name1))
461+
assert(handler.getFinalArtifacts.contains(name2))
462+
assert(handler.getFinalArtifacts.contains(name3))
463+
464+
assert(e.getStatus.getCode == Code.INTERNAL)
465+
val statusProto = StatusProto.fromThrowable(e)
466+
assert(statusProto.getDetailsCount == 1)
467+
val details = statusProto.getDetails(0)
468+
val info = details.unpack(classOf[ErrorInfo])
469+
470+
assert(e.getMessage.contains("ARTIFACT_ALREADY_EXISTS"))
471+
assert(info.getMetadataMap().get("messageParameters").contains(name1))
472+
} finally {
473+
handler.forceCleanUp()
474+
}
475+
}
421476
}

sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala

Lines changed: 33 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ import java.nio.file.{CopyOption, Files, Path, Paths, StandardCopyOption}
2525
import java.util.concurrent.CopyOnWriteArrayList
2626
import java.util.concurrent.atomic.AtomicBoolean
2727

28+
import scala.collection.mutable.ListBuffer
2829
import scala.jdk.CollectionConverters._
2930
import scala.reflect.ClassTag
3031

@@ -266,28 +267,41 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
266267
* they are from a permanent location.
267268
*/
268269
private[sql] def addLocalArtifacts(artifacts: Seq[Artifact]): Unit = {
270+
val failedArtifactExceptions = ListBuffer[RuntimeException]()
271+
269272
artifacts.foreach { artifact =>
270-
artifact.storage match {
271-
case d: Artifact.LocalFile =>
272-
addArtifact(
273-
artifact.path,
274-
d.path,
275-
fragment = None,
276-
deleteStagedFile = false)
277-
case d: Artifact.InMemory =>
278-
val tempDir = Utils.createTempDir().toPath
279-
val tempFile = tempDir.resolve(artifact.path.getFileName)
280-
val outStream = Files.newOutputStream(tempFile)
281-
Utils.tryWithSafeFinallyAndFailureCallbacks {
282-
d.stream.transferTo(outStream)
283-
addArtifact(artifact.path, tempFile, fragment = None)
284-
}(finallyBlock = {
285-
outStream.close()
286-
})
287-
case _ =>
288-
throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
273+
try {
274+
artifact.storage match {
275+
case d: Artifact.LocalFile =>
276+
addArtifact(
277+
artifact.path,
278+
d.path,
279+
fragment = None,
280+
deleteStagedFile = false)
281+
case d: Artifact.InMemory =>
282+
val tempDir = Utils.createTempDir().toPath
283+
val tempFile = tempDir.resolve(artifact.path.getFileName)
284+
val outStream = Files.newOutputStream(tempFile)
285+
Utils.tryWithSafeFinallyAndFailureCallbacks {
286+
d.stream.transferTo(outStream)
287+
addArtifact(artifact.path, tempFile, fragment = None)
288+
}(finallyBlock = {
289+
outStream.close()
290+
})
291+
case _ =>
292+
throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
293+
}
294+
} catch {
295+
case e: SparkRuntimeException =>
296+
failedArtifactExceptions += e
289297
}
290298
}
299+
300+
if (failedArtifactExceptions.nonEmpty) {
301+
val exception = failedArtifactExceptions.head
302+
failedArtifactExceptions.drop(1).foreach(exception.addSuppressed(_))
303+
throw exception
304+
}
291305
}
292306

293307
def classloader: ClassLoader = synchronized {

sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala

Lines changed: 67 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,9 @@ import java.io.File
2020
import java.nio.charset.StandardCharsets
2121
import java.nio.file.{Files, Path, Paths}
2222

23-
import org.apache.spark.{SparkConf, SparkException}
23+
import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
2424
import org.apache.spark.metrics.source.CodegenMetrics
25+
import org.apache.spark.sql.Artifact
2526
import org.apache.spark.sql.classic.SparkSession
2627
import org.apache.spark.sql.functions.col
2728
import org.apache.spark.sql.internal.SQLConf
@@ -346,6 +347,71 @@ class ArtifactManagerSuite extends SharedSparkSession {
346347
}
347348
}
348349

350+
test("Add multiple artifacts to local session and check if all are added despite exception") {
351+
val copyDir = Utils.createTempDir().toPath
352+
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
353+
354+
val artifact1Path = "my/custom/pkg/artifact1.jar"
355+
val artifact2Path = "my/custom/pkg/artifact2.jar"
356+
val targetPath = Paths.get(artifact1Path)
357+
val targetPath2 = Paths.get(artifact2Path)
358+
359+
val classPath1 = copyDir.resolve("Hello.class")
360+
val classPath2 = copyDir.resolve("smallJar.jar")
361+
assume(artifactPath.resolve("Hello.class").toFile.exists)
362+
assume(artifactPath.resolve("smallClassFile.class").toFile.exists)
363+
364+
val artifact1 = Artifact.newArtifactFromExtension(
365+
targetPath.getFileName.toString,
366+
targetPath,
367+
new Artifact.LocalFile(Paths.get(classPath1.toString)))
368+
369+
val alreadyExistingArtifact = Artifact.newArtifactFromExtension(
370+
targetPath2.getFileName.toString,
371+
targetPath,
372+
new Artifact.LocalFile(Paths.get(classPath2.toString)))
373+
374+
val artifact2 = Artifact.newArtifactFromExtension(
375+
targetPath2.getFileName.toString,
376+
targetPath2,
377+
new Artifact.LocalFile(Paths.get(classPath2.toString)))
378+
379+
spark.artifactManager.addLocalArtifacts(Seq(artifact1))
380+
381+
val exception = intercept[SparkRuntimeException] {
382+
spark.artifactManager.addLocalArtifacts(
383+
Seq(alreadyExistingArtifact, artifact2, alreadyExistingArtifact))
384+
}
385+
386+
// Validate exception: Should be ARTIFACT_ALREADY_EXISTS and have one suppressed exception
387+
assert(exception.getCondition == "ARTIFACT_ALREADY_EXISTS",
388+
s"Expected ARTIFACT_ALREADY_EXISTS but got: ${exception.getCondition}")
389+
390+
assert(exception.getSuppressed.length == 1)
391+
assert(exception.getSuppressed.head.isInstanceOf[SparkRuntimeException])
392+
val suppressed = exception.getSuppressed.head.asInstanceOf[SparkRuntimeException]
393+
assert(suppressed.getCondition == "ARTIFACT_ALREADY_EXISTS")
394+
395+
// Artifact1 should have been added
396+
val expectedFile1 = ArtifactManager.artifactRootDirectory
397+
.resolve(s"$sessionUUID/jars/$artifact1Path")
398+
.toFile
399+
assert(expectedFile1.exists())
400+
401+
// Artifact2 should have been added despite exception
402+
val expectedFile2 = ArtifactManager.artifactRootDirectory
403+
.resolve(s"$sessionUUID/jars/$artifact2Path")
404+
.toFile
405+
assert(expectedFile2.exists())
406+
407+
// Cleanup
408+
artifactManager.cleanUpResourcesForTesting()
409+
val sessionDir = ArtifactManager.artifactRootDirectory.resolve(sessionUUID).toFile
410+
411+
assert(!expectedFile1.exists())
412+
assert(!sessionDir.exists())
413+
}
414+
349415
test("Added artifact can be loaded by the current SparkSession") {
350416
val path = artifactPath.resolve("IntSumUdf.class")
351417
assume(path.toFile.exists)

0 commit comments

Comments
 (0)