Skip to content

Commit d5665fb

Browse files
u
1 parent f03ba21 commit d5665fb

File tree

2 files changed

+15
-27
lines changed

2 files changed

+15
-27
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,7 +1007,7 @@ class XmlTokenizer(
10071007
}
10081008
}
10091009

1010-
object StaxXmlParser {
1010+
object StaxXmlParser extends Logging {
10111011
/**
10121012
* Parses a stream that contains XML strings and turns it into an iterator of tokens.
10131013
*/
@@ -1050,14 +1050,16 @@ object StaxXmlParser {
10501050
}
10511051
}
10521052
} catch {
1053-
case e: Throwable =>
1053+
case e: FileNotFoundException if options.ignoreMissingFiles =>
1054+
logWarning("Skipped missing file", e)
1055+
Iterator.empty
1056+
case NonFatal(e) =>
10541057
SparkErrorUtils.getRootCause(e) match {
1055-
case _: IOException | _: RuntimeException | _: InternalError
1056-
if options.ignoreCorruptFiles =>
1057-
Iterator.empty
1058-
case _: FileNotFoundException if options.ignoreMissingFiles =>
1058+
case _: RuntimeException | _: IOException | _: InternalError
1059+
if options.ignoreCorruptFiles =>
1060+
logWarning("Skipped the rest of the content in the corrupted file", e)
10591061
Iterator.empty
1060-
case _ => throw e
1062+
case o => throw o
10611063
}
10621064
}
10631065

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -235,26 +235,12 @@ object MultiLineXmlDataSource extends XmlDataSource {
235235

236236
val xmlParserRdd: RDD[StaxXMLRecordReader] =
237237
xml.flatMap { portableDataStream =>
238-
try {
239-
val inputStream = () =>
240-
CodecStreams.createInputStreamWithCloseResource(
241-
portableDataStream.getConfiguration,
242-
new Path(portableDataStream.getPath())
243-
)
244-
StaxXmlParser.convertStream(inputStream, parsedOptions)(identity)
245-
} catch {
246-
case e: FileNotFoundException if parsedOptions.ignoreMissingFiles =>
247-
logWarning("Skipped missing file", e)
248-
None
249-
case NonFatal(e) =>
250-
Utils.getRootCause(e) match {
251-
case _: RuntimeException | _: IOException | _: InternalError
252-
if parsedOptions.ignoreCorruptFiles =>
253-
logWarning("Skipped the rest of the content in the corrupted file", e)
254-
None
255-
case o => throw o
256-
}
257-
}
238+
val inputStream = () =>
239+
CodecStreams.createInputStreamWithCloseResource(
240+
portableDataStream.getConfiguration,
241+
new Path(portableDataStream.getPath())
242+
)
243+
StaxXmlParser.convertStream(inputStream, parsedOptions)(identity)
258244
}
259245

260246
SQLExecution.withSQLConfPropagated(sparkSession) {

0 commit comments

Comments
 (0)