Apache Flink 安装部署教程
1. 环境准备
1.1 系统要求
- CentOS 7.9
- Java 8或Java 11(推荐Java 8)
- Hadoop环境(可选,如需集成Hadoop)
- SSH免密登录(集群部署需要)
2. Flink单机版安装
2.1 下载Flink
# 创建安装目录
mkdir -p /root/software/flink
cd /root/software/flink
# 解压
tar -zxvf flink-1.14.4-bin-scala_2.12.tgz
mv flink-1.14.4 flink
2.2 配置环境变量
# 编辑环境变量
vim /etc/profile
# 添加以下内容
export FLINK_HOME=/root/software/flink/flink
export PATH=$FLINK_HOME/bin:$PATH
# 使配置生效
source /etc/profile
2.3 配置Flink
cd $FLINK_HOME/conf
# 编辑配置文件
vim flink-conf.yaml
基础配置示例:
# JobManager地址(单机为本机)
jobmanager.rpc.address: localhost
# JobManager RPC端口
jobmanager.rpc.port: 6123
# JobManager堆内存大小
jobmanager.memory.process.size: 1024m
# TaskManager堆内存大小
taskmanager.memory.process.size: 1024m
# 每个TaskManager的slot数量
taskmanager.numberOfTaskSlots: 1
# 并行度
parallelism.default: 1
# 解决类加载器问题(重要配置)
classloader.check-leaked-classloader: false
2.4 启动Flink
# 启动Flink集群
$FLINK_HOME/bin/start-cluster.sh
# 停止Flink集群
$FLINK_HOME/bin/stop-cluster.sh
2.5 验证安装
# 查看进程
jps
# 应该看到:StandaloneSessionClusterEntrypoint 和 TaskManagerRunner
# 访问Web UI
# 地址:http://localhost:8081
3. Flink集群部署
3.1 修改主机名和hosts文件
# 在所有节点上配置hosts
vim /etc/hosts
# 添加集群节点(示例)
192.168.1.101 master
192.168.1.102 slave1
192.168.1.103 slave2
3.2 配置SSH免密登录
# 在master节点生成密钥
ssh-keygen -t rsa
# 将公钥复制到所有节点
ssh-copy-id master
ssh-copy-id slave1
ssh-copy-id slave2
# 测试免密登录
ssh slave1 date
3.3 分发Flink安装包
# 在master节点执行
scp -r /root/software/flink/flink slave1:/root/software/flink/
scp -r /root/software/flink/flink slave2:/root/software/flink/
3.4 配置master节点
cd $FLINK_HOME/conf
# 编辑master文件
vim masters
# 添加内容:master:8081
# 编辑workers文件
vim workers
# 添加内容:
slave1
slave2
3.5 修改集群配置
vim flink-conf.yaml
集群配置示例:
# JobManager地址
jobmanager.rpc.address: master
# JobManager堆内存
jobmanager.memory.process.size: 2048m
# TaskManager堆内存
taskmanager.memory.process.size: 4096m
# 每个TaskManager的slot数量
taskmanager.numberOfTaskSlots: 4
# 并行度
parallelism.default: 4
# 解决类加载器问题
classloader.check-leaked-classloader: false
# 可选:设置检查点
state.checkpoints.dir: hdfs://master:9000/flink/checkpoints
3.6 启动Flink集群
# 在master节点启动集群
$FLINK_HOME/bin/start-cluster.sh
# 查看集群状态
$FLINK_HOME/bin/flink list
# 访问Web UI
# 地址:http://master:8081
4. 集成Hadoop环境
4.1 配置Hadoop Classpath
# 如果遇到以下错误:
# java.lang.IllegalStateException: No Executor found.
# Please make sure to export the HADOOP_CLASSPATH environment variable
# 解决方案:设置HADOOP_CLASSPATH环境变量
export HADOOP_CLASSPATH=`hadoop classpath`
# 永久生效,添加到环境变量
echo 'export HADOOP_CLASSPATH=`hadoop classpath`' >> /etc/profile
source /etc/profile
4.2 Flink连接Hadoop配置
# 下载对应版本的Flink-Hadoop连接器
cd $FLINK_HOME/lib
# 下载对应版本的hadoop连接器
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber
4.3 配置HDFS检查点
vim $FLINK_HOME/conf/flink-conf.yaml
添加HDFS配置:
# HDFS检查点目录
state.checkpoints.dir: hdfs://master:9000/flink/checkpoints
# 保存点目录
state.savepoints.dir: hdfs://master:9000/flink/savepoints
# 高可用存储目录
high-availability.storageDir: hdfs://master:9000/flink/ha
5. 运行测试作业
5.1 WordCount示例
# 启动Flink集群
$FLINK_HOME/bin/start-cluster.sh
# 提交WordCount示例作业
$FLINK_HOME/bin/flink run $FLINK_HOME/examples/batch/WordCount.jar \
--input file:///opt/flink/README.txt \
--output file:///opt/flink/result
# 查看结果
cat /opt/flink/result
5.2 Socket流处理示例
# 在一个终端启动netcat
nc -lk 9999
# 在另一个终端提交流处理作业
$FLINK_HOME/bin/flink run $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar \
--hostname localhost \
--port 9999
6. 常见问题解决
6.1 类加载器泄漏警告
# 在flink-conf.yaml中添加
classloader.check-leaked-classloader: false
# 注意:false和冒号之间必须有空格
6.2 内存配置错误
# 调整内存配置
vim flink-conf.yaml
# 增加内存大小:
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m
6.3 端口冲突
# 查看端口占用
netstat -tlnp | grep 8081
# 修改Web UI端口
vim flink-conf.yaml
# 添加:rest.port: 8082
6.4 Hadoop集成问题
# 确保Hadoop环境变量正确
echo $HADOOP_HOME
echo $HADOOP_CLASSPATH
# 检查Hadoop服务状态
hadoop version
hdfs dfsadmin -report
7. 基础管理命令
7.1 集群管理
# 启动集群
$FLINK_HOME/bin/start-cluster.sh
# 停止集群
$FLINK_HOME/bin/stop-cluster.sh
# 查看作业列表
$FLINK_HOME/bin/flink list
# 取消作业
$FLINK_HOME/bin/flink cancel <job-id>
7.2 作业提交
# 提交作业
$FLINK_HOME/bin/flink run <jar-file> [arguments]
# 带并行度提交
$FLINK_HOME/bin/flink run -p 4 <jar-file>
# 提交到特定JobManager
$FLINK_HOME/bin/flink run -m master:8081 <jar-file>
7.3 日志查看
# 查看JobManager日志
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# 查看TaskManager日志
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
# 查看作业日志
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.out
8. 配置高可用(可选)
8.1 修改配置文件
vim $FLINK_HOME/conf/flink-conf.yaml
添加高可用配置:
# 启用高可用
high-availability: zookeeper
high-availability.storageDir: hdfs://master:9000/flink/ha
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default
8.2 配置多个JobManager
# 修改masters文件
vim $FLINK_HOME/conf/masters
# 添加多个JobManager
master:8081
slave1:8081
安装完成后,可以通过Web UI(http://localhost:8081)监控Flink集群状态,并使用命令行工具提交和管理作业。