Apache Flume 安装配置教程
1. Flume 简介
Apache Flume 是一个分布式、可靠、高可用的海量日志采集、聚合和传输系统。它可以高效地收集、聚合和移动大量日志数据,具有基于流式数据的简单灵活架构。
2. 环境准备
2.1 系统要求
- CentOS 7.x
- Java 1.8 或更高版本
- Hadoop(可选,如需HDFS输出)
3. Flume 安装部署
3.1 下载和解压
# 创建安装目录
mkdir -p /root/software
cd /root/software
# 解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume
3.2 配置环境变量
# 编辑环境变量
vim /etc/profile
# 添加以下内容
export FLUME_HOME=/root/software/flume
export PATH=$FLUME_HOME/bin:$PATH
# 使配置生效
source /etc/profile
# 验证安装
flume-ng version
3.3 基础配置
# 进入Flume配置目录
cd $FLUME_HOME/conf
# 创建环境变量配置文件
cp flume-env.sh.template flume-env.sh
vim flume-env.sh
# 添加Java环境配置
export JAVA_HOME=/root/software/jdk1.8
4. Flume 配置说明
4.1 配置文件结构
Flume配置文件包含三个主要组件:
- Source:数据源,定义数据从哪里来
- Channel:通道,作为Source和Sink之间的缓冲区
- Sink:数据汇,定义数据到哪里去
4.2 基础配置模板
# 定义Agent名称
[agent名称].sources = [source名称]
[agent名称].channels = [channel名称]
[agent名称].sinks = [sink名称]
# 配置Source
[agent名称].sources.[source名称].type = [source类型]
[agent名称].sources.[source名称].[其他参数] = [参数值]
# 配置Channel
[agent名称].channels.[channel名称].type = [channel类型]
[agent名称].channels.[channel名称].[其他参数] = [参数值]
# 配置Sink
[agent名称].sinks.[sink名称].type = [sink类型]
[agent名称].sinks.[sink名称].[其他参数] = [参数值]
# 绑定关系
[agent名称].sources.[source名称].channels = [channel名称]
[agent名称].sinks.[sink名称].channel = [channel名称]
5. 监控文件夹变化(SpoolDir Source)
5.1 创建配置文件
# 在Flume配置目录创建hdfs.conf
cd $FLUME_HOME/conf
vim hdfs.conf
5.2 SpoolDir配置示例
# 定义Agent组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置Source - SpoolDir类型
# 监控指定目录下的文件变化,读取后标记为完成
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/server/hadoop-3.1.3/logs/
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = filename
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = basename
# 配置Sink - HDFS类型
# 将数据写入HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.rollInterval = 900
a1.sinks.k1.hdfs.rollSize = 1048760
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# 配置Channel - File类型
# 使用文件通道,数据可靠性更高
a1.channels.c1.type = file
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.checkpointDir = /tmp/flume/checkpoint
a1.channels.c1.dataDirs = /tmp/flume/data
# 绑定组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.3 配置说明
| 参数 | 说明 | 推荐值 |
|---|---|---|
| spoolDir | 监控的目录路径 | 根据实际情况设置 |
| fileHeader | 是否添加文件头信息 | true |
| hdfs.path | HDFS存储路径 | hdfs://[namenode]:port/path |
| rollInterval | 文件滚动时间间隔(秒) | 900 |
| rollSize | 文件滚动大小(字节) | 1048760(约1MB) |
| rollCount | 文件滚动事件数 | 0(不限制) |
| capacity | 通道容量 | 1000-10000 |
| transactionCapacity | 事务容量 | 100-1000 |
5.4 准备工作
# 创建监控目录
mkdir -p /opt/server/hadoop-3.1.3/logs/
# 为HDFS目录授权
hdfs dfs -mkdir -p /tmp/flume
hdfs dfs -chmod -R 755 /tmp
# 创建本地文件通道目录
mkdir -p /tmp/flume/{checkpoint,data}
5.5 启动Flume Agent
# 启动命令
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/hdfs.conf \
--name a1 \
-Dflume.root.logger=INFO,console
# 或简写形式
flume-ng agent -c conf -f hdfs.conf -n a1 -Dflume.root.logger=INFO,console
5.6 测试SpoolDir
# 1. 创建测试文件到监控目录
echo "test data 1" > /opt/server/hadoop-3.1.3/logs/test1.log
echo "test data 2" > /opt/server/hadoop-3.1.3/logs/test2.log
# 2. 查看Flume控制台输出
# 3. 检查HDFS上的数据
hdfs dfs -ls /tmp/flume
hdfs dfs -cat /tmp/flume/events-*.log
6. 监控文件内容变化(Exec Source)
6.1 创建配置文件
cd $FLUME_HOME/conf
vim exec.conf
6.2 Exec配置示例
# 定义Agent组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置Source - Exec类型
# 执行命令并监控输出
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/server/hadoop-3.1.3/logs/hadoop-root-datanode-master.log
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 10000
a1.sources.r1.logStdErr = false
# 配置Sink - Logger类型(控制台输出)
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 16384
# 配置Channel - Memory类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 绑定组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
6.3 启动Exec Agent
# 启动Exec监控
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec.conf \
--name a1 \
-Dflume.root.logger=INFO,console
# 测试:向监控文件写入数据
echo "new log entry at $(date)" >> /opt/server/hadoop-3.1.3/logs/hadoop-root-datanode-master.log
7. 进阶配置示例
7.1 多Sink负载均衡
# 定义两个Sink进行负载均衡
a1.sinks = k1 k2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
7.2 数据拦截器
# 添加时间戳拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false
8. 管理命令
8.1 启动命令
# 前台启动(调试模式)
flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
# 后台启动
nohup flume-ng agent -c conf -f conf/hdfs.conf -n a1 > flume.log 2>&1 &
8.2 监控命令
# 查看进程
ps aux | grep flume
# 查看日志
tail -f flume.log
tail -f $FLUME_HOME/logs/flume.log
# 停止进程
pkill -f "flume-ng agent"
9. 常见问题解决
9.1 Java环境问题
# 检查JAVA_HOME设置
echo $JAVA_HOME
# 修改flume-ng脚本(如必要)
vim $FLUME_HOME/bin/flume-ng
# 在第110行附近添加(如果缺少hbase库路径)
java.library.path 2>/dev/null | grep hbase
9.2 HDFS权限问题
# 确保HDFS目录有写入权限
hdfs dfs -chmod -R 777 /tmp/flume
# 或设置HDFS Sink的用户
a1.sinks.k1.hdfs.proxyUser = flumeuser
9.3 文件通道问题
# 清理旧的检查点和数据文件
rm -rf /tmp/flume/checkpoint/*
rm -rf /tmp/flume/data/*
9.4 内存不足问题
# 增加JVM内存
export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"
# 或在启动时指定
flume-ng agent ... -Dflume.root.logger=INFO,console -Xmx1024m