Flink

Flink HA deployment

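Flink is deployed in distributed mode on three nodes, A, B, and C. A is the master, and A, B, and C are all workers. Everything in this post runs as the hadoop user, which I created myself.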

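  • Prerequisites
  1. Java 1.8.x or higher
  2. Scala (I used 2.11.4)
  3. Passwordless SSH between the nodes
  4. A Hadoop cluster (2.8.4) with HDFS and YARN running
  5. A ZooKeeper cluster (I already set it up while building the Hadoop cluster)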
  • Download Flink. This post uses Flink 1.6.4 built for Hadoop 2.8, download link
  1. Extract flink-1.6.4-bin-hadoop28-scala_2.11.tgz and create a symlink to it:
tar -zxvf flink-1.6.4-bin-hadoop28-scala_2.11.tgz
ln -s flink-1.6.4/ flink
  2. Edit flink-conf.yaml as follows:
# Set this to each node's own hostname
jobmanager.rpc.address: bigdata-01
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
jobmanager.heap.size: 2048m
# The heap size for the TaskManager JVM
taskmanager.heap.size: 2048m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
#==============================================================================
# High Availability: HA must be configured as shown below
#high-availability: the HA mode; must be zookeeper
#high-availability.storageDir: where JobManager metadata is persisted
#high-availability.zookeeper.quorum: addresses of the ZooKeeper cluster
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# 
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...) 
#
high-availability.storageDir: hdfs://A:9000/user/flink/ha/
#

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
high-availability.zookeeper.quorum: A:2181,B:2181,C:2181
# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: hdfs://A:9000/user/flink/flink-checkpoints
# Default target directory for savepoints, optional.
#
state.savepoints.dir: hdfs://A:9000/user/flink/flink-savepoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false
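  3. Then edit the masters file to contain A, and edit the slaves file to list A, B, and C.
  4. Copy the modified flink-1.6.4 directory to B and C with scp.
  5. On the master node, run ./start-cluster.sh from the bin directory. Two JVM processes should then appear on the master: StandaloneSessionClusterEntrypoint and TaskManagerRunner.
  6. Open http://A:8081 in a browser to see the Flink web dashboard.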
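Steps 3 to 5 above can be scripted roughly as follows. This is a minimal sketch, not part of the original post: it assumes the Flink directory sits in the hadoop user's home directory on every node, and the helper names (`write_node_lists`, `distribute`, `check_flink_procs`) and the `DRY_RUN` switch are hypothetical. The `:8081` suffix written to `masters` follows the `host:webui-port` form used by Flink's default masters file.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for steps 3-5. DRY_RUN=1 prints the scp/ssh commands
# instead of executing them. Assumes Flink lives in ~ on every node.
set -euo pipefail

# write_node_lists <conf-dir> <master> <worker>...: step 3.
write_node_lists() {
  local conf_dir="$1" master="$2"
  shift 2
  printf '%s:8081\n' "$master" > "$conf_dir/masters"   # host:webui-port
  printf '%s\n' "$@" > "$conf_dir/slaves"              # one TaskManager host per line
}

# run <cmd...>: execute the command, or only print it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# distribute <flink-dir> <user> <host>...: step 4. Copy the directory into each
# host's home directory and recreate the "flink" symlink there.
distribute() {
  local dir="$1" user="$2" host
  shift 2
  for host in "$@"; do
    run scp -r "$dir" "${user}@${host}:~/"
    # ~ is quoted, so it is expanded by the remote shell, not locally
    run ssh "${user}@${host}" "ln -sfn ~/${dir} ~/flink"
  done
}

# check_flink_procs: step 5. Read `jps` output on stdin and fail unless
# both Flink JVM processes are listed.
check_flink_procs() {
  local out p
  out="$(cat)"
  for p in StandaloneSessionClusterEntrypoint TaskManagerRunner; do
    if ! grep -qw "$p" <<<"$out"; then
      echo "not running: $p" >&2
      return 1
    fi
  done
}
```

For example, run `write_node_lists flink/conf A A B C` on A, preview the copy with `DRY_RUN=1 distribute flink-1.6.4 hadoop B C`, and after `start-cluster.sh` check the master with `jps | check_flink_procs`.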

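⚠️Note:

  1. In flink-conf.yaml, set jobmanager.rpc.address to the hostname of the node the file is on.

  2. Anything in flink-conf.yaml that is not shown above was left at its default. The HDFS paths it references must be created in advance.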
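The two notes, plus a sanity check of the three HA keys the configuration requires, can be sketched as shell helpers. The helper names and the `DRY_RUN` switch are hypothetical; the HDFS URIs are copied from the flink-conf.yaml above, and the small parser only understands simple one-line `key: value` entries, not full YAML.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for the notes above. DRY_RUN=1 prints the HDFS command
# instead of running it.
set -euo pipefail

# conf_value <flink-conf.yaml> <key>: print the value of a simple "key: value" line.
conf_value() {
  awk -v k="$2" '
    /^[ \t]*#/ { next }                 # skip comment lines
    {
      i = index($0, ":")                # split at the first colon only
      if (i == 0) next
      key = substr($0, 1, i - 1); val = substr($0, i + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", key)
      gsub(/^[ \t]+|[ \t]+$/, "", val)
      if (key == k) { print val; exit }
    }' "$1"
}

# check_ha_conf <flink-conf.yaml>: fail if the ZooKeeper HA keys are missing.
check_ha_conf() {
  local conf="$1" rc=0 key
  if [ "$(conf_value "$conf" high-availability)" != "zookeeper" ]; then
    echo "high-availability must be zookeeper" >&2; rc=1
  fi
  for key in high-availability.storageDir high-availability.zookeeper.quorum; do
    if [ -z "$(conf_value "$conf" "$key")" ]; then
      echo "missing $key" >&2; rc=1
    fi
  done
  return "$rc"
}

# set_rpc_address <flink-conf.yaml> <hostname>: note 1, rewrite jobmanager.rpc.address.
set_rpc_address() {
  local conf="$1" host="$2" tmp
  tmp="$(mktemp)"
  sed "s/^jobmanager\.rpc\.address:.*/jobmanager.rpc.address: ${host}/" "$conf" > "$tmp"
  cat "$tmp" > "$conf"   # overwrite in place, keeping the file's permissions
  rm -f "$tmp"
}

# create_ha_dirs: note 2, create the HDFS directories the config points at.
create_ha_dirs() {
  local cmd=(hdfs dfs -mkdir -p
    hdfs://A:9000/user/flink/ha
    hdfs://A:9000/user/flink/flink-checkpoints
    hdfs://A:9000/user/flink/flink-savepoints)
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}
```

On each node, `set_rpc_address ~/flink/conf/flink-conf.yaml "$(hostname)"` followed by `check_ha_conf ~/flink/conf/flink-conf.yaml` covers note 1. `create_ha_dirs` only needs to run once, from any machine with an HDFS client.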

Flink pitfalls

Container is running beyond virtual memory limits. Current usage: 119.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container

Solution: I mainly followed a blog post that explains this very clearly.
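The following is an independent sketch of the arithmetic behind the message (not taken from that post). It assumes YARN's default `yarn.nodemanager.vmem-pmem-ratio` of 2.1. YARN caps each container's virtual memory at its physical allocation multiplied by this ratio. The function names are hypothetical.

```shell
#!/usr/bin/env bash
# Virtual memory cap per YARN container = physical allocation x vmem-pmem-ratio.
set -euo pipefail

# vmem_limit_mb <physical-MB> [ratio]: print the cap in MB (ratio defaults to 2.1).
vmem_limit_mb() {
  awk -v p="$1" -v r="${2:-2.1}" 'BEGIN { printf "%.1f\n", p * r }'
}

# would_be_killed <vmem-used-MB> <physical-MB> [ratio]: succeed if usage exceeds the cap.
would_be_killed() {
  awk -v u="$1" -v p="$2" -v r="${3:-2.1}" 'BEGIN { exit !(u + 0 > p * r) }'
}
```

Plugging in the log above, a 1 GB (1024 MB) container gets a cap of about 2.1 GB (2150.4 MB). The JVM had reserved 2.2 GB of virtual memory, so the NodeManager killed the container even though only 119.5 MB of physical memory was in use. The usual remedies are to set `yarn.nodemanager.vmem-check-enabled` to `false` or to raise `yarn.nodemanager.vmem-pmem-ratio`. Either change goes in yarn-site.xml on every NodeManager and takes effect after YARN is restarted.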

Flink practice demos

Flink official documentation: Batch Examples
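A convenient first exercise is the WordCount batch example, which ships as a jar under `examples/batch/` in the Flink binary distribution. The sketch below submits it with `bin/flink run`. The defaulting of `FLINK_HOME` to `~/flink` (the symlink created earlier), the helper name `run_wordcount`, and the `DRY_RUN` switch are assumptions for illustration.

```shell
#!/usr/bin/env bash
# Submit the bundled WordCount batch example. FLINK_HOME defaults to ~/flink
# (assumed); DRY_RUN=1 prints the command instead of running it.
set -euo pipefail

# run_wordcount [input] [output]: pass both paths, or neither.
run_wordcount() {
  local flink_home="${FLINK_HOME:-$HOME/flink}"
  local input="${1:-}" output="${2:-}"
  local cmd=("$flink_home/bin/flink" run "$flink_home/examples/batch/WordCount.jar")
  # Without --input/--output the example uses built-in sample text and prints to stdout.
  if [ -n "$input" ] && [ -n "$output" ]; then
    cmd+=(--input "$input" --output "$output")
  fi
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}
```

For example, `run_wordcount` with no arguments runs on the built-in text. `run_wordcount hdfs://A:9000/tmp/in.txt hdfs://A:9000/tmp/wc-out` reads from and writes to HDFS; both paths are hypothetical.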

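    Original author: 人工智能
    Original URL: https://my.oschina.net/112612/blog/3038848#comments
    This article was reposted from the web purely to share knowledge. In case of infringement, please contact the blogger to have it removed.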