摘要:的選哪個首選斷點還原可以記錄偏移量可配置文件組,里面使用正則表達式配置多個要監(jiān)控的文件就憑第一點其他的都被比下去了這么好的有一點不完美,不能支持遞歸監(jiān)控文件夾。
Flume的source選哪個?
taildir source首選!
1.斷點還原 positionFile可以記錄偏移量
2.可配置文件組,里面使用正則表達式配置多個要監(jiān)控的文件
就憑第一點其他的source都被比下去了!
這么好的taildir source有一點不完美,不能支持遞歸監(jiān)控文件夾。
所以就只能修改源代碼了……好玩,我喜歡~
Flume的taildir source啟動會調用start()方法作初始化,里面創(chuàng)建一個ReliableTaildirEventReader,這里用到了建造者模式
@Override public synchronized void start() { logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths); try { reader = new ReliableTaildirEventReader.Builder() .filePaths(filePaths) .headerTable(headerTable) .positionFilePath(positionFilePath) .skipToEnd(skipToEnd) .addByteOffset(byteOffsetHeader) .cachePatternMatching(cachePatternMatching) .recursive(isRecursive) .annotateFileName(fileHeader) .fileNameHeader(fileHeaderKey) .build(); } catch (IOException e) { throw new FlumeException("Error instantiating ReliableTaildirEventReader", e); } idleFileChecker = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build()); idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(), idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS); positionWriter = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("positionWriter").build()); positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(), writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS); super.start(); logger.debug("TaildirSource started"); sourceCounter.start(); }
taildir source屬于PollableSource,
/** * A {@link Source} that requires an external driver to poll to determine * whether there are {@linkplain Event events} that are available to ingest * from the source. * * @see org.apache.flume.source.EventDrivenSourceRunner */ public interface PollableSource extends Source { ...
這段注釋的意思是PollableSource是需要一個外部驅動去查看有沒有需要消費的事件,從而拉取事件,講白了就是定時拉取。所以flume也不一定是真正實時的,只是隔一會兒不停地來查看事件而已。(與之相應的是另一種EventDrivenSourceRunner)
那么taildir source在定時拉取事件的時候是調用的process方法
@Override public Status process() { Status status = Status.READY; try { existingInodes.clear(); existingInodes.addAll(reader.updateTailFiles()); for (long inode : existingInodes) { TailFile tf = reader.getTailFiles().get(inode); if (tf.needTail()) { tailFileProcess(tf, true); } } closeTailFiles(); try { TimeUnit.MILLISECONDS.sleep(retryInterval); } catch (InterruptedException e) { logger.info("Interrupted while sleeping"); } } catch (Throwable t) { logger.error("Unable to tail files", t); status = Status.BACKOFF; } return status; }
重點就是下面這幾行
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
if (tf.needTail()) {
tailFileProcess(tf, true);
} }
從reader.updateTailFiles()獲取需要監(jiān)控的文件,然后對每一個進行處理,查看最后修改時間,判定是否需要tail,需要tail就tail
那么進入reader.updateTailFiles()
for (TaildirMatcher taildir : taildirCache) { Mapheaders = headerTable.row(taildir.getFileGroup()); for (File f : taildir.getMatchingFiles()) { long inode = getInode(f); TailFile tf = tailFiles.get(inode); if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { long startPos = skipToEnd ? f.length() : 0; tf = openFile(f, headers, inode, startPos);
遍歷每一個正則表達式匹配對應的匹配器,每個匹配器去獲取匹配的文件!taildir.getMatchingFiles()
ListgetMatchingFiles() { long now = TimeUnit.SECONDS.toMillis( TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); long currentParentDirMTime = parentDir.lastModified(); List result; // calculate matched files if // - we don"t want to use cache (recalculate every time) OR // - directory was clearly updated after the last check OR // - last mtime change wasn"t already checked for sure // (system clock hasn"t passed that second yet) if (!cachePatternMatching || lastSeenParentDirMTime < currentParentDirMTime || !(currentParentDirMTime < lastCheckedTime)) { lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache(isRecursive)); lastSeenParentDirMTime = currentParentDirMTime; lastCheckedTime = now; } return lastMatchedFiles; }
可以看到getMatchingFilesNoCache(isRecursive)就是獲取匹配的文件的方法,也就是需要修改的方法了!
ps:這里的isRecursive是我加的~
點進去:
private ListgetMatchingFilesNoCache() { List result = Lists.newArrayList(); try (DirectoryStream stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) { for (Path entry : stream) { result.add(entry.toFile()); } } catch (IOException e) { logger.error("I/O exception occurred while listing parent directory. " + "Files already matched will be returned. " + parentDir.toPath(), e); } return result; }
源碼是用了Files.newDirectoryStream(parentDir.toPath(), fileFilter)),將父目錄下符合正則表達式的文件都添加到一個迭代器里。(這里還用了try (...)的語法糖)
找到地方了,開始改!我在這個getMatchingFilesNoCache()方法下面下了一個重載的方法, 可增加擴展性:
private ListgetMatchingFilesNoCache(boolean recursion) { if (!recursion) { return getMatchingFilesNoCache(); } List result = Lists.newArrayList(); // 使用非遞歸的方式遍歷文件夾 Queue dirs = new ArrayBlockingQueue<>(10); dirs.offer(parentDir); while (dirs.size() > 0) { File dir = dirs.poll(); try { DirectoryStream stream = Files.newDirectoryStream(dir.toPath(), fileFilter); stream.forEach(path -> result.add(path.toFile())); } catch (IOException e) { logger.error("I/O exception occurred while listing parent directory. " + "Files already matched will be returned. (recursion)" + parentDir.toPath(), e); } File[] dirList = dir.listFiles(); assert dirList != null; for (File f : dirList) { if (f.isDirectory()) { dirs.add(f); } } } return result; }
我使用了非遞歸的方式遍歷文件夾,就是樹到隊列的轉換。
到這里,核心部分就改完了。接下來要處理這個recursion的參數(shù)
一路改構造方法,添加這個參數(shù),最終參數(shù)從哪來呢?
flume的source啟動時會調用configure方法,將Context中的內容配置進reader等對象中。
isRecursive = context.getBoolean(RECURSIVE, DEFAULT_RECURSIVE);
context從TaildirSourceConfigurationConstants中獲取配置名和默認值
/** * Whether to support recursion. */ public static final String RECURSIVE = "recursive"; public static final boolean DEFAULT_RECURSIVE = false;
這里的recursive也就是flume配置文件里配置項了
# Whether to support recusion a1.sources.r1.recursive = true大功告成,打包試試!
用maven只對這一個module打包。我把這個module的pom改了下artifactId,加上了自己名字作個紀念,哈哈
可惜pom里面不能寫中文……
org.apache.flume.flume-ng-sources flume-taildir-source-recursive-by-Wish000 Flume Taildir Source
執(zhí)行package將其放在flume的lib下,替換原來的flume-taildir-source***.jar
啟動,測試,成功!
具體代碼見GitHub地址:https://github.com/Wish000/me...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77865.html
摘要:的選哪個首選斷點還原可以記錄偏移量可配置文件組,里面使用正則表達式配置多個要監(jiān)控的文件就憑第一點其他的都被比下去了這么好的有一點不完美,不能支持遞歸監(jiān)控文件夾。 Flume的source選哪個?taildir source首選!1.斷點還原 positionFile可以記錄偏移量2.可配置文件組,里面使用正則表達式配置多個要監(jiān)控的文件就憑第一點其他的source都被比下去了!這么好的t...
摘要:于是便誕生了隨行付分布式文件系統(tǒng)簡稱,提供的海量安全低成本高可靠的云存儲服務。子系統(tǒng)相關流程圖如下核心實現(xiàn)主要為隨行付各個業(yè)務系統(tǒng)提供文件共享和訪問服務,并且可以按應用統(tǒng)計流量命中率空間等指標。 背景 傳統(tǒng)Web應用中所有的功能部署在一起,圖片、文件也在一臺服務器;應用微服務架構后,服務之間的圖片共享通過FTP+Nginx靜態(tài)資源的方式進行訪問,文件共享通過nfs磁盤掛載的方式進行訪問...
摘要:對于一般的采集需求,通過對的簡單配置即可實現(xiàn)。針對特殊場景也具備良好的自定義擴展能力,因此,可以適用于大部分的日常數(shù)據(jù)采集場景。 文章作者:foochane? 原文鏈接:https://foochane.cn/article/2019062701.html Flume日志采集框架 安裝和部署 Flume運行機制 采集靜態(tài)文件到hdfs 采集動態(tài)日志文件到hdfs 兩個agent級聯(lián) F...
閱讀 1711·2021-11-24 09:39
閱讀 2494·2021-11-18 10:07
閱讀 3677·2021-08-31 09:40
閱讀 3349·2019-08-30 15:44
閱讀 2641·2019-08-30 12:50
閱讀 3662·2019-08-26 17:04
閱讀 1438·2019-08-26 13:49
閱讀 1275·2019-08-23 18:05