(1)kafka和flume都是日志系统。kafka是分布式消息中间件,自带存储,提供push和pull存取数据功能。flume分为agent(数据采集器)[source channel sink]。
(2)kafka做日志缓存应该是更为合适的,但是 flume的数据采集部分做的很好,可以定制很多数据源,减少开发量。所以比较流行flume+kafka模式,如果为了利用flume写hdfs的能力,也可以采用kafka+flume的方式。
采集层 主要可以使用Flume, Kafka两种技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.
Kafka:Kafka是一个可持久化的分布式的消息队列。
Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。
正如你们所知Flume内置很多的source和sink组件。Kafka明显有一个更小的生产消费者生态系统,使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。
Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。
Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。
Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。
Flume和Kafka可以结合起来使用。通常会使用Flume + Kafka的方式。其实如果为了利用Flume已有的写HDFS功能,也可以使用Kafka + Flume的方式。
flume+kafka flume接收kafka的数据 写入hive和hbase
配置参考:
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.If you have multiple Kafka sources running, you can configure them with the same Consumer Groupso each will read a unique set of partitions for the topics.
Property Name Default Description
channels
type The component type name, needs to beorg.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id flume Unique identified of consumer group. Setting the same id in multiple sources or agentsindicates that they are part of the same consumer group
kafka.topics Comma-separated list of topics the kafka consumer will read messages from.
kafka.topics.regex Regex that defines set of topics the source is subscribed on. This property has higher prioritythankafka.topics and overrideskafka.topics if exists.
batchSize 1000 Maximum number of messages written to Channel in one batch
batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to ChannelThe batch will be written whenever the first of size and time will be reached.
……….
Other Kafka Consumer Properties These properties are used to configure the Kafka Consumer. Any consumer property supportedby Kafka can be used. The only requirement is to prepend the property name with the prefixkafka.consumer .eg:kafka.consumer.auto.offset.reset
flume-ng-node Application main方法解析shell命令 加载configuration 形成context上下文,执行LifecycleAware接口的start方法,kafkasource 继承了abstractsource 实现了 configurable和pollablesource(轮询),重写了LifecycleAware的start和stop方法,configurable的configure方法,以及pollablesource的process方法。执行LifecycleAware的方法时实际上运行的是kafkasource的start方法,根据kafka配置建立消费者连接和kafkastream流。在start之前加载了配置文件时已经将重载后的configure执行了一遍,更改了一些source的配置,如果没有的话会有默认配置。
之后由sourceRunner的子类PollableSourceRunner驱动kafkasource运行(PollableSourceRunner启动了一个PollingRunner线程,该线程调用了kafkasource的process方法),process方法将stream的流数据读取转换为channel所需的event,写入channel中。
当时间达到一定的时间间隔或者批处理的事件条数达到一定数目时,将eventlist一次性发往channel(由absractsource的getChannelProcessor返回的processor调用processEventBatch方法发送)。
核心代码为:
while (eventList.size() < batchUpperLimit &&
System.currentTimeMillis() < maxBatchEndTime) {
.....
event = EventBuilder.withBody(eventBody, headers);
eventList.add(event);
....
}
if (eventList.size() > 0) {
counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
counter.addToEventReceivedCount((long) eventList.size());
getChannelProcessor().processEventBatch(eventList);
counter.addToEventAcceptedCount(eventList.size());
if (log.isDebugEnabled()) {
log.debug("Wrote {} events to channel", eventList.size());
}
eventList.clear();
基本配置为:
public class KafkaSourceConstants {
public static final String KAFKA_PREFIX = "kafka.";
public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
public static final String DEFAULT_KEY_DESERIALIZER =
"org.apache.kafka.common.serialization.StringDeserializer";
public static final String DEFAULT_VALUE_DESERIALIZER =
"org.apache.kafka.common.serialization.ByteArrayDeserializer";
public static final String BOOTSTRAP_SERVERS =
KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
public static final String TOPICS = KAFKA_PREFIX + "topics";
public static final String TOPICS_REGEX = TOPICS + "." + "regex";
public static final String DEFAULT_AUTO_COMMIT = "false";
public static final String BATCH_SIZE = "batchSize";
public static final String BATCH_DURATION_MS = "batchDurationMillis";
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final int DEFAULT_BATCH_DURATION = 1000;
public static final String DEFAULT_GROUP_ID = "flume";
public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
public static final String AVRO_EVENT = "useFlumeEventFormat";
public static final boolean DEFAULT_AVRO_EVENT = false;
/* Old Properties */
public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
public static final String TOPIC = "topic";
public static final String OLD_GROUP_ID = "groupId";
// flume event headers
public static final String TOPIC_HEADER = "topic";
public static final String KEY_HEADER = "key";
public static final String TIMESTAMP_HEADER = "timestamp";
public static final String PARTITION_HEADER = "partition";
}
kafkasource根据消费者建立的数据流datastream读取,每隔一段批处理时间或者数据达到一定数目将会往channel写数据。