Spark 框架安全认证实现

导言

随着大数据集群的使用,大数据的安全受到越来越多的关注一个安全的大数据集群的使用,运维必普通的集群更为复杂。
集群的安全通常基于kerberos集群完成安全认证。kerberos基本原理可参考:一张图了解Kerberos访问流程

Spark应用(On Yarn模式下)在安全的hadoop集群下的访问,需要访问各种各样的组件/进程,如ResourceManager,NodeManager,NameNode,DataNode,Kafka,Hmaster,HregionServer,MetaStore等等。尤其是在长时运行的应用,如sparkStreaming,StructedStreaming,如何保证用户认证后的长期有效性,其安全/认证更为复杂。

一个Spark应用提交用户要先在kdc中完成用户的认证,及拿到对应service服务的票据之后才能访问对应的服务。由于Spark应用运行时涉及yarnclient,driver,applicationMaster,executor等多个服务,这其中每个进程都应当是同一个用户启动并运行,这就涉及到多个进程中使用同一个用户的票据来对各种服务进行访问,本文基于Spark2.3对此做简要分析。

  • spark应用包含进程
进程功能yarn-client模式yarn-cluster模式
yarnclientSpark应用提交app的模块yarn-client模式下生命周期与driver一致;yarn-cluster模式下可以设置为app提交后即退出,或者提交后一直监控app运行状态
driverspark应用驱动器,调度应用逻辑,应用的“大脑”yarn-client模式下运行在yarnclient的JVM中;yarn-cluster模式下运行在applicationMaster中
applicationMaster基于yarn服务抽象出的app管理者yarn-client模式下仅仅负责启动/监控container,汇报应用状态的功能;yarn-cluster模式下负责启动/监控container,汇报应用状态的功,同时包含driver功能
Executorspark应用的执行器,yarn应用的container实体,业务逻辑的实际执行者

spark应用的提交用户认证之后才能提交应用,所以在yarnclient/driver的逻辑中必然会执行到kerberos认证相关的登录认证。然而其他的进程如applicationMaster,executor等均需要经过认证,应用提交后才由用户启动,这些进程则可以不进行kerberos认证而是利用Hadoop的token机制完成认证,减小kerberos服务压力,同时提高访问效率。

  • Hadoop Token机制

Hadoop的token实现基类为org.apache.hadoop.security.token.Token,

/**
   * Construct a token from the components.
   * @param identifier the token identifier
   * @param password the token's password
   * @param kind the kind of token
   * @param service the service for this token
   */
  public Token(byte[] identifier, byte[] password, Text kind, Text service) {
    this.identifier = identifier;
    this.password = password;
    this.kind = kind;
    this.service = service;
  }

不同的服务也可hadoop的token来交互,只要使用不同的identifer来区分token即可。 如NMTokenIdentifier, AMRMTokenIdentifier,AuthenticationTokenIdentifier等不同的tokenIdentifier来区分不同的服务类型的token。

Spark应用各进程的安全实现

yarnclient的实现

此处yarnclient指的是向ResourceManager提交yarn应用的客户端。在spark中,向yarn提交应用有两种应用有yarn-client,yarn-cluster模式。在这两种应用模式下提交应用,yarn client逻辑有些许不同。

安全hadoop场景下spark的用户登录认证机制

  • spark提交应用时,通过–principal, –keytab参数传入认证所需文件。
    在sparkSubmit中prepareSubmitEnvironment时,完成认证

     // assure a keytab is available from any place in a JVM
     if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
       if (args.principal != null) {
         if (args.keytab != null) {
           require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
           // Add keytab and principal configurations in sysProps to make them available
           // for later use; e.g. in spark sql, the isolated class loader used to talk
           // to HiveMetastore will use these settings. They will be set as Java system
           // properties and then loaded by SparkConf
           sparkConf.set(KEYTAB, args.keytab)
           sparkConf.set(PRINCIPAL, args.principal)
           UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
         }
       }
     }
    
  • 在yarn-cluster模式下,不会调用业务层代码,即不会初始化SparkContext,其通过YarnClusterApplication的start方法调用client.submitApplication提交应用

  • 在yarn-client模式下,会在yarnclient逻辑中调用业务代码,即会初始化并运行SparkContext,通过YarnClientSchedulerBackend其调度client.submitApplication提交应用。

在client的submitApplication方法中提交app,之后创建amContext,准备本地资源,此时会将本地的文件上传至HDFS,其中就包括keytab文件,同时会生成spark_conf.properties配置文件以供am使用,该配置文件中会包含keytab的配置

 val props = new Properties()
  sparkConf.getAll.foreach { case (k, v) =>
    props.setProperty(k, v)
  }
  // Override spark.yarn.key to point to the location in distributed cache which will be used
  // by AM.
  Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) }

其中的amKeytabFileName是在setUpCredentials时设置如下,该值为指定的keytab文件加上随机的字符串后缀,骑在am重点使用,可参考下节的介绍。

val f = new File(keytab)
  // Generate a file name that can be used for the keytab file, that does not conflict
  // with any user file.
  amKeytabFileName = f.getName + "-" + UUID.randomUUID().toString
  sparkConf.set(PRINCIPAL.key, principal)

获取相关组件的token,注意:此处的token均非与yarn服务交互相关token,这里只有与HDFS,HBASE,Hive服务交互的token。

 def obtainDelegationTokens(
  hadoopConf: Configuration,
  creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
  if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
  // 各provider的obtainDelegationTokens方法中,会获取对应组件的token,并放入credentials中
    provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
  } else {
    logDebug(s"Service ${provider.serviceName} does not require a token." +
      s" Check your configuration to see if security is disabled or not.")
    None
  }
}.foldLeft(Long.MaxValue)(math.min)

}

Spark中常访问的服务使用token机制的有hive,hbase,hdfs,对应的tokenProvider如下:

服务tokenProvidertoken获取类token获取方法
HDFSHadoopFSDelegationTokenProviderorg.apache.hadoop.hbase.security.token.TokenUtilobtainToken
HIVEHiveDelegationTokenProviderorg.apache.hadoop.hive.ql.metadatagetDelegationToken
HBASEHBaseDelegationTokenProviderorg.apache.hadoop.hdfs.DistributedFileSystemaddDelegationTokens

以HbaseDelegationTokenProvider为例,主要是通过反射调用hbase的TokenUtil类的obtainTOken方法,对应的obtainDelegationTokens方法如下:

override def obtainDelegationTokens(
  hadoopConf: Configuration,
  sparkConf: SparkConf,
  creds: Credentials): Option[Long] = {
try {
  val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
  val obtainToken = mirror.classLoader.
    loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
    getMethod("obtainToken", classOf[Configuration])
  logDebug("Attempting to fetch HBase security token.")
  val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
    .asInstanceOf[Token[_ <: TokenIdentifier]]
  logInfo(s"Get token from HBase: ${token.toString}")
  creds.addToken(token.getService, token)
} catch {
  case NonFatal(e) =>
    logDebug(s"Failed to get token from service $serviceName", e)
}
None
}

PS : HBase的token获取的用户需要具有hbase:meta表的exec权限,否则无法成功获取token

在获取token后,将token设置到amContainer中,并放入appContext中

private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
amContainer.setTokens(ByteBuffer.wrap(dob.getData))
}
//
appContext.setAMContainerSpec(containerContext)

driver的token更新

在yarn-client模式下,driver在yarnclient进程中启动,同样需要访问业务层及集群的相关组件如hdfs。driver通过读取am更新在hdfs路径下的credentials文件来保证driver节点的token有效。

// SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
// reads the credentials from HDFS, just like the executors and updates its own credentials
// cache.
if (conf.contains("spark.yarn.credentials.file")) {
    YarnSparkHadoopUtil.startCredentialUpdater(conf)
} 

在yarn-cluster模式下,driver运行在applicationMaster的JVM中,其安全相关由Am同一操作

ApplicationMaster 的安全认证

applicationMaster是Yarn进行应用调度/管理的核心,需要与RM/NM等进行交互以便应用运行。其中相关的交互均通过token完成认证,认证实现由Yarn内部框架完成。查看am日志发现,即是在非安全(非kerberos)的场景下,同样会使用到token。而与hdfs,hbase等服务交互使用的token则需Spark框架来实现。

applicationMaster中与YARN相关的认证

  • AM与RM的认证

在ResourceManager接收到应用提交的ApplicationSubmissionContext后,在其AmLauncher.java的run方法中为am设置生成“YARN_AM_RM_TOKEN,该token用于am于rm通信使用”

 public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
  ApplicationAttemptId appAttemptId) {
this.writeLock.lock();
try {
  LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
  AMRMTokenIdentifier identifier =
      new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
        .getKeyId());
  byte[] password = this.createPassword(identifier);
  appAttemptSet.add(appAttemptId);
  return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
    identifier.getKind(), new Text());
} finally {
  this.writeLock.unlock();
}
}
  • AM与NM的认证

Am在启动之后,会向ResourceManager申请container,并与对应的NodeManager通信以启动container。然而AM与NM通信的token是如何得到的呢?

查看AMRMClientImpl类可以看到,AM向RM发送分配请求,RM接收到请求后,将container要分配至的NM节点的Token放置response中返回给AM。Am接收到response后,会保存NMToken,并判定是否需要更新YARN_AM_RM_TOKEN

//通过rmClient向RM发送分配请求
allocateResponse = rmClient.allocate(allocateRequest);
//拿到response后,保存NMToken并根据response判定是否需要更新AMRM通信的TOken
if (!allocateResponse.getNMTokens().isEmpty()) {
      populateNMTokens(allocateResponse.getNMTokens());
    }
    if (allocateResponse.getAMRMToken() != null) {
      updateAMRMToken(allocateResponse.getAMRMToken());
    }

RM通过ApplicationMasterService响应allocation请求

// 通过调度器为cotnainer分配NodeManager并生成该NodeManager的Token放入allcation中
 Allocation allocation =
      this.rScheduler.allocate(appAttemptId, ask, release, 
          blacklistAdditions, blacklistRemovals);
 ......
  if (!allocation.getContainers().isEmpty()) {
    allocateResponse.setNMTokens(allocation.getNMTokens());
  }

AM在准备启动container时,将当前用户的token都设置进ContainerLaunchContext中

def startContainer(): java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
  .asInstanceOf[ContainerLaunchContext]
val env = prepareEnvironment().asJava
ctx.setLocalResources(localResources.asJava)
ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))

ApplicationMaster业务相关的服务的token更新

Am启动的资源情况

查看Am启动命令大致如下,可以发现有指定配置文件,而该配置文件即为yarnclient生成上传至hdfs,在am启动前由NodeManager从hdfs中copy至本地路径,供container使用:

 /usr/jdk64/jdk1.8.0_77//bin/java -server -Xmx512m -Djava.io.tmpdir=/localpath/*/tmp -Dspark.yarn.app.container.log.dir=/localpath/*/ org.apache.spark.deploy.yarn.ExecutorLauncher --arg host:port --properties-file /localpath/*/__spark_conf__/__spark_conf__.properties

查看此配置文件可以看到有如下配置项:

spark.yarn.principal=ocsp-ygcluster@ASIAINFO.COM
spark.yarn.keytab=hbase.headless.keytab-18f29b79-b7a6-4cb2-b79d-4305432a5e9a

下图为am进程使用到的资源文件

《Spark 框架安全认证实现》 am进程资源.jpg

如上可以看出,am虽然运行在集群中,但运行时认证相关的资源已经准备就绪。下面分析其运行中关于安全的逻辑

Am安全认证及token更新逻辑

在applicationMaster中,定期更新token,并写入文件到hdfs的相关目录,并清理旧文件以供各executor使用。
在ApplicationMaster启动后,进行login登录并启动名为am-kerberos-renewer的dameon线程定期登录,保证用户认证的有效性

private val ugi = {
val original = UserGroupInformation.getCurrentUser()

// If a principal and keytab were provided, log in to kerberos, and set up a thread to
// renew the kerberos ticket when needed. Because the UGI API does not expose the TTL
// of the TGT, use a configuration to define how often to check that a relogin is necessary.
// checkTGTAndReloginFromKeytab() is a no-op if the relogin is not yet needed.
val principal = sparkConf.get(PRINCIPAL).orNull
val keytab = sparkConf.get(KEYTAB).orNull
if (principal != null && keytab != null) {
  UserGroupInformation.loginUserFromKeytab(principal, keytab)

  val renewer = new Thread() {
    override def run(): Unit = Utils.tryLogNonFatalError {
      while (true) {
        TimeUnit.SECONDS.sleep(sparkConf.get(KERBEROS_RELOGIN_PERIOD))
        UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab()
      }
    }
  }
  renewer.setName("am-kerberos-renewer")
  renewer.setDaemon(true)
  renewer.start()

  // Transfer the original user's tokens to the new user, since that's needed to connect to
  // YARN. It also copies over any delegation tokens that might have been created by the
  // client, which will then be transferred over when starting executors (until new ones
  // are created by the periodic task).
  val newUser = UserGroupInformation.getCurrentUser()
  SparkHadoopUtil.get.transferCredentials(original, newUser)
  newUser
} else {
  SparkHadoopUtil.get.createSparkUser()
}
}

在am中启动AMCredentialRenewerStarter线程,调度认证登录及token renew逻辑

if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
        val credentialRenewerThread = new Thread {
          setName("AMCredentialRenewerStarter")
          setContextClassLoader(userClassLoader)
      override def run(): Unit = {
        val credentialManager = new YARNHadoopDelegationTokenManager(
          sparkConf,
          yarnConf,
          conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
        val credentialRenewer =
          new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
        credentialRenewer.scheduleLoginFromKeytab()
      }
    }
    credentialRenewerThread.start()
    credentialRenewerThread.join()
  }

在scheduleLoginFromKeytab中,会周期调度登录,token获取更新写入hdfs文件等操作。
其核心逻辑如下

调度周期:

各种组件的token更新周期如hdfs的更新周期dfs.namenode.delegation.token.renew-interval默认为1天,hbase的token更新周期hbase.auth.key.update.interval默认为1天;调度更新的周期为如上各组件最小值的75%,

调度流程:

//将生成的token写入hdfs目录${spark.yarn.credentials.file}-${timeStamp}-${nextSuffix}
writeNewCredentialsToHDFS(principal, keytab)
//删除逻辑为保留五个(${spark.yarn.credentials.file.retention.count})文件,文件更新时间早于五天(${spark.yarn.credentials.file.retention.days})的全部清理
cleanupOldFiles()

Executor的认证机制

executor的认证同样使用的是token机制。executor启动之后,根据driver启动设置的${spark.yarn.credentials.file}启动token更新:

if (driverConf.contains("spark.yarn.credentials.file")) {
    logInfo("Will periodically update credentials from: " +
      driverConf.get("spark.yarn.credentials.file"))
    Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
      .getMethod("startCredentialUpdater", classOf[SparkConf])
      .invoke(null, driverConf)
  }

Executor中的token更新是读取hdfs目录《Spark 框架安全认证实现》{timeStamp}-${nextSuffix}目录下的文件,读取到缓存中,以便保证读取到的是更新后的token使用。

安全Spark的使用

Spark框架完成的kerberos认证及使用token与其他服务交互的机制使用较为简单,只需要在提交应用时的spark-submit命令行中加入–principal appuserName –keytab /path/to/user.keytab即可

    原文作者:WestC
    原文地址: https://www.jianshu.com/p/7fe5351399a8
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞