Skip to content

Commit c62fa28

Browse files
authored
HIVE-29124: Avoid committing files when a task is aborted even though some source has completed (#6011)
1 parent 6c7e972 commit c62fa28

File tree

4 files changed

+39
-10
lines changed

4 files changed

+39
-10
lines changed

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,14 @@ public void abort() {
305305
} else {
306306
LOG.info("reducer not setup yet. abort not being forwarded");
307307
}
308+
if (mergeWorkList != null) {
309+
for (BaseWork redWork : mergeWorkList) {
310+
redWork.abort();
311+
}
312+
}
313+
if (reduceWork != null) {
314+
reduceWork.abort();
315+
}
308316
}
309317

310318
/**
@@ -343,16 +351,13 @@ void close() {
343351
}
344352

345353
try {
346-
if (isAborted()) {
347-
for (ReduceRecordSource rs : sources) {
348-
if (!rs.close()) {
349-
setAborted(false); // Preserving the old logic. Hmm...
350-
break;
351-
}
352-
}
353-
}
354-
355354
boolean abort = isAborted();
355+
for (ReduceRecordSource rs : sources) {
356+
abort |= !rs.close();
357+
}
358+
if (abort) {
359+
setAborted(true);
360+
}
356361
reducer.close(abort);
357362
if (mergeWorkList != null) {
358363
for (BaseWork redWork : mergeWorkList) {

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,14 @@ private void processVectorGroup(BytesWritable keyWritable,
520520
}
521521
}
522522

523+
524+
/**
525+
* Closes resources and returns whether the records were successfully processed.
526+
* @return boolean indicating the success status:
527+
* - true: All data has been processed successfully without exceptions.
528+
* - false: Exceptions were encountered during data processing.
529+
* @throws Exception unexpected errors occur during closing.
530+
*/
523531
boolean close() throws Exception {
524532
try {
525533
if (handleGroupKey && groupKey != null) {
@@ -533,7 +541,7 @@ boolean close() throws Exception {
533541
+ e.getMessage(), e);
534542
}
535543
}
536-
return abort;
544+
return !abort;
537545
}
538546

539547
public ObjectInspector getObjectInspector() {

ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,4 +545,12 @@ public void setInputSourceToRuntimeValuesInfo(
545545
String workName, RuntimeValuesInfo runtimeValuesInfo) {
546546
inputSourceToRuntimeValuesInfo.put(workName, runtimeValuesInfo);
547547
}
548+
549+
public void abort() {
550+
if (dummyOps != null) {
551+
for (Operator<?> dummyOp : dummyOps) {
552+
dummyOp.abort();
553+
}
554+
}
555+
}
548556
}

ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,4 +381,12 @@ public void setEdgePropRef(TezEdgeProperty edgeProp) {
381381
public TezEdgeProperty getEdgePropRef() {
382382
return edgeProp;
383383
}
384+
385+
@Override
386+
public void abort() {
387+
super.abort();
388+
if (reducer != null) {
389+
reducer.abort();
390+
}
391+
}
384392
}

0 commit comments

Comments
 (0)