QingYingX的博客 In solitude, where we are least alone.

QingYingX / Flink 搭建教程

Created Tue, 01 Apr 2025 00:00:00 +0800 Modified Thu, 18 Dec 2025 09:58:26 +0000
1585 Words

Apache Flink 安装部署教程

1. 环境准备

1.1 系统要求

  • CentOS 7.9
  • Java 8或Java 11(推荐Java 8)
  • Hadoop环境(可选,如需集成Hadoop)
  • SSH免密登录(集群部署需要)

2. Flink单机版安装

# 创建安装目录
mkdir -p /root/software/flink
cd /root/software/flink

# 解压
tar -zxvf flink-1.14.4-bin-scala_2.12.tgz
mv flink-1.14.4 flink

2.2 配置环境变量

# 编辑环境变量
vim /etc/profile

# 添加以下内容
export FLINK_HOME=/root/software/flink/flink
export PATH=$FLINK_HOME/bin:$PATH

# 使配置生效
source /etc/profile
cd $FLINK_HOME/conf

# 编辑配置文件
vim flink-conf.yaml

基础配置示例:

# JobManager地址(单机为本机)
jobmanager.rpc.address: localhost

# JobManager RPC端口
jobmanager.rpc.port: 6123

# JobManager堆内存大小
jobmanager.memory.process.size: 1024m

# TaskManager堆内存大小
taskmanager.memory.process.size: 1024m

# 每个TaskManager的slot数量
taskmanager.numberOfTaskSlots: 1

# 并行度
parallelism.default: 1

# 解决类加载器问题(重要配置)
classloader.check-leaked-classloader: false
# 启动Flink集群
$FLINK_HOME/bin/start-cluster.sh

# 停止Flink集群
$FLINK_HOME/bin/stop-cluster.sh

2.5 验证安装

# 查看进程
jps
# 应该看到:StandaloneSessionClusterEntrypoint 和 TaskManagerRunner

# 访问Web UI
# 地址:http://localhost:8081

3. Flink集群部署

3.1 修改主机名和hosts文件

# 在所有节点上配置hosts
vim /etc/hosts

# 添加集群节点(示例)
192.168.1.101 master
192.168.1.102 slave1
192.168.1.103 slave2

3.2 配置SSH免密登录

# 在master节点生成密钥
ssh-keygen -t rsa

# 将公钥复制到所有节点
ssh-copy-id master
ssh-copy-id slave1
ssh-copy-id slave2

# 测试免密登录
ssh slave1 date

3.3 分发Flink安装包

# 在master节点执行
scp -r /root/software/flink/flink slave1:/root/software/flink/
scp -r /root/software/flink/flink slave2:/root/software/flink/

3.4 配置master节点

cd $FLINK_HOME/conf

# 编辑master文件
vim masters
# 添加内容:master:8081

# 编辑workers文件
vim workers
# 添加内容:
slave1
slave2

3.5 修改集群配置

vim flink-conf.yaml

集群配置示例:

# JobManager地址
jobmanager.rpc.address: master

# JobManager堆内存
jobmanager.memory.process.size: 2048m

# TaskManager堆内存
taskmanager.memory.process.size: 4096m

# 每个TaskManager的slot数量
taskmanager.numberOfTaskSlots: 4

# 并行度
parallelism.default: 4

# 解决类加载器问题
classloader.check-leaked-classloader: false

# 可选:设置检查点
state.checkpoints.dir: hdfs://master:9000/flink/checkpoints

3.6 启动Flink集群

# 在master节点启动集群
$FLINK_HOME/bin/start-cluster.sh

# 查看集群状态
$FLINK_HOME/bin/flink list

# 访问Web UI
# 地址:http://master:8081

4. 集成Hadoop环境

4.1 配置Hadoop Classpath

# 如果遇到以下错误:
# java.lang.IllegalStateException: No Executor found.
# Please make sure to export the HADOOP_CLASSPATH environment variable

# 解决方案:设置HADOOP_CLASSPATH环境变量
export HADOOP_CLASSPATH=`hadoop classpath`

# 永久生效,添加到环境变量
echo 'export HADOOP_CLASSPATH=`hadoop classpath`' >> /etc/profile
source /etc/profile

4.2 Flink连接Hadoop配置

# 下载对应版本的Flink-Hadoop连接器
cd $FLINK_HOME/lib

# 下载对应版本的hadoop连接器
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber

4.3 配置HDFS检查点

vim $FLINK_HOME/conf/flink-conf.yaml

添加HDFS配置:

# HDFS检查点目录
state.checkpoints.dir: hdfs://master:9000/flink/checkpoints

# 保存点目录
state.savepoints.dir: hdfs://master:9000/flink/savepoints

# 高可用存储目录
high-availability.storageDir: hdfs://master:9000/flink/ha

5. 运行测试作业

5.1 WordCount示例

# 启动Flink集群
$FLINK_HOME/bin/start-cluster.sh

# 提交WordCount示例作业
$FLINK_HOME/bin/flink run $FLINK_HOME/examples/batch/WordCount.jar \
  --input file:///opt/flink/README.txt \
  --output file:///opt/flink/result

# 查看结果
cat /opt/flink/result

5.2 Socket流处理示例

# 在一个终端启动netcat
nc -lk 9999

# 在另一个终端提交流处理作业
$FLINK_HOME/bin/flink run $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar \
  --hostname localhost \
  --port 9999

6. 常见问题解决

6.1 类加载器泄漏警告

# 在flink-conf.yaml中添加
classloader.check-leaked-classloader: false
# 注意:false和冒号之间必须有空格

6.2 内存配置错误

# 调整内存配置
vim flink-conf.yaml
# 增加内存大小:
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m

6.3 端口冲突

# 查看端口占用
netstat -tlnp | grep 8081

# 修改Web UI端口
vim flink-conf.yaml
# 添加:rest.port: 8082

6.4 Hadoop集成问题

# 确保Hadoop环境变量正确
echo $HADOOP_HOME
echo $HADOOP_CLASSPATH

# 检查Hadoop服务状态
hadoop version
hdfs dfsadmin -report

7. 基础管理命令

7.1 集群管理

# 启动集群
$FLINK_HOME/bin/start-cluster.sh

# 停止集群
$FLINK_HOME/bin/stop-cluster.sh

# 查看作业列表
$FLINK_HOME/bin/flink list

# 取消作业
$FLINK_HOME/bin/flink cancel <job-id>

7.2 作业提交

# 提交作业
$FLINK_HOME/bin/flink run <jar-file> [arguments]

# 带并行度提交
$FLINK_HOME/bin/flink run -p 4 <jar-file>

# 提交到特定JobManager
$FLINK_HOME/bin/flink run -m master:8081 <jar-file>

7.3 日志查看

# 查看JobManager日志
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log

# 查看TaskManager日志
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log

# 查看作业日志
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.out

8. 配置高可用(可选)

8.1 修改配置文件

vim $FLINK_HOME/conf/flink-conf.yaml

添加高可用配置:

# 启用高可用
high-availability: zookeeper
high-availability.storageDir: hdfs://master:9000/flink/ha
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default

8.2 配置多个JobManager

# 修改masters文件
vim $FLINK_HOME/conf/masters
# 添加多个JobManager
master:8081
slave1:8081

安装完成后,可以通过Web UI(http://localhost:8081)监控Flink集群状态,并使用命令行工具提交和管理作业。