spark-streaming编程

1 Spark-streaming

1.1Spark-streaming 的KafkaManager类

1.1.1KafkaManager主要封装了两个方法

程序启动时,先获取 consumer group 在 zk 中保存的 offset,以及当前 topic 各分区的最大(latest)和最小(earliest)offset,据此校正出可用的起始 offset,代码如下

/**
  * Makes sure the offsets stored for `groupId` are usable before the stream starts.
  *
  * For every topic:
  *  - if stored consumer offsets can be read, any partition whose stored offset is
  *    smaller than the earliest offset still available on the leader is moved
  *    forward to that earliest offset;
  *  - otherwise the group is treated as never having consumed the topic and its
  *    offsets are initialised from "auto.offset.reset" ("smallest" means earliest,
  *    anything else means latest).
  *
  * @param topics  topics to check
  * @param groupId consumer group whose stored offsets are checked or initialised
  * @throws SparkException if partitions or leader offsets cannot be fetched
  */
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach { topic =>
      val partitions: Set[TopicAndPartition] = kc.getPartitions(Set(topic)) match {
        case Right(found) => found
        case Left(errs) =>
          throw new SparkException(s"get kafka partition failed: ${errs.mkString("\n")}")
      }

      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      log.info("consumerOffsetsE.isLeft: " + consumerOffsetsE.isLeft)

      consumerOffsetsE match {
        case Right(consumerOffsets) =>
          // The group has consumed this topic before.
          log.info("消费过")

          // Stored offsets may point at log segments that Kafka's retention policy
          // has already deleted; such offsets are smaller than the earliest leader offset.
          val earliestLeaderOffsets = kc.getEarliestLeaderOffsets(partitions) match {
            case Right(found) => found
            case Left(errs) =>
              throw new SparkException(s"get earliest offsets failed: ${errs.mkString("\n")}")
          }

          // Only some partitions may be stale, so only those are rewritten.
          val offsets: Map[TopicAndPartition, Long] = consumerOffsets.collect {
            case (tp, n) if n < earliestLeaderOffsets(tp).offset =>
              val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
              log.info("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
                " offsets已经过时,更新为" + earliestLeaderOffset)
              tp -> earliestLeaderOffset
          }
          log.info("offsets: " + consumerOffsets)
          if (offsets.nonEmpty) {
            // Report a failed write instead of dropping the result on the floor.
            kc.setConsumerOffsets(groupId, offsets).left.foreach { errs =>
              log.error(s"set consumer offsets failed: ${errs.mkString("\n")}")
            }
          }

        case Left(_) =>
          // NOTE(review): any failure to read the stored offsets lands here, so a
          // transient lookup error is handled like a brand-new group; confirm this is intended.
          log.info("没消费过")
          val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
          val leaderOffsetsE =
            if (reset == Some("smallest")) kc.getEarliestLeaderOffsets(partitions)
            else kc.getLatestLeaderOffsets(partitions)
          // Fail with the underlying errors rather than an opaque exception from `.right.get`.
          val leaderOffsets: Map[TopicAndPartition, LeaderOffset] = leaderOffsetsE match {
            case Right(found) => found
            case Left(errs) =>
              throw new SparkException(s"get leader offsets failed: ${errs.mkString("\n")}")
          }
          val offsets = leaderOffsets.map {
            case (tp, leaderOffset) => (tp, leaderOffset.offset)
          }
          log.info("offsets: " + offsets)
          kc.setConsumerOffsets(groupId, offsets).left.foreach { errs =>
            log.error(s"set consumer offsets failed: ${errs.mkString("\n")}")
          }
      }
    }
  }

当处理完DStream后,手动更新zk上的offset,如果程序出异常,则不更新offset,做到at-least-once,代码如下

/**
  * Generates the RDD for `validTime`.
  *
  * `currentOffsets` is refreshed first so that the batch adapts to a change in
  * the number of upstream partitions; the actual work is then left to the parent class.
  */
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    updateCurrentOffsetForKafkaPartitionChange()
    val batchRdd = super.compute(validTime)
    batchRdd
  }

  /**
    * Stores the end offset of every offset range carried by `rdd` for the
    * consumer group configured under "group.id".
    *
    * A failed write for one partition is logged and does not stop the remaining
    * partitions from being written.
    *
    * @param rdd an RDD that implements `HasOffsetRanges`; the cast below fails with
    *            a `ClassCastException` for any other RDD
    * @throws NoSuchElementException if "group.id" is missing from `kafkaParams`
    */
  def updateZKOffsets(rdd: RDD[(String,String)]): Unit = {
    // Keyed lookup keeps the original exception type but names the missing key.
    val groupId = kafkaParams("group.id")
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    offsetsList.foreach { range =>
      val topicAndPartition = TopicAndPartition(range.topic, range.partition)
      val result = kc.setConsumerOffsets(groupId, Map((topicAndPartition, range.untilOffset)))
      // "diff" is the number of messages in the range (until - from), so it is never negative.
      log.info(s"${DateTimeUtils.getStringDateTime()}  @@@@@@ topic  ${range.topic}  partition ${range.partition}  fromoffset ${range.fromOffset}  untiloffset ${range.untilOffset}" +
        s"  diff ${range.untilOffset - range.fromOffset} #######")
      result.left.foreach { err =>
        log.error(s"Error updating the offset to Kafka cluster: ${err}")
      }
    }
  }

1.2 Spark-streaming 的自适应partition变化类

增加AdaptiveUpStreamDirectKafkaInputDStream自适应partition增加功能类,比如topic增加partition,spark-streaming自适应partition变化,关键代码如下

/**
  * Adds newly created upstream partitions to `currentOffsets`.
  *
  * For every topic present in `currentOffsets`, the partition count reported by
  * `getTopicMeta` is compared with the number of partitions of that topic already
  * tracked. Missing partitions are added with a starting offset of 0. When the
  * metadata lookup yields nothing the count is taken as 0 and nothing is added.
  *
  * Assumes partition ids of a topic are contiguous and start at 0, so the missing
  * ids are exactly `tracked until reported`.
  */
private def updateCurrentOffsetForKafkaPartitionChange(): Unit = {
    // Handling each topic separately keeps the counts comparable when several
    // topics are consumed, and makes an empty currentOffsets a no-op instead of
    // a failure on `head`.
    val trackedTopics = currentOffsets.keySet.map(_.topic)

    trackedTopics.foreach { topic =>
      val nextPartitions: Int = getTopicMeta(topic) match {
        case Some(meta) => meta.partitionsMetadata.size()
        case _ => 0
      }
      val currPartitions = currentOffsets.keySet.count(_.topic == topic)

      if (nextPartitions > currPartitions) {
        val added = (currPartitions until nextPartitions).map { partition =>
          TopicAndPartition(topic, partition) -> 0L
        }
        currentOffsets = currentOffsets ++ added
      }
      logInfo(s"######### ${nextPartitions}  currentParttions ${currentOffsets.keySet.size} ########")
    }
  }

  /**
    * Fetches the metadata of `topic` from the brokers listed under
    * "metadata.broker.list".
    *
    * Brokers are tried in list order, each up to four times, and the search stops
    * at the first response whose error code is `ErrorMapping.NoError`. A broker
    * that throws is logged and skipped.
    *
    * @param topic topic to look up
    * @return the first error-free metadata; if no broker produced one, the last
    *         metadata received (which carries an error code), or `None` when no
    *         broker answered at all
    */
  private def getTopicMeta(topic: String): Option[TopicMetadata] = {
    import scala.util.control.NonFatal

    val maxAttemptsPerBroker = 4
    val topics = List[String](topic)
    val brokerList = MTkafkaParams.get("metadata.broker.list").get.split(",")
    //val brokerList = kafkaBrokerList.split(",")

    var metaData: Option[TopicMetadata] = None
    def succeeded: Boolean = metaData.exists(_.errorCode == ErrorMapping.NoError)

    val brokers = brokerList.iterator
    // Stop at the first good answer so a later broker's error response cannot replace it.
    while (!succeeded && brokers.hasNext) {
      val item = brokers.next()
      try {
        val hostPort = item.split(":")
        val consumer = new SimpleConsumer(host = hostPort(0), port = hostPort(1).toInt,
          soTimeout = 10000, bufferSize = 64 * 1024, clientId = "leaderLookup")
        try {
          var attempt = 0
          while (!succeeded && attempt < maxAttemptsPerBroker) {
            val req: TopicMetadataRequest = new TopicMetadataRequest(topics.asJava)
            val resp = consumer.send(req)
            metaData = Some(resp.topicsMetadata.get(0))
            attempt += 1
          }
        } finally {
          // This runs on every batch, so an unclosed consumer would leak a socket each time.
          consumer.close()
        }
      } catch {
        // Fatal errors (OutOfMemoryError, interrupts, ...) must still propagate.
        case NonFatal(e) =>
          logInfo(s" ###### Error in AdaptiveUpStreamDirectKafkaInputDStream ${e} ######")
      }
    }
    metaData
  }

1.3Spark-streaming 的编程模板代码

/**
  * Template entry point for a Spark Streaming job that reads Kafka through
  * `KafkaManager` and stores the consumed offsets by hand.
  *
  * Processing and the offset update happen in the same output operation, in that
  * order, so the offsets of a batch are stored only if processing that batch did
  * not throw.
  */
object TestMain {
  @transient lazy val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  /**
    * @param args optional; `args(0)` is the configuration file handed to
    *             `SparkUtils.initialize` (an empty string is passed when absent)
    */
  def main(args: Array[String]): Unit = {
    //init conf_file
    val confFile = args.headOption.getOrElse("")
    SparkUtils.initialize(confFile)

    //init ssc
    val sparkConf = SparkUtils.buildSparkConf()
    val batchSeconds = SparkUtils.getValueByKey(Constants.SPARK_APP_DURATION).toLong
    val ssc = new StreamingContext(sparkConf, Seconds(batchSeconds))

    //init kafka params
    val kafkaParams = SparkUtils.buildkafkaParams()
    val topics = SparkUtils.getValueByKey(Constants.KAFKA_TOPICS)

    //init DStream
    val km = new KafkaManager(kafkaParams)
    val kafkaDirectStream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.split(",").toSet)
    LOG.info("======Initial Done======")
    kafkaDirectStream.cache()

    // init hbase config to broadcast to cluster executors
    val hbaseConfig = ssc.sparkContext.broadcast(SparkUtils.buildHbaseProps())

    kafkaDirectStream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        //transform
        val processed = rdd.map(_._2)
        //to do something

        //action
        //LOG.info("### rdd count = " + processed.count() + "### rdd partition count:" + processed.partitions.length)
        processed.foreachPartition { partitionOfRecords =>

          //to do something

        }

        //update zk offsets
        // Reached only when the action above finished without throwing. The
        // untransformed rdd is passed because updateZKOffsets reads its offset ranges.
        km.updateZKOffsets(rdd)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

1.4Spark-streaming 的工具类

jedis pool

/**
  * Created by lancerlin on 2018/1/3.
  *
  * Serializable holder for a Redis connection pool. Only the factory function is
  * taken at construction time; the pool itself is built the first time `pool`
  * is read.
  */
class MyRedisConnectionPool(createMyRedisConnnectionPoolFunc: () => RedisConnectionPool) extends Serializable {
  @transient lazy val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  lazy val pool = createMyRedisConnnectionPoolFunc.apply()
}


object MyRedisConnectionPool {
  /** Pool settings shared by every pool created through `apply`. */
  lazy val config = {
    val poolSettings = new PoolConfig
    poolSettings.setMaxTotal(100)
    poolSettings.setMaxIdle(5)
    poolSettings.setMaxWaitMillis(1000)
    poolSettings.setTestOnBorrow(true)
    poolSettings
  }

  /**
    * Wraps a factory that builds a `RedisConnectionPool` from `config` and `props`
    * and registers a JVM shutdown hook that closes it.
    *
    * @param props connection properties passed to the pool constructor
    * @return a holder whose pool has not been created yet
    */
  def apply[K, V](props: java.util.Properties): MyRedisConnectionPool =
    new MyRedisConnectionPool(() => {
      val redisPool = new RedisConnectionPool(config, props)
      sys.addShutdownHook(redisPool.close())
      redisPool
    })
}

streaming代码中的用法

    //broadcast the serializable pool holder rather than the raw properties: the pool is
    //then built lazily from the broadcast value and reused, instead of a new pool (plus
    //a new shutdown hook) being created for every partition of every batch
    val redisPool = ssc.sparkContext.broadcast(MyRedisConnectionPool(SparkUtils.buildRedisProps()))
    .....
    dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val pool = redisPool.value.pool
        val proxy = pool.getConnection
        try {
          // foreach advances the iterator; testing hasNext without calling next() never ends
          partitionRecords.foreach { _ =>
            proxy.doMethod()
          }
        } finally {
          // hand the connection back even when processing a record throws
          pool.returnConnection(proxy)
        }
      })
    })

同理kafkaproducer

/**
  * Serializable holder for a `KafkaProducer`. Only the factory function is taken
  * at construction time; the producer is created the first time it is used.
  *
  * @param createProducer factory invoked once, on first access to `producer`
  */
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  /** Sends `value` with `key` to `topic`. */
  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  /** Sends `value` to `topic` without a key. */
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}


object MySparkKafkaProducer {
  // Explicit converters (.asJava / .asScala) replace the implicit JavaConversions,
  // which are deprecated and no longer exist in Scala 2.13.
  import scala.collection.JavaConverters._

  /**
    * @param config producer settings handed to the `KafkaProducer` constructor
    * @return a holder whose producer has not been created yet
    */
  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config.asJava)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  /** Same as the `Map` overload, with the settings read from `config`. */
  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = {
    val settings: Map[String, Object] = config.asScala.toMap
    apply[K, V](settings)
  }
}

同理 mysqlUtils

/**
  * Created by lancerlin on 2017/12/28.
  *
  * Serializable holder for a JDBC connection pool. Constructing it prints a
  * marker line; the pool itself is built the first time `pool` is read.
  */
class MysqlPool(createMysqlPoolFunc:() => JdbcConnectionPool) extends Serializable {
  println("init ....")
  lazy val pool = createMysqlPoolFunc.apply()
}

object MysqlPool {

  /** Pool settings shared by every pool created through `apply`. */
  lazy val config = {
    val poolSettings = new PoolConfig
    poolSettings.setMaxTotal(100)
    poolSettings.setMaxIdle(5)
    poolSettings.setMaxWaitMillis(1000)
    poolSettings.setTestOnBorrow(true)
    poolSettings
  }

  /**
    * Wraps a factory that builds a `JdbcConnectionPool` from `config` and `props`
    * and registers a JVM shutdown hook that closes it.
    *
    * @param props connection properties passed to the pool constructor
    * @return a holder whose pool has not been created yet
    */
  def apply[K, V](props: java.util.Properties): MysqlPool =
    new MysqlPool(() => {
      val jdbcPool = new JdbcConnectionPool(config, props)
      sys.addShutdownHook(jdbcPool.close())
      jdbcPool
    })
}

1.5 Spark-streaming 中如何更新broadcast

import java.io.{ObjectInputStream, ObjectOutputStream}
import java.util.Calendar

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import org.slf4j.{Logger, LoggerFactory}

import scala.reflect.ClassTag

/**
  * Created by lancerlin on 2018/2/1.
  *
  * Helper for replacing the value of a broadcast variable from inside a
  * DStream's `foreachRDD` without running into serialization issues. Usage:
  *
  * {{{
  * val value1 = MyBroadcastWrapper(ssc, new Date().toString)
  *
  * processedDStream.foreachRDD { rdd =>
  *   value1.update(new Date().toString, true)
  *
  *   rdd.foreachPartition { partitionOfRecords =>
  *     while (partitionOfRecords.hasNext) {
  *       val str = partitionOfRecords.next()
  *     }
  *     LOG.warn(s"borad cast value @@@@ [{${value1.value}}])")
  *   }
  * }
  * }}}
  *
  * @param ssc streaming context whose SparkContext creates the broadcasts
  * @param _v  initial value to broadcast
  */
case class MyBroadcastWrapper[T: ClassTag](
                                            @transient private val ssc: StreamingContext,
                                            @transient private val _v: T
                                          ) {
  /** Time of the last re-broadcast (initially the construction time). */
  var lastUpdatedAt = Calendar.getInstance.getTime

  @transient lazy val LOG: Logger = LoggerFactory.getLogger(this.getClass);
  @transient private var v = ssc.sparkContext.broadcast(_v)

  /**
    * Re-broadcasts `newValue` when no broadcast exists yet or when more than the
    * refresh interval has passed since `lastUpdatedAt`; otherwise does nothing.
    *
    * @param newValue       value to broadcast if a refresh is due
    * @param blocking       passed to `unpersist` on the broadcast being replaced
    * @param updateInterval refresh interval in milliseconds; `null` (the default)
    *                       or a negative number selects 60000 ms
    */
  def update(newValue: T, blocking: Boolean = false, updateInterval: Integer = null): Unit = {
    val now = Calendar.getInstance.getTime
    val elapsed = now.getTime - lastUpdatedAt.getTime

    // 60 seconds unless the caller supplies a non-negative interval.
    val interval: Long =
      if (updateInterval != null && updateInterval >= 0) updateInterval.longValue else 60000L

    if (v == null || elapsed > interval) {
      lastUpdatedAt = now

      if (v != null) {
        v.unpersist(blocking)
      }
      v = ssc.sparkContext.broadcast(newValue)
    }
  }

  /** The value held by the current broadcast. */
  def value: T = v.value

  // Custom Java serialization hooks: only the broadcast handle is written and read back.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

用法

val value = MyBroadcastWrapper(ssc, new Date().toString)

  processedDStream.foreachRDD { rdd =>

      // The refresh interval defaults to one minute; passing 120000 makes it two minutes.
      // The wrapper declared above is named `value`, so that is the name used here.
      value.update(new Date().toString, true, 120000)

      rdd.foreachPartition {
        partitionOfRecords =>
          while (partitionOfRecords.hasNext){
            val str = partitionOfRecords.next()
          }
          LOG.warn(s"borad cast value @@@@ [{${value.value}}])")
      }
    }

测试结果:10 秒一个批次,日志 LOG.warn(s"borad cast value @@@@ [{${value.value}}])") 中打印的值每两分钟更新一次

《spark-streaming编程》 image.png

参考文档:
https://stackoverflow.com/questions/33372264/how-can-i-update-a-broadcast-variable-in-spark-streaming
https://www.jianshu.com/p/5dbb102cbbd9

    原文作者:LancerLin_LX
    原文地址: https://www.jianshu.com/p/4da904644851
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞