Skip to content

Commit f0a3a2e

Browse files
xiaonanyang-db authored and HyukjinKwon committed
[SPARK-53349][SQL] Optimized XML parser can't handle corrupted files correctly
### What changes were proposed in this pull request?

In #51287, we introduced an optimized XML parser, which is more memory-efficient. However, the new parser reads the input stream eagerly on initialization. If the file is corrupted, the resulting error is not caught and handled according to the `ignoreCorruptFiles` option. This PR addresses the issue.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

Closes #52093 from xiaonanyang-db/SPARK-53349.

Authored-by: Xiaonan Yang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent b5840e1 commit f0a3a2e

File tree

5 files changed

+35
-30
lines changed

5 files changed

+35
-30
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXMLRecordReader.scala

Lines changed: 15 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -37,15 +37,15 @@ case class StaxXMLRecordReader(inputStream: () => InputStream, options: XmlOptio
3737
extends XMLEventReader
3838
with Logging {
3939
// Reader for the XML record parsing.
40-
private val in1 = inputStream()
41-
private val primaryEventReader = StaxXmlParserUtils.filteredReader(in1, options)
40+
private lazy val in1 = inputStream()
41+
private lazy val primaryEventReader = StaxXmlParserUtils.filteredReader(in1, options)
4242

4343
private val xsdSchemaValidator = Option(options.rowValidationXSDPath)
4444
.map(path => ValidatorUtil.getSchema(path).newValidator())
4545
// Reader for the XSD validation, if an XSD schema is provided.
46-
private val in2 = xsdSchemaValidator.map(_ => inputStream())
46+
private lazy val in2 = xsdSchemaValidator.map(_ => inputStream())
4747
// An XMLStreamReader used by StAXSource for XSD validation.
48-
private val xsdValidationStreamReader =
48+
private lazy val xsdValidationStreamReader =
4949
in2.map(in => StaxXmlParserUtils.filteredStreamReader(in, options))
5050

5151
final var hasMoreRecord: Boolean = true
@@ -108,11 +108,18 @@ case class StaxXMLRecordReader(inputStream: () => InputStream, options: XmlOptio
108108
}
109109

110110
override def close(): Unit = {
111-
primaryEventReader.close()
112-
xsdValidationStreamReader.foreach(_.close())
113-
in1.close()
114-
in2.foreach(_.close())
115111
hasMoreRecord = false
112+
try {
113+
in1.close()
114+
in2.foreach(_.close())
115+
primaryEventReader.close()
116+
xsdValidationStreamReader.foreach(_.close())
117+
} catch {
118+
case NonFatal(e) =>
119+
// If the file is corrupted/missing, we won't be able to close the input streams. We do a
120+
// best-effort to close the streams and log the error if closing fails.
121+
logWarning("Error closing XML stream", e)
122+
}
116123
}
117124

118125
override def nextEvent(): XMLEvent = primaryEventReader.nextEvent()

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -269,7 +269,7 @@ class StaxXmlParser(
269269
logWarning("Skipped missing file", e)
270270
parser.close()
271271
None
272-
case _: IOException | _: RuntimeException | _: InternalError
272+
case _: IOException | _: RuntimeException | _: InternalError | _: AssertionError
273273
if options.ignoreCorruptFiles =>
274274
logWarning("Skipped the rest of the content in the corrupted file", e)
275275
parser.close()
@@ -289,6 +289,7 @@ class StaxXmlParser(
289289
StaxXmlParserUtils.currentElementAsString(parser, options.rowTag, options).trim
290290
)
291291
throw BadRecordException(() => record, () => Array.empty, e)
292+
case _ => throw e
292293
}
293294
}
294295
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -273,11 +273,15 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
273273
case _: AccessControlException | _: BlockMissingException =>
274274
parser.close()
275275
throw e
276-
case _: IOException | _: RuntimeException | _: InternalError
276+
case _: IOException | _: RuntimeException | _: InternalError | _: AssertionError
277277
if options.ignoreCorruptFiles =>
278278
logWarning("Skipped the rest of the content in the corrupted file", e)
279279
parser.close()
280280
Some(StructType(Nil))
281+
case _: IOException | _: RuntimeException | _: InternalError
282+
if !options.ignoreCorruptFiles =>
283+
parser.close()
284+
throw e
281285
case _ =>
282286
logWarning("Failed to infer schema from XML record", e)
283287
handleXmlErrorsByParseMode(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala

Lines changed: 6 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -235,26 +235,12 @@ object MultiLineXmlDataSource extends XmlDataSource {
235235

236236
val xmlParserRdd: RDD[StaxXMLRecordReader] =
237237
xml.flatMap { portableDataStream =>
238-
try {
239-
val inputStream = () =>
240-
CodecStreams.createInputStreamWithCloseResource(
241-
portableDataStream.getConfiguration,
242-
new Path(portableDataStream.getPath())
243-
)
244-
StaxXmlParser.convertStream(inputStream, parsedOptions)(identity)
245-
} catch {
246-
case e: FileNotFoundException if parsedOptions.ignoreMissingFiles =>
247-
logWarning("Skipped missing file", e)
248-
None
249-
case NonFatal(e) =>
250-
Utils.getRootCause(e) match {
251-
case _: RuntimeException | _: IOException | _: InternalError
252-
if parsedOptions.ignoreCorruptFiles =>
253-
logWarning("Skipped the rest of the content in the corrupted file", e)
254-
None
255-
case o => throw o
256-
}
257-
}
238+
val inputStream = () =>
239+
CodecStreams.createInputStreamWithCloseResource(
240+
portableDataStream.getConfiguration,
241+
new Path(portableDataStream.getPath())
242+
)
243+
StaxXmlParser.convertStream(inputStream, parsedOptions)(identity)
258244
}
259245

260246
SQLExecution.withSQLConfPropagated(sparkSession) {

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3021,6 +3021,13 @@ class XmlSuite
30213021
.xml(inputFile.toURI.toString)
30223022
.collect()
30233023
assert(result.isEmpty)
3024+
3025+
val result2 = spark.read
3026+
.option("rowTag", "ROW")
3027+
.option("multiLine", true)
3028+
.xml(inputFile.toURI.toString)
3029+
.collect()
3030+
assert(result2.isEmpty)
30243031
}
30253032
})
30263033
withTempPath { dir =>

0 commit comments

Comments
 (0)