Kafka源码分析-启动流程

  • 我们一般都是使用bin/kafka-server-start.sh脚本来启动;
  • bin/kafka-server-start.sh可以知道此脚本用法: echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" (1) server.properties为配置文件路径, 这里config/server.properties有一个配置文件的模板,里面就是一行行的key=value; (2) --override property=value是若干个可项的参数, 用来覆盖server.properties配置文件中同名的配置项;
  • bin/kafka-server-start.sh 最后一行exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"可知, Kafka启动时的入口类为kafka.Kafka, 我们直接来看这个类;

Kafka启动入口类:kafk.Kafak

  • 所在文件: core/src/main/scala/kafka/Kafka.scala
  • 定义: object Kafka extends Logging
  • main函数:
val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown //捕获control-c中断,停止当前服务
        }
      })

      kafkaServerStartable.startup //启动服务
      kafkaServerStartable.awaitShutdown //等待服务结束

使用getPropsFromArgs方法来获取各配置项, 然后将启动和停止动作全部代理给KafkaServerStartable类;

Kafka启动代理类:KafkaServerStartable

  • 伴生对象: object KafkaServerStartable 提供fromProps方法来创建 KafkaServerStartable;
  • KafkaServerStartable对象创建时会同时创建 KafkaServer, 这才是真正的主角;
def startup() {
    try {
      server.startup()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
        System.exit(1)
    }
  }

  def shutdown() {
    try {
      server.shutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Runtime.getRuntime.halt(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   */
  def setServerState(newState: Byte) {
    server.brokerState.newState(newState)
  }

  def awaitShutdown() = 
    server.awaitShutdown
    原文作者:扫帚的影子
    原文地址: https://zhuanlan.zhihu.com/p/49261103
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞