From 102954bda118d4409226415db932ce49fa5f97a9 Mon Sep 17 00:00:00 2001 From: iijima-satoshi Date: Wed, 27 Apr 2022 04:12:37 +0900 Subject: [PATCH] FLUME-3320 Fix taildir source reset position bug --- .../taildir/ReliableTaildirEventReader.java | 21 ++++++++++++------- .../apache/flume/source/taildir/TailFile.java | 7 +++++++ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index ae9583620a..5dd33d5a87 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.attribute.FileTime; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -243,24 +244,29 @@ public List updateTailFiles(boolean skipToEnd) throws IOException { Map headers = headerTable.row(taildir.getFileGroup()); for (File f : taildir.getMatchingFiles()) { - long inode; + Map attrs; try { - inode = getInode(f); + attrs = getFileAttrs(f); } catch (NoSuchFileException e) { logger.info("File has been deleted in the meantime: " + e.getMessage()); continue; } + long inode = (long) attrs.get("ino"); + long fileSize = (long) attrs.get("size"); + long mtime = ((FileTime) attrs.get("lastModifiedTime")).toMillis(); + TailFile tf = tailFiles.get(inode); if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { - long startPos = skipToEnd ? f.length() : 0; + long startPos = skipToEnd ? fileSize : 0; tf = openFile(f, headers, inode, startPos); } else { - boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length(); + boolean updated = tf.getLastUpdated() < mtime || tf.getPos() != fileSize; if (updated) { if (tf.getRaf() == null) { tf = openFile(f, headers, inode, tf.getPos()); } - if (f.length() < tf.getPos()) { + // The file was replaced or rewritten for some reason. + if (fileSize < tf.getPos()) { logger.info("Pos " + tf.getPos() + " is larger than file size! " + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode); tf.updatePos(tf.getPath(), inode, 0); @@ -280,9 +286,8 @@ public List updateTailFiles() throws IOException { } - private long getInode(File file) throws IOException { - long inode = (long) Files.getAttribute(file.toPath(), "unix:ino"); - return inode; + private Map getFileAttrs(File file) throws IOException { + return Files.readAttributes(file.toPath(), "unix:ino,size,lastModifiedTime"); } private TailFile openFile(File file, Map headers, long inode, long pos) { diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java index 42474c4f75..9c0ab39251 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.file.Files; import java.util.List; import java.util.Map; @@ -69,6 +70,12 @@ public TailFile(File file, Map headers, long inode, long pos) this.headers = headers; this.oldBuffer = new byte[0]; this.bufferPos = NEED_READING; + + long newInode = (long) Files.getAttribute(file.toPath(), "unix:ino"); + if (inode != newInode) { + logger.info("Detected the inode change. The New inode file is tailed next time. " + + "file: " + path + ", inode: " + newInode); + } } public RandomAccessFile getRaf() {