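Flink 1.1 vs. Storm 0.10: Comparing the APIs by Example
I originally wanted to title this "Comparing the Flink and Storm APIs by Example", but both APIs have been changing frequently of late, so I added version numbers. Storm added the IWindowedBolt interface in 1.0. I have not tried it yet and may cover it later.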
Requirements
A Kafka log records the time and uuid of user visits on different platforms. A sample message looks like this:
{
  "platform": 1,
  "time": 1470267515,
  "uuid": "ad751bb3-d0ee-c9b9-be26-2ba4570bb3fe"
}
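We need to count the daily UV (unique visitors) per platform, subject to the following requirements.
The latest result must be emitted every second.
The program runs forever, so stale data must be purged from memory periodically.
The time field of incoming messages is not strictly increasing, so some tolerance for out-of-order data is required.
UV does not necessarily change every second, and repeated output wastes a great deal of I/O, so a result must be emitted within one second of a change and nothing emitted otherwise.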
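To make the target concrete, here is a minimal batch-style sketch of the computation in plain Scala, with no streaming framework involved. The names `Visit` and `UvSketch.dailyUv` are hypothetical and introduced only for illustration, and UTC is assumed as the time zone for deciding which day a record belongs to.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Hypothetical record type mirroring the three JSON fields above.
final case class Visit(platform: Int, time: Long, uuid: String)

object UvSketch {
  // Number of distinct uuids per (calendar day, platform).
  // Assumption: days are cut in UTC.
  def dailyUv(visits: Seq[Visit]): Map[(LocalDate, Int), Int] =
    visits
      .groupBy { v =>
        val day = Instant.ofEpochSecond(v.time).atZone(ZoneOffset.UTC).toLocalDate
        (day, v.platform)
      }
      .map { case (key, vs) => key -> vs.map(_.uuid).distinct.size }
}
```

The streaming versions below have to produce the same numbers incrementally, while also handling the timing requirements listed above.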
The Storm approach
Input
Input comes from a KafkaSpout. Storm ships with a ready-made KafkaSpout, which only needs to be configured.
val hosts = new ZkHosts("zkhosts")
val spoutConfig = new SpoutConfig(hosts, "sometopic", "zkroot", "xxx")
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)
spoutConfig.startOffsetTime = OffsetRequest.LatestTime
val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
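Computing the result
Create a KeeperBolt that holds the current result in a Map of Sets. The Set for a log entry is looked up by the date derived from the entry's time and by its platform.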
class KeeperBolt extends BaseBasicBolt {
  // date -> platform -> hash codes of the uuids seen so far
  val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
  var lastSize = Map.empty[LocalDate, Map[Int, Int]]

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    input.getSourceStreamId match {
      case "default" =>
        val jsObj = Json.parse(input.getString(0))
        val time = (jsObj \ "time").as[Int]
        val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
        val platform = (jsObj \ "platform").as[Int]
        val uuid = (jsObj \ "uuid").as[String]
        val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
        val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
        // Only the hash code is stored, so hash collisions can slightly undercount.
        set.add(uuid.hashCode)
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
}
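Periodically purging old data and emitting the current result
Storm makes this part awkward. To preserve the single-threaded nature of each Storm task, we should not start a Timer directly inside the Bolt: doing so would require otherwise unnecessary locks, which hurts performance and complicates the program, and a self-created thread also interferes with Storm's assessment of the topology's running state and performance. Instead, we create a separate Spout that sends messages telling the Bolt when to emit and when to purge.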
class CommandSpout extends BaseRichSpout {
  var collector: SpoutOutputCollector = _
  // Thread-safe queue: the Timer thread enqueues, nextTuple() drains.
  // (Locking on a Vector that is reassigned does not guard it reliably.)
  val commands = new java.util.concurrent.ConcurrentLinkedQueue[String]

  override def nextTuple(): Unit = {
    var command = commands.poll()
    while (command != null) {
      println("emit: " + command)
      collector.emit("command", new Values(command), Random.nextInt())
      command = commands.poll()
    }
  }

  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    this.collector = collector
    val timer = new Timer
    // Ask the bolt to emit changed results once per second.
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = commands.offer("output")
    }, 1000, 1000)
    // Ask the bolt to purge old data once per day (Timer periods are in milliseconds).
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = commands.offer("retain")
    }, 86400L * 1000, 86400L * 1000)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declareStream("command", new Fields("command"))
  }
}
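When KeeperBolt receives a command it must act on it, either emitting results or purging old data.
Emitting:
A separate lastSize value keeps the results from the previous emission. The size of each UV set in the current map is compared with the UV emitted last time, and only entries that changed are emitted.
Purging old data:
KeeperBolt gains a variable named beginDate. When beginDate is updated each day,
data in the map dated beginDate or earlier is deleted,
and incoming messages are filtered against beginDate from then on.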
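The two actions can be sketched in isolation as pure functions, which may make the bolt code easier to follow. This is only an illustrative sketch: `KeeperLogicSketch`, `changed` and `prune` are hypothetical names, and it works on immutable maps rather than on the bolt's mutable state.

```scala
import java.time.LocalDate

object KeeperLogicSketch {
  // date -> platform -> uv
  type Sizes = Map[LocalDate, Map[Int, Int]]

  // Entries of `curr` whose uv is new or differs from the value in `last`.
  def changed(last: Sizes, curr: Sizes): Seq[(LocalDate, Int, Int)] =
    for {
      (date, perPlatform) <- curr.toSeq
      (platform, uv) <- perPlatform.toSeq
      if !last.get(date).flatMap(_.get(platform)).contains(uv)
    } yield (date, platform, uv)

  // Keep only the dates strictly after beginDate.
  def prune[V](data: Map[LocalDate, V], beginDate: LocalDate): Map[LocalDate, V] =
    data.filter { case (date, _) => date.isAfter(beginDate) }
}
```

With `last` holding uv 10 for platform 1 and `curr` holding 10 for platform 1 and 6 for platform 2, `changed` returns only the platform 2 entry.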
class KeeperBolt(var beginDate: LocalDate = LocalDate.now().minusDays(2)) extends BaseBasicBolt {
  val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
  var lastSize = Map.empty[LocalDate, Map[Int, Int]]

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    input.getSourceStreamId match {
      case "default" =>
        val jsObj = Json.parse(input.getString(0))
        val time = (jsObj \ "time").as[Int]
        val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
        // Ignore records dated beginDate or earlier.
        if (date.compareTo(beginDate) > 0) {
          val platform = (jsObj \ "platform").as[Int]
          val uuid = (jsObj \ "uuid").as[String]
          val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
          val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
          set.add(uuid.hashCode)
        }
      case "command" =>
        input.getString(0) match {
          case "output" =>
            // Snapshot the current uv per date and platform.
            val currSize: Map[LocalDate, Map[Int, Int]] = map.map {
              case (key, submap) => (key, submap.map {
                case (platform, set) => (platform, set.size)
              }.toMap)
            }.toMap
            println("currSize = " + currSize)
            println("lastSize = " + lastSize)
            for ((date, subMap) <- currSize; (platform, uv) <- subMap) {
              lastSize.get(date).flatMap(_.get(platform)) match {
                case Some(lastuv) if lastuv == uv => // unchanged: do nothing
                case _ =>
                  println("updated date,platform,uv = " + (date, platform, uv))
              }
            }
            lastSize = currSize
          case "retain" =>
            beginDate = LocalDate.now().minusDays(2)
            // Delete the data that is now out of range.
            map --= map.keys.filter(_.compareTo(beginDate) <= 0).toList
        }
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
}
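Building the Topology
With each node implemented, we need code to wire the nodes together.
There are two Spouts and one Bolt in total. The two Spouts emit the log and send commands respectively,
and the single Bolt stores the data and subscribes to both Spouts.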
val builder = new TopologyBuilder
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
builder.setSpout("command-spout", new CommandSpout)
builder.setBolt("keeper", new KeeperBolt)
  .allGrouping("command-spout", "command")
  .localOrShuffleGrouping("kafka-spout")
val conf = new Config
conf.setNumWorkers(1)
val topo = builder.createTopology()
// Submit to a real cluster. For a local test run, use
// new LocalCluster().submitTopology("Boom", conf, topo) instead.
StormSubmitter.submitTopologyWithProgressBar("Boom", conf, topo)
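That completes an application that computes the daily UV per platform, emits the changes, purges old data on a schedule, and runs forever.
Complete code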
import java.time.{Instant, LocalDate, ZoneId}
import java.util
import java.util.{Timer, TimerTask}
import kafka.api.OffsetRequest
import org.apache.storm.kafka.{KafkaSpout, SpoutConfig, StringScheme, ZkHosts}
import org.apache.storm.spout.{SchemeAsMultiScheme, SpoutOutputCollector}
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.base.{BaseBasicBolt, BaseRichSpout}
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
import org.apache.storm.tuple.{Fields, Tuple, Values}
import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import play.api.libs.json.Json
import scala.collection.mutable
import scala.util.Random
object StormMain {
  class CommandSpout extends BaseRichSpout {
    var collector: SpoutOutputCollector = _
    // Thread-safe queue: the Timer thread enqueues, nextTuple() drains.
    // (Locking on a Vector that is reassigned does not guard it reliably.)
    val commands = new java.util.concurrent.ConcurrentLinkedQueue[String]

    override def nextTuple(): Unit = {
      var command = commands.poll()
      while (command != null) {
        println("emit: " + command)
        collector.emit("command", new Values(command), Random.nextInt())
        command = commands.poll()
      }
    }

    override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
      this.collector = collector
      val timer = new Timer
      // Ask the bolt to emit changed results once per second.
      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = commands.offer("output")
      }, 1000, 1000)
      // Ask the bolt to purge old data once per day (Timer periods are in milliseconds).
      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = commands.offer("retain")
      }, 86400L * 1000, 86400L * 1000)
    }

    override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
      declarer.declareStream("command", new Fields("command"))
    }
  }
  class KeeperBolt(var beginDate: LocalDate = LocalDate.now().minusDays(2)) extends BaseBasicBolt {
    val map = mutable.Map.empty[LocalDate, mutable.Map[Int, mutable.Set[Int]]]
    var lastSize = Map.empty[LocalDate, Map[Int, Int]]

    override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
      input.getSourceStreamId match {
        case "default" =>
          val jsObj = Json.parse(input.getString(0))
          val time = (jsObj \ "time").as[Int]
          val date = Instant.ofEpochSecond(time).atZone(ZoneId.systemDefault()).toLocalDate
          // Ignore records dated beginDate or earlier.
          if (date.compareTo(beginDate) > 0) {
            val platform = (jsObj \ "platform").as[Int]
            val uuid = (jsObj \ "uuid").as[String]
            val datemap = map.getOrElseUpdate(date, mutable.Map.empty)
            val set = datemap.getOrElseUpdate(platform, mutable.Set.empty)
            set.add(uuid.hashCode)
          }
        case "command" =>
          input.getString(0) match {
            case "output" =>
              // Snapshot the current uv per date and platform.
              val currSize: Map[LocalDate, Map[Int, Int]] = map.map {
                case (key, submap) => (key, submap.map {
                  case (platform, set) => (platform, set.size)
                }.toMap)
              }.toMap
              println("currSize = " + currSize)
              println("lastSize = " + lastSize)
              for ((date, subMap) <- currSize; (platform, uv) <- subMap) {
                lastSize.get(date).flatMap(_.get(platform)) match {
                  case Some(lastuv) if lastuv == uv => // unchanged: do nothing
                  case _ =>
                    println("updated date,platform,uv = " + (date, platform, uv))
                }
              }
              lastSize = currSize
            case "retain" =>
              beginDate = LocalDate.now().minusDays(2)
              // Delete the data that is now out of range.
              map --= map.keys.filter(_.compareTo(beginDate) <= 0).toList
          }
      }
    }

    override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    }
  }
  def main(args: Array[String]): Unit = {
    val hosts = new ZkHosts("xxx")
    val spoutConfig = new SpoutConfig(hosts, "xxx", "/xxx", "xxx")
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme)
    spoutConfig.startOffsetTime = OffsetRequest.LatestTime
    val builder = new TopologyBuilder
    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig))
    builder.setSpout("command-spout", new CommandSpout)
    builder.setBolt("keeper", new KeeperBolt)
      .allGrouping("command-spout", "command")
      .localOrShuffleGrouping("kafka-spout")
    val conf = new Config
    conf.setNumWorkers(1)
    val topo = builder.createTopology()
    // Submit to a real cluster. For a local test run, use
    // new LocalCluster().submitTopology("Boom", conf, topo) instead.
    StormSubmitter.submitTopologyWithProgressBar("Boom", conf, topo)
  }
}
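Potential problems
The API in the Storm approach is very low-level. Something like a scheduled task needs a Spout of its own, whose code is about as large as the core business logic, which is tedious. Partitioning data by time (a sliding window, in other words) also ends up interleaved with the business logic, so changing the program takes extra effort just to find the code to change. What each node does and how the nodes connect must all be specified by hand. Someone new taking over the code can hardly get the gist from a short piece of core code. They have to read through the whole context and jump between classes repeatedly to understand it.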
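The Flink approach
Flink improves on this: its API is at a higher level of abstraction, and it provides many practical tools for common use cases. Below is how Flink implements the same requirement. It can be read alongside the official documentation.
Input
Like Storm, Flink comes with a Kafka client that works once its parameters are configured.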
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx")
properties.setProperty("zookeeper.connect", "xxx")
properties.setProperty("group.id", "xxx")
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Assumption: event time is intended, so that timeWindow() groups records by the
// timestamps taken from the log. The default in this Flink version is processing time.
env.setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic.EventTime)
env.setBufferTimeout(100)
val consumer = new FlinkKafkaConsumer08[JsObject]("order_refer_v6.1", new DeserializationSchema[JsObject] {
  override def isEndOfStream(t: JsObject): Boolean = false
  override def deserialize(bytes: Array[Byte]): JsObject = {
    Json.parse(new String(bytes, "UTF-8")).as[JsObject]
  }
  override def getProducedType: TypeInformation[JsObject] = TypeInformation.of(classOf[JsObject])
}, properties)
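Periodic purging of old data and timed output
Flink provides two tools, Window and Watermark, that keep timing requirements separate from business logic, so we cover them first.
Watermark:
A Watermark is Flink's tool for tracking progress through the data. Once the Watermark passes the end of a time window, Flink considers that window out of date and no longer subject to change, and evicts its contents from memory, which keeps the state turning over.
In the code below, because memory is fairly plentiful and the logs may be badly out of order, I do not derive the Watermark from the time in the log. Instead I use system time and keep the last three days of data in memory. The timestamp itself still comes from the log, because the timestamp determines which Window a record belongs to. If it were generated from system time, results would differ depending on when the program was started.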
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[JsObject] {
  // The watermark trails the system clock by three days.
  override def getCurrentWatermark: Watermark = {
    new Watermark(System.currentTimeMillis() - 86400L * 1000 * 3)
  }
  // The event timestamp comes from the log (seconds converted to milliseconds).
  override def extractTimestamp(t: JsObject, l: Long): Long = {
    (t \ "time").as[Long] * 1000
  }
})
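The effect of a watermark that lags the clock can be sketched with plain arithmetic. The following is a simplified model, not Flink's implementation: `WatermarkSketch` and its methods are hypothetical names, and it assumes a window covering [start, end) counts as complete once the watermark reaches its last timestamp, end - 1.

```scala
object WatermarkSketch {
  val ThreeDaysMs: Long = 3L * 24 * 60 * 60 * 1000

  // Watermark that trails the wall clock by a fixed lag.
  def watermark(nowMs: Long, lagMs: Long = ThreeDaysMs): Long = nowMs - lagMs

  // A window [start, end) is treated as complete once the watermark
  // reaches its last timestamp, end - 1 (assumption of this sketch).
  def isComplete(windowEndMs: Long, watermarkMs: Long): Boolean =
    watermarkMs >= windowEndMs - 1
}
```

Under this model a daily window stays open to late records for three days of wall-clock time after its end, which is how the three-day lag tolerates out-of-order logs.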
Window:
Once the Watermark is set up, using Window is very simple: it only takes two more lines.
env.addSource(consumer)
  .keyBy(_.\("platform").as[Int])
  .timeWindow(Time.days(1))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
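timeWindow(Time.days(1)) splits the data by day. It is followed by trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) because, although what we care about is the total for each day, we do not want to wait until the end of the day to see it. We want the latest result every second.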
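Which daily window a record falls into is simple arithmetic on its timestamp. The sketch below assumes tumbling windows aligned to the Unix epoch and non-negative timestamps. `TumblingWindowSketch` and its methods are hypothetical names, not part of the Flink API.

```scala
object TumblingWindowSketch {
  val DayMs: Long = 24L * 60 * 60 * 1000

  // Start of the tumbling window that contains timestampMs
  // (assumes epoch-aligned windows and timestampMs >= 0).
  def windowStart(timestampMs: Long, sizeMs: Long = DayMs): Long =
    timestampMs - (timestampMs % sizeMs)

  // (start inclusive, end exclusive) of the window containing timestampMs.
  def windowBounds(timestampMs: Long, sizeMs: Long = DayMs): (Long, Long) = {
    val start = windowStart(timestampMs, sizeMs)
    (start, start + sizeMs)
  }
}
```

If windows are epoch-aligned as assumed here, a "day" is a UTC day, whereas the Storm version above cuts days with ZoneId.systemDefault(). The two can disagree on machines that do not run in UTC.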
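Business logic
With windowing taken care of, we can finally turn to the business logic. In Flink, a data stream that has been through Window becomes a WindowedStream. WindowedStream has a fold method that aggregates the data in a Window into new data, but we also want to know the start and end time of the window the data belongs to. That calls for the apply method (used below through applyWith from the Scala API extensions), which works like an extension of fold: it aggregates and also exposes the window information.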
env.addSource(consumer)
  .keyBy(_.\("platform").as[Int])
  .timeWindow(Time.days(1))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
  .applyWith((0, Set.empty[Int], 0L, 0L))(
    // Fold each record into (platform, set of uuid hash codes, _, _).
    foldFunction = {
      case ((_, set, _, _), jsObj) =>
        val platform = (jsObj \ "platform").as[Int]
        val uuid = (jsObj \ "uuid").as[String]
        (platform, set + uuid.hashCode, 0L, 0L)
    },
    // Fill in the window start and end.
    windowFunction = {
      case (key, window, results) =>
        results.map {
          case (platform, set, _, _) =>
            (platform, set, window.getStart, window.getEnd)
        }
    }
  )
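The two-step shape, an incremental fold followed by a function that sees the window, can be mimicked on an ordinary collection. This is only an analogy in plain Scala: `FoldThenWindowSketch` and its `Window` case class are hypothetical stand-ins, not Flink types.

```scala
object FoldThenWindowSketch {
  // Hypothetical stand-in for a time window (start inclusive, end exclusive).
  final case class Window(start: Long, end: Long)

  // Fold step: add one uuid's hash code to the running set.
  def foldStep(acc: Set[Int], uuid: String): Set[Int] = acc + uuid.hashCode

  // Window step: attach the window bounds to the folded set.
  def windowStep(platform: Int, window: Window, folded: Set[Int]): (Int, Set[Int], Long, Long) =
    (platform, folded, window.start, window.end)

  // Fold all uuids of one platform and window, then attach the bounds.
  def run(platform: Int, window: Window, uuids: Seq[String]): (Int, Set[Int], Long, Long) =
    windowStep(platform, window, uuids.foldLeft(Set.empty[Int])(foldStep))
}
```

The fold only ever holds the set, never the raw records, which is the same reason the streaming version folds before applying the window function.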
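We now have the latest (platform, set, window start, window end). However, we do not need the contents of the set, only its latest size, and the size of the set will not necessarily change every second. To cut unnecessary output we want to drop one of any two consecutive identical records. The map/flatMap methods of ordinary collections cannot do this, but Flink additionally provides flatMapWithState, which can run different logic depending on earlier data in the stream. Below is an example of de-duplicating repeated data in the stream.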
env.addSource(consumer)
  .keyBy(_.\("platform").as[Int])
  .timeWindow(Time.days(1))
  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
  .applyWith((0, Set.empty[Int], 0L, 0L))(
    foldFunction = {
      case ((_, set, _, _), jsObj) =>
        val platform = (jsObj \ "platform").as[Int]
        val uuid = (jsObj \ "uuid").as[String]
        (platform, set + uuid.hashCode, 0L, 0L)
    },
    windowFunction = {
      case (key, window, results) =>
        results.map {
          case (platform, set, _, _) =>
            (platform, set, window.getStart, window.getEnd)
        }
    }
  )
  // Keep only the size of the set.
  .mapWith {
    case (key, set, windowStart, windowEnd) =>
      (key, set.size, windowStart, windowEnd)
  }
  .keyBy(0)
  // The state is the last uv emitted for this key.
  .flatMapWithState[(Int, Int, Long, Long), Int] {
    case ((key, num, begin, end), curr) =>
      curr match {
        case Some(numCurr) if numCurr == num =>
          (Seq.empty, Some(num)) // same value as before: emit nothing
        case _ =>
          (Seq((key, num, begin, end)), Some(num))
      }
  }
This way, nothing is emitted while the data is unchanged, and the latest result is emitted as quickly as possible once it changes.
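The stateful de-duplication can be sketched on an ordinary sequence, where the state is threaded through a fold by hand. `DedupSketch`, `step` and `dedup` are hypothetical names. `step` has the same (element, optional state) to (output, optional state) shape as the function passed to flatMapWithState above, but nothing here uses Flink.

```scala
object DedupSketch {
  // Emit the element only when it differs from the previously seen one.
  def step[A](elem: A, state: Option[A]): (Seq[A], Option[A]) =
    state match {
      case Some(prev) if prev == elem => (Seq.empty, state)
      case _ => (Seq(elem), Some(elem))
    }

  // Run `step` over a whole sequence, carrying the state along.
  def dedup[A](input: Seq[A]): Seq[A] =
    input.foldLeft((Vector.empty[A], Option.empty[A])) {
      case ((out, state), elem) =>
        val (emitted, next) = step(elem, state)
        (out ++ emitted, next)
    }._1
}
```

Only consecutive repeats are dropped: a value that returns after a different one is emitted again, which is the behaviour wanted for a uv that changes and is then reported anew.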
Complete code
import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import play.api.libs.json.{JsObject, Json}
object FlinkMain {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "xxx")
    properties.setProperty("zookeeper.connect", "xxx")
    properties.setProperty("group.id", "xxx")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumption: event time is intended, so that timeWindow() groups records by the
    // timestamps taken from the log. The default in this Flink version is processing time.
    env.setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic.EventTime)
    env.setBufferTimeout(100)
    val consumer = new FlinkKafkaConsumer08[JsObject]("order_refer_v6.1", new DeserializationSchema[JsObject] {
      override def isEndOfStream(t: JsObject): Boolean = false
      override def deserialize(bytes: Array[Byte]): JsObject = {
        Json.parse(new String(bytes, "UTF-8")).as[JsObject]
      }
      override def getProducedType: TypeInformation[JsObject] = TypeInformation.of(classOf[JsObject])
    }, properties)
    consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[JsObject] {
      // The watermark trails the system clock by three days.
      override def getCurrentWatermark: Watermark = {
        new Watermark(System.currentTimeMillis() - 86400L * 1000 * 3)
      }
      // The event timestamp comes from the log (seconds converted to milliseconds).
      override def extractTimestamp(t: JsObject, l: Long): Long = {
        (t \ "time").as[Long] * 1000
      }
    })
    val stream =
      env.addSource(consumer)
        .keyBy(_.\("platform").as[Int])
        .timeWindow(Time.days(1))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        .applyWith((0, Set.empty[Int], 0L, 0L))(
          foldFunction = {
            case ((_, set, _, _), jsObj) =>
              val platform = (jsObj \ "platform").as[Int]
              val uuid = (jsObj \ "uuid").as[String]
              (platform, set + uuid.hashCode, 0L, 0L)
          },
          windowFunction = {
            case (key, window, results) =>
              results.map {
                case (platform, set, _, _) =>
                  (platform, set, window.getStart, window.getEnd)
              }
          }
        )
        .mapWith {
          case (key, set, windowStart, windowEnd) =>
            (key, set.size, windowStart, windowEnd)
        }
        .keyBy(0)
        .flatMapWithState[(Int, Int, Long, Long), Int] {
          case ((key, num, begin, end), curr) =>
            curr match {
              case Some(numCurr) if numCurr == num =>
                (Seq.empty, Some(num)) // same value as before: emit nothing
              case _ =>
                (Seq((key, num, begin, end)), Some(num))
            }
        }
    stream.print()
    env.execute("Boom")
  }
}
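Comparison with the Storm API
From the standpoint of "making life easier for humans", the Flink API does a good job on the problems I raised above. In exchange, of course, you lose the ability to control some things at a finer grain. Still, I think that for most day-to-day work, making life easier for humans is the goal we should pursue. Hand-tuning a program through fine-grained control is probably only worthwhile for a few very important workloads.
Maybe the gap is not that large after all
Recent Storm versions have added WindowedBolt and Watermark support. Combined with the Trident API that Storm has long had, it should be possible to write the same logic in a way very close to the Flink API, although for reasons unclear the Trident API never seems to have caught on.
In fact, I think the APIs of mainstream stream processing frameworks will grow more and more alike, and most frameworks may even end up supporting the same API. Apache Beam, for example, already has both Spark and Flink implementations. In the end, I think the outcome will depend on how much money is poured into each framework and how well its community develops.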