QingYingX的博客 In solitude, where we are least alone.

QingYingX / Flume 搭建教程

Created Tue, 01 Apr 2025 00:00:00 +0800 Modified Thu, 18 Dec 2025 10:15:53 +0000
1918 Words

Apache Flume 安装配置教程

1. Flume 简介

Apache Flume 是一个分布式、可靠、高可用的海量日志采集、聚合和传输系统。它可以高效地收集、聚合和移动大量日志数据,具有基于流式数据的简单灵活架构。

2. 环境准备

2.1 系统要求

  • CentOS 7.x
  • Java 1.8 或更高版本
  • Hadoop(可选,如需HDFS输出)

3. Flume 安装部署

3.1 下载和解压

# 创建安装目录
mkdir -p /root/software
cd /root/software

# 解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume

3.2 配置环境变量

# 编辑环境变量
vim /etc/profile

# 添加以下内容
export FLUME_HOME=/root/software/flume
export PATH=$FLUME_HOME/bin:$PATH

# 使配置生效
source /etc/profile

# 验证安装
flume-ng version

3.3 基础配置

# 进入Flume配置目录
cd $FLUME_HOME/conf

# 创建环境变量配置文件
cp flume-env.sh.template flume-env.sh
vim flume-env.sh

# 添加Java环境配置
export JAVA_HOME=/root/software/jdk1.8

4. Flume 配置说明

4.1 配置文件结构

Flume配置文件包含三个主要组件:

  • Source:数据源,定义数据从哪里来
  • Channel:通道,作为Source和Sink之间的缓冲区
  • Sink:数据汇,定义数据到哪里去

4.2 基础配置模板

# 定义Agent名称
[agent名称].sources = [source名称]
[agent名称].channels = [channel名称]
[agent名称].sinks = [sink名称]

# 配置Source
[agent名称].sources.[source名称].type = [source类型]
[agent名称].sources.[source名称].[其他参数] = [参数值]

# 配置Channel
[agent名称].channels.[channel名称].type = [channel类型]
[agent名称].channels.[channel名称].[其他参数] = [参数值]

# 配置Sink
[agent名称].sinks.[sink名称].type = [sink类型]
[agent名称].sinks.[sink名称].[其他参数] = [参数值]

# 绑定关系
[agent名称].sources.[source名称].channels = [channel名称]
[agent名称].sinks.[sink名称].channel = [channel名称]

5. 监控文件夹变化(SpoolDir Source)

5.1 创建配置文件

# 在Flume配置目录创建hdfs.conf
cd $FLUME_HOME/conf
vim hdfs.conf

5.2 SpoolDir配置示例

# 定义Agent组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置Source - SpoolDir类型
# 监控指定目录下的文件变化,读取后标记为完成
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/server/hadoop-3.1.3/logs/
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = filename
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = basename

# 配置Sink - HDFS类型
# 将数据写入HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.rollInterval = 900
a1.sinks.k1.hdfs.rollSize = 1048760
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

# 配置Channel - File类型
# 使用文件通道,数据可靠性更高
a1.channels.c1.type = file
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.checkpointDir = /tmp/flume/checkpoint
a1.channels.c1.dataDirs = /tmp/flume/data

# 绑定组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.3 配置说明

参数 说明 推荐值
spoolDir 监控的目录路径 根据实际情况设置
fileHeader 是否添加文件头信息 true
hdfs.path HDFS存储路径 hdfs://[namenode]:port/path
rollInterval 文件滚动时间间隔(秒) 900
rollSize 文件滚动大小(字节) 1048760(约1MB)
rollCount 文件滚动事件数 0(不限制)
capacity 通道容量 1000-10000
transactionCapacity 事务容量 100-1000

5.4 准备工作

# 创建监控目录
mkdir -p /opt/server/hadoop-3.1.3/logs/

# 为HDFS目录授权
hdfs dfs -mkdir -p /tmp/flume
hdfs dfs -chmod -R 755 /tmp

# 创建本地文件通道目录
mkdir -p /tmp/flume/{checkpoint,data}

5.5 启动Flume Agent

# 启动命令
flume-ng agent \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/hdfs.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console

# 或简写形式
flume-ng agent -c conf -f hdfs.conf -n a1 -Dflume.root.logger=INFO,console

5.6 测试SpoolDir

# 1. 创建测试文件到监控目录
echo "test data 1" > /opt/server/hadoop-3.1.3/logs/test1.log
echo "test data 2" > /opt/server/hadoop-3.1.3/logs/test2.log

# 2. 查看Flume控制台输出

# 3. 检查HDFS上的数据
hdfs dfs -ls /tmp/flume
hdfs dfs -cat /tmp/flume/events-*.log

6. 监控文件内容变化(Exec Source)

6.1 创建配置文件

cd $FLUME_HOME/conf
vim exec.conf

6.2 Exec配置示例

# 定义Agent组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置Source - Exec类型
# 执行命令并监控输出
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/server/hadoop-3.1.3/logs/hadoop-root-datanode-master.log
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000
a1.sources.r1.logStdErr = false

# 配置Sink - Logger类型(控制台输出)
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 16384

# 配置Channel - Memory类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

6.3 启动Exec Agent

# 启动Exec监控
flume-ng agent \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/exec.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console

# 测试:向监控文件写入数据
echo "new log entry at $(date)" >> /opt/server/hadoop-3.1.3/logs/hadoop-root-datanode-master.log

7. 进阶配置示例

7.1 多Sink负载均衡

# 定义两个Sink进行负载均衡
a1.sinks = k1 k2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

7.2 数据拦截器

# 添加时间戳拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false

8. 管理命令

8.1 启动命令

# 前台启动(调试模式)
flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console

# 后台启动
nohup flume-ng agent -c conf -f conf/hdfs.conf -n a1 > flume.log 2>&1 &

8.2 监控命令

# 查看进程
ps aux | grep flume

# 查看日志
tail -f flume.log
tail -f $FLUME_HOME/logs/flume.log

# 停止进程
pkill -f "flume-ng agent"

9. 常见问题解决

9.1 Java环境问题

# 检查JAVA_HOME设置
echo $JAVA_HOME

# 修改flume-ng脚本(如必要)
vim $FLUME_HOME/bin/flume-ng
# 在第110行附近添加(如果缺少hbase库路径)
java.library.path 2>/dev/null | grep hbase

9.2 HDFS权限问题

# 确保HDFS目录有写入权限
hdfs dfs -chmod -R 777 /tmp/flume

# 或设置HDFS Sink的用户
a1.sinks.k1.hdfs.proxyUser = flumeuser

9.3 文件通道问题

# 清理旧的检查点和数据文件
rm -rf /tmp/flume/checkpoint/*
rm -rf /tmp/flume/data/*

9.4 内存不足问题

# 增加JVM内存
export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"

# 或在启动时指定
flume-ng agent ... -Dflume.root.logger=INFO,console -Xmx1024m