Flink on yarn运行python程序

Flink on yarn运行python程序

Flink本地local模式运行python程序

参考官网写了python程序Adder.py(后面贴上代码),在本地local模式用./bin/pyflink.sh ./Adder.p启动是可以的

➜  flink-1.7.1 ./bin/pyflink.sh ./Adder.py
Starting execution of program
Program execution finished
Job with JobID 8c7ab33a382c279b66089d43693fde52 has finished.
Job Runtime: 679 ms

jobManager web ui上查看是有JobID 为8c7ab33a382c279b66089d43693fde52的任务的。

《Flink on yarn运行python程序》

可以看到本地local模式是可以运行python程序的。

Flink on yarn运行python程序

在服务器6.34上执行(6.34上有单机的flink客户端,可以连接yarn,向yarn提交flink程序)
iknow@search-uda-6-34:~/wangbin/flink-1.7.2 $ ./bin/pyflink.sh ./Adder.py

《Flink on yarn运行python程序》

默认会起动到application_1550579025929_61820上,指定了yid参数也会默认跑在application_1550579025929_61820上

决定换一台机器试试,到0-107上,这里省略拷贝hadoop和flink环境的过程,
启动一个yarn session:

./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m

《Flink on yarn运行python程序》

启动日志如下:

iknow@search-uda-0-107:~/wangbin/flink/flink-1.7.2 $ ./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m
2019-03-01 19:53:31,148 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-01 19:53:31,150 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-03-01 19:53:31,152 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-03-01 19:53:31,153 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2019-03-01 19:53:31,815 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-01 19:53:31,978 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-03-01 19:53:31,980 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-03-01 19:53:31,981 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
2019-03-01 19:53:31,986 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to iknow (auth:SIMPLE)
2019-03-01 19:53:32,451 INFO  org.apache.hadoop.yarn.client.AHSProxy                        - Connecting to Application History server at data-hadoop-112-16.bjrs.zybang.com/192.168.112.16:10200
2019-03-01 19:53:32,479 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument n is deprecated in will be ignored.
2019-03-01 19:53:32,542 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Looking for the active RM in [rm1, rm2]...
2019-03-01 19:53:32,662 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Found active RM [rm1]
2019-03-01 19:53:32,720 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '2048'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.
2019-03-01 19:53:32,720 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2096, numberTaskManagers=2, slotsPerTaskManager=1}
2019-03-01 19:53:33,265 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-01 19:53:33,274 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/wangbin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-01 19:53:42,592 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1550579025929_89782
2019-03-01 19:53:42,849 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1550579025929_89782
2019-03-01 19:53:42,849 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-03-01 19:53:42,853 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-03-01 19:53:50,718 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-03-01 19:53:51,320 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on search-as-107-45.bjcq.zybang.com:42915 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://search-as-107-45.bjcq.zybang.com:42915

yarn上是有这个yarn session的

《Flink on yarn运行python程序》

点击ApplicationMaster进去JobManager的web ui,启动一个python程序
./bin/pyflink.sh Adder.py程序起动到了ID为application_1550579025929_89782 的APPID上

《Flink on yarn运行python程序》

JobID为5612d8a9b68a72633a7e2138df4537ba。

再启动一个yarn session
./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency -n 4 -jm 1024m -tm 2096m

《Flink on yarn运行python程序》

在yarn上看到有ID为application_1550579025929_89810的yarn session

《Flink on yarn运行python程序》

再用./bin/pyflink.sh脚本运行python程序

《Flink on yarn运行python程序》

查看JobManager 的web ui上看到是有899b20e2cde7b4ef29afc5b90e56325d的Job ID的。

《Flink on yarn运行python程序》

也就是./bin/pyflink.sh脚本会将python程序运行在最新启动的yarn session上。

再启动一个yarn session

./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency-2 -n 2 -jm 2048m -tm 4096m

Yarn上看到有ID为application_1550579025929_89846的yarn session

《Flink on yarn运行python程序》

./bin/pyflink.sh脚本运行python脚本

《Flink on yarn运行python程序》

到JobManager上看到有jobID为9d7d9d58ee886fd74a7ec56ff487b80d的任务。

《Flink on yarn运行python程序》

Adder.py

from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator])
    collector.collect((count, word))

env = get_environment()
data = env.from_elements("Who's there?",
 "I think I hear them. Stand, ho! Who's there?")

data \
  .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
  .group_by(1) \
  .reduce_group(Adder(), combinable=True) \
  .output()

env.execute()

flink官网python的Datasource和DataSink

从flink官网看到python的api支持的数据源(Data Source)如下:

  • 基于文件的:

    • read_text(path):按行读取文件并将其作为字符串返回。

    • read_csv(path, type):解析逗号(或其他字符)分隔字段的文件。返回元组的DataSet。支持基本java类型及其Value对应作为字段类型。

  • 基于集合:

    • from_elements(*args):从Seq创建数据集。所有数据元

    • generate_sequence(from, to):并行生成给定间隔中的数字序列。

支持的Data Sink

  • write_text():按字符串顺序写入数据元。通过调用每个数据元的str()方法获得字符串。

  • write_csv(…):将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的str()方法。

  • output():打印标准输出上每个数据元的str()值。

总结

Flink on yarn运行python程序的步骤:

    1. 开启一个yarn session
    1. 用 ./bin/pyflink.sh脚本运行python程序,程序会跑在最新启动的yarn session上。
  • 遇到的问题,如果启动多个yarn session,无法通过指定yid来运行到不同yarn session上。

参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/python.html

    原文作者:it_zzy
    原文地址: https://www.jianshu.com/p/e57e0f1a440f
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞