Configuration
Configuration | Default Value | Meaning |
---|---|---|
spark.driver.cores | 1 | Number of cores to use for the driver process, only in cluster mode. In cluster mode the AM and the driver run in the same process, so this is also the AM's core count. |
spark.yarn.am.cores | 1 | Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead. In yarn client mode this is the AM's core count; the driver and the AM are separate processes in that mode, so a driver core count does not apply there. |
Source code
private val amCores = if (isClusterMode) {
  sparkConf.get(DRIVER_CORES)
} else {
  sparkConf.get(AM_CORES)
}
See yarn.Client.scala#L87: this is where the configuration is read to determine the number of cores for the ApplicationMaster.
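The selection logic can be tried out in isolation with a minimal sketch. It assumes a plain `Map[String, String]` as a stand-in for `SparkConf`; the object name `AmCoresSketch` and the helper `amCores` are hypothetical and are not part of Spark.

```scala
object AmCoresSketch {
  // Hypothetical stand-in for SparkConf: a plain map of config keys to values.
  def amCores(conf: Map[String, String], isClusterMode: Boolean): Int = {
    // Cluster mode: the AM hosts the driver, so the driver setting applies.
    // Client mode: the AM is a separate process with its own setting.
    val key = if (isClusterMode) "spark.driver.cores" else "spark.yarn.am.cores"
    // Both settings default to 1, as listed in the configuration table.
    conf.get(key).map(_.toInt).getOrElse(1)
  }

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.driver.cores" -> "4")
    println(s"cluster mode AM cores: ${amCores(conf, isClusterMode = true)}")
    println(s"client mode AM cores:  ${amCores(conf, isClusterMode = false)}")
  }
}
```

Under this sketch, setting only `spark.driver.cores` changes the AM's cores in cluster mode and leaves client mode at its default.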
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead)
capability.setVirtualCores(amCores)
sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
  case Some(expr) =>
    try {
      val amRequest = Records.newRecord(classOf[ResourceRequest])
      amRequest.setResourceName(ResourceRequest.ANY)
      amRequest.setPriority(Priority.newInstance(0))
      amRequest.setCapability(capability)
      ...
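See yarn.Client.scala#L251: the capability is packed into the AM request and submitted to YARN. After a series of steps, a NodeManager finally launches a Container for the ApplicationMaster, which is simply a JVM process.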
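Analysis
As we know, when working with the JVM we can set all kinds of memory parameters, such as -Xmx and -Xss, and we can also set the number of GC threads, for example with -XX:ParallelGCThreads. But there does not seem to be a parameter that directly sets how many threads the JVM may use; see: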
java -XX:+PrintFlagsInitial | grep "Thread" | grep -v "bool"
intx CompilerThreadPriority = -1 {product}
intx CompilerThreadStackSize = 0 {pd product}
uintx ConcGCThreads = 0 {product}
intx DefaultThreadPriority = -1 {product}
uintx G1ConcRefinementThreads = 0 {product}
uintx HeapSizePerGCThread = 87241520 {product}
uintx NewSizeThreadIncrease = 5320 {pd product}
uintx ParallelGCThreads = 0 {product}
intx ThreadPriorityPolicy = 0 {product}
uintx ThreadSafetyMargin = 52428800 {product}
intx ThreadStackSize = 1024 {pd product}
intx VMThreadPriority = -1 {product}
intx VMThreadStackSize = 1024 {pd product}
So what is the point of setting this core count? To raise the ApplicationMaster's concurrency? BUT HOW?
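A small sketch makes the same point from inside a running JVM. It only prints what the JVM observes at runtime, using the standard `Runtime` and `ThreadMXBean` APIs; the object name `JvmThreadsSketch` is hypothetical. It does not answer the question above. It only shows that the thread count is something the process ends up with, not something fixed by a launch flag.

```scala
import java.lang.management.ManagementFactory

object JvmThreadsSketch {
  // Number of processors the JVM believes it can use.
  def availableCpus: Int = Runtime.getRuntime.availableProcessors()

  // Number of live threads in this JVM right now (daemon and non-daemon).
  def liveThreads: Int = ManagementFactory.getThreadMXBean.getThreadCount

  def main(args: Array[String]): Unit =
    println(s"CPUs visible to the JVM: $availableCpus, live threads: $liveThreads")
}
```

Running this inside containers requested with different core counts would be one way to check whether the requested cores change what the process actually sees.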
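Comparison
By comparison, spark.executor.cores is much easier to understand. It sets the number of cores of an executor and is a plain number. Say we set it to 4: the driver assigns tasks according to each executor's currently free cores, subtracting 1 for every task it assigns and adding 1 back when it receives the task-finished message, which frees the core for the driver to assign another task. That is all there is to it.
The ApplicationMaster source, however, contains no controlled bookkeeping of this kind.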
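The executor-side bookkeeping described here can be modeled in a few lines. This is a simplified sketch, not Spark's scheduler code: the class `ExecutorSlots` and its methods are hypothetical, and it assumes every task costs exactly one core.

```scala
object FreeCoresSketch {
  // Hypothetical model of one executor as seen by the driver's scheduler.
  final class ExecutorSlots(val totalCores: Int) {
    private var freeCores: Int = totalCores

    def free: Int = freeCores

    // The driver assigns a task only if a core is free; assigning costs one core.
    def tryLaunchTask(): Boolean =
      if (freeCores > 0) { freeCores -= 1; true } else false

    // A task-finished message gives the core back (never above the total).
    def taskFinished(): Unit = {
      if (freeCores < totalCores) freeCores += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val executor = new ExecutorSlots(totalCores = 4)
    val launched = (1 to 5).count(_ => executor.tryLaunchTask())
    println(s"launched $launched of 5 tasks, free cores now ${executor.free}")
    executor.taskFinished()
    println(s"after one task finished, free cores: ${executor.free}")
  }
}
```

The core count here acts as a real admission limit on concurrent tasks, which is exactly the kind of control that is not visible for the AM's cores.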
An error
18/02/04 06:27:52 ERROR yarn.ApplicationMaster: Exception from Reporter thread.
org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: Application attempt appattempt_1515478669260_917050_000001 doesn't exist in ApplicationMasterService cache.
at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:439)
at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:101)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy24.allocate(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:277)
at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:458)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException): Application attempt appattempt_1515478669260_917050_000001 doesn't exist in ApplicationMasterService cache.
at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:439)
at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy23.allocate(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
... 9 more
18/02/04 06:27:52 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Application attempt appattempt_1515478669260_917050_000001 doesn't exist in ApplicationMasterService cache.
at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:439)
at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
)
18/02/04 06:27:52 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
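The gist of the error: when the ApplicationMaster, acting as a client, asked the ResourceManager to allocate Containers, it found that the ResourceManager no longer recognized it. Amnesia? If the ApplicationMaster did not unregister itself from the ResourceManager, then it must have been out of contact for too long, and the ResourceManager dropped it.
Let's look at the ApplicationMaster source: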
private def launchReporterThread(): Thread = {
  // The number of failures in a row until Reporter thread give up
  val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
  val t = new Thread {
    override def run() {
      var failureCount = 0
      while (!finished) {
        try {
          if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
            finish(FinalApplicationStatus.FAILED,
              ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
              s"Max number of executor failures ($maxNumExecutorFailures) reached")
          } else {
            logDebug("Sending progress")
            allocator.allocateResources()
          }
          failureCount = 0
        } catch {
          case i: InterruptedException =>
          case e: Throwable =>
            failureCount += 1
            // this exception was introduced in hadoop 2.4 and this code would not compile
            // with earlier versions if we refer it directly.
            if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" ==
              e.getClass().getName()) {
              logError("Exception from Reporter thread.", e)
              finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
                e.getMessage)
            } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
                  s"$failureCount time(s) from Reporter thread.")
            } else {
              logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
            }
        }
        .....
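The code is simple. The ApplicationMaster starts a thread which, as long as no exception occurs, periodically (every spark.yarn.scheduler.heartbeat.interval-ms, 3 seconds by default) calls allocator.allocateResources(). If executors have died and we hold fewer than we requested, it requests replacements. When nothing is missing it still runs the same code, so this call presumably also serves as the heartbeat between the ApplicationMaster and the ResourceManager.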
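The failure handling in that loop can be reduced to a small pure function for experimentation. This is a sketch under stated assumptions, not the Spark implementation: the `Attempt` cases are hypothetical stand-ins for the outcome of one `allocateResources()` call, and fatal (non-`NonFatal`) errors are left out.

```scala
object ReporterLoopSketch {
  // Hypothetical outcome of one heartbeat / allocateResources() call.
  sealed trait Attempt
  case object Ok extends Attempt
  case object AttemptNotFound extends Attempt // models ApplicationAttemptNotFoundException
  case object TransientError extends Attempt  // models any other non-fatal exception

  // Returns Some(reason) if the loop would finish the app as FAILED, else None.
  @annotation.tailrec
  def run(attempts: List[Attempt], maxFailures: Int, failureCount: Int = 0): Option[String] =
    attempts match {
      case Nil => None
      // A successful heartbeat resets the consecutive-failure counter.
      case Ok :: rest => run(rest, maxFailures, 0)
      // The RM no longer knows this attempt: fail immediately, no retries.
      case AttemptNotFound :: _ => Some("attempt not found")
      // Other errors are tolerated until too many happen in a row.
      case TransientError :: rest =>
        val n = failureCount + 1
        if (n >= maxFailures) Some(s"failed $n time(s) in a row")
        else run(rest, maxFailures, n)
    }

  def main(args: Array[String]): Unit = {
    println(run(List(Ok, TransientError, Ok, TransientError), maxFailures = 2))
    println(run(List(Ok, AttemptNotFound, Ok), maxFailures = 5))
  }
}
```

The point of the model is the asymmetry: ordinary errors get a retry budget, while a lost application attempt, as in the log above, ends the application on the first occurrence.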
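With that in mind, the two parameters start to make sense. Java threads are not directly equivalent to CPU hardware threads, but in the end they simply compete for CPU time slices. At the YARN layer, the notion of cores is simple: take the number of CPUs times the logical cores per CPU to get the total number of hardware threads on a machine. Containers, including the ApplicationMaster and the Spark executors, are then allocated on that NodeManager against this number, and each allocation subtracts that Container's cores. The more cores a single Container takes, the fewer Containers in total compete for the CPU. In cluster mode the driver and the ApplicationMaster live in a single process, whose threads are busier than in client mode. When something like a long GC pause hits, the thread that heartbeats to the ResourceManager may well not get a chance to run, and once the ResourceManager's timeout is exceeded, the AM is considered lost. This is where spark.driver.cores becomes meaningful: increasing it can reduce the number of other Containers placed on the same node, so physical resources are less tight and this scenario becomes less likely.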
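The arithmetic behind this argument fits in one function. It is a deliberately naive sketch: the name `remainingContainers` and the numbers are hypothetical, memory is ignored, and it assumes the scheduler actually counts vcores when placing Containers.

```scala
object VcorePackingSketch {
  // How many Containers of `otherCores` vcores still fit on a node
  // after the AM/driver Container has taken `amCores` vcores.
  def remainingContainers(nodeVcores: Int, amCores: Int, otherCores: Int): Int = {
    require(otherCores > 0, "otherCores must be positive")
    require(amCores >= 0 && amCores <= nodeVcores, "amCores must fit on the node")
    (nodeVcores - amCores) / otherCores
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical node with 32 vcores, neighbours asking for 4 vcores each.
    println(remainingContainers(nodeVcores = 32, amCores = 1, otherCores = 4))
    println(remainingContainers(nodeVcores = 32, amCores = 8, otherCores = 4))
  }
}
```

With these example numbers, raising the AM/driver from 1 to 8 cores leaves room for one fewer 4-core neighbour on the node, which is the crowding-out effect described above.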
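Summary
In production, spark.driver.cores can be raised somewhat, within reason. This should help the driver's processing capacity, by crowding out other Containers. Otherwise I cannot think of what these two parameters are good for.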
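Postscript
I asked a colleague on our team who works on YARN. He said that once cgroups are enabled, the effect of these parameters should become more pronounced and better defined.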