Commit 88ff2ab

draft
1 parent 8530444 commit 88ff2ab

3 files changed: 20 additions & 4 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXMLRecordReader.scala

Lines changed: 9 additions & 4 deletions

@@ -38,14 +38,14 @@ case class StaxXMLRecordReader(inputStream: () => InputStream, options: XmlOptions
     with Logging {
   // Reader for the XML record parsing.
   private val in1 = inputStream()
-  private val primaryEventReader = StaxXmlParserUtils.filteredReader(in1, options)
+  private lazy val primaryEventReader = StaxXmlParserUtils.filteredReader(in1, options)

   private val xsdSchemaValidator = Option(options.rowValidationXSDPath)
     .map(path => ValidatorUtil.getSchema(path).newValidator())
   // Reader for the XSD validation, if an XSD schema is provided.
   private val in2 = xsdSchemaValidator.map(_ => inputStream())
   // An XMLStreamReader used by StAXSource for XSD validation.
-  private val xsdValidationStreamReader =
+  private lazy val xsdValidationStreamReader =
     in2.map(in => StaxXmlParserUtils.filteredStreamReader(in, options))

   final var hasMoreRecord: Boolean = true

@@ -108,11 +108,16 @@ case class StaxXMLRecordReader(inputStream: () => InputStream, options: XmlOptions
   }

   override def close(): Unit = {
-    primaryEventReader.close()
-    xsdValidationStreamReader.foreach(_.close())
     in1.close()
     in2.foreach(_.close())
     hasMoreRecord = false
+    try {
+      primaryEventReader.close()
+      xsdValidationStreamReader.foreach(_.close())
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Error closing XML stream", e)
+    }
   }

   override def nextEvent(): XMLEvent = primaryEventReader.nextEvent()
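
The hunks above make both StAX readers lazy and move their close() calls behind a NonFatal guard, so close() can still release the underlying streams even if reader construction fails or was never triggered. A minimal, self-contained sketch of that pattern in plain Scala (LazyRecordReader and its members are illustrative names, not the Spark class):

import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import scala.util.control.NonFatal

// Sketch only: lazy reader construction plus a guarded close, mirroring the
// hunks above but using java.io types so it runs standalone.
class LazyRecordReader(newStream: () => InputStream) {
  private val in = newStream()
  // lazy: the reader is not built until it is actually used for parsing
  private lazy val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))

  def close(): Unit = {
    in.close() // always release the underlying stream
    try {
      reader.close() // may force initialization or fail on a broken source
    } catch {
      case NonFatal(e) => println(s"Error closing reader: ${e.getMessage}")
    }
  }
}

object LazyRecordReaderDemo extends App {
  val r = new LazyRecordReader(() => new ByteArrayInputStream("<ROW/>".getBytes("UTF-8")))
  r.close() // safe even though `reader` was never touched
}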

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala

Lines changed: 4 additions & 0 deletions

@@ -278,6 +278,10 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
         logWarning("Skipped the rest of the content in the corrupted file", e)
         parser.close()
         Some(StructType(Nil))
+      case _: IOException | _: RuntimeException | _: InternalError
+          if !options.ignoreCorruptFiles =>
+        parser.close()
+        throw e
       case _ =>
         logWarning("Failed to infer schema from XML record", e)
         handleXmlErrorsByParseMode(
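
The added case rethrows IO-level and runtime failures during schema inference when ignoreCorruptFiles is off, instead of letting them fall through to the lenient case _ branch. A small standalone sketch of that split (InferSketch and readRecord are hypothetical stand-ins, not Spark's API):

import java.io.IOException

// Sketch only: branch error handling on an ignoreCorruptFiles flag, as in the hunk above.
object InferSketch {
  // Hypothetical stand-in for the real XML parsing step.
  private def readRecord(r: String): Seq[String] =
    if (r.startsWith("<")) Seq("field") else throw new IOException("corrupt record")

  def inferOne(record: String, ignoreCorruptFiles: Boolean): Option[Seq[String]] =
    try {
      Some(readRecord(record))
    } catch {
      case e: IOException if ignoreCorruptFiles =>
        println(s"Skipped corrupted input: ${e.getMessage}")
        Some(Nil) // analogous to returning Some(StructType(Nil))
      case e @ (_: IOException | _: RuntimeException) if !ignoreCorruptFiles =>
        throw e // surface the failure instead of degrading to an empty result
    }
}

// InferSketch.inferOne("not xml", ignoreCorruptFiles = true)  => Some(List())
// InferSketch.inferOne("not xml", ignoreCorruptFiles = false) => throws IOException
// InferSketch.inferOne("<ROW/>", ignoreCorruptFiles = false)  => Some(List("field"))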

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala

Lines changed: 7 additions & 0 deletions

@@ -3021,6 +3021,13 @@ class XmlSuite
           .xml(inputFile.toURI.toString)
           .collect()
         assert(result.isEmpty)
+
+        val result2 = spark.read
+          .option("rowTag", "ROW")
+          .option("multiLine", true)
+          .xml(inputFile.toURI.toString)
+          .collect()
+        assert(result2.isEmpty)
       }
     })
     withTempPath { dir =>
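
The new assertions rerun the same corrupt-file read with multiLine enabled, so the multi-line StAX path is covered as well. For context, a spark-shell style sketch of the option combination the test exercises (the config setting and path are assumptions, not part of the diff):

// Sketch only: assumes a running SparkSession `spark` (e.g. in spark-shell)
// and a possibly corrupt XML file at `path`.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val path = "/tmp/corrupt.xml.gz" // placeholder path
val rows = spark.read
  .option("rowTag", "ROW")
  .option("multiLine", true)
  .xml(path)
  .collect() // with ignoreCorruptFiles enabled, a corrupt file yields no rows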
