1-消费者和消费组
2-KafkaConsumer的使用
3-KafkaConsumer在flume中的使用
消费者和消费组
消费者是负责订阅Kafka中的Topic,并且从订阅的主题上面拉取消息。
在Kafka中还有一层消费组的概念,每个消费组都有一个对应的消费组。
需要了解的是:每个消费者只能消费所分配到的分区中的消息。每一个分区的消息只能被一个
消费组中的一个消费者所消费。
每个消费组都有一个固定的名称,可以通过消费者客户端的参数group.id指定。
KafkaConsumer的使用
首先添加maven依赖 <kafka.version>0.9.0.1</kafka.version>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
一般会按照以下方式去进行开发:
配置消费者客户端的参数,创建消费者实例
订阅需要消费的主题
拉取消息进行消费
提交消费的位移
关闭消费者
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.129:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
// Set this property, if auto commit should happen.
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "999999999999");
// This is how to control number of messages being read in each poll
props.put("max.partition.fetch.bytes", "135");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
private static void processRecords(KafkaConsumer<String, String> consumer) {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
long lastOffset = 0;
for (ConsumerRecord<String, String> record : records) {
System.out.printf("\n\roffset = %d, partition = %s, key = %s, value = %s\n", record.offset(), record.partition(),
record.key(), record.value());
lastOffset = record.offset();
}
System.out.println("\n\r===========>lastOffset read: " + lastOffset);
process();//处理拉取的数据
consumer.commitSync();
}
}
上面简单实现了一个消费者。我们来进一步了解一下KafkaConsumer。
一般来说创建好消费者后,就需要订阅主题和分区了,Kafka支持直接订阅某些主题的特定分区。
在KafkaConsumer中使用assign()方法来实现这些功能。此方法具体如下:
public void assign(Collection<TopicPartition>partitions)
对于生产者来说,会有序列化器的功能,所以消费者也会提供一个反序列化器让我们获取到我们需要的
value.
在Kafka中消息的消费是基于拉模式的。Kafka中的消息消费是一个不断轮训的过程,消费者重复的
调用poll()方法。
消费者消费到的每条消息类型为ConsumerRecord.
poll()方法的返回值类型是ConsumerRecords,它用来表示一次拉取操作所获得的消息集内部包含
了若干的ConsumerRecord。
接下来就是消费位移的提交了。kafkaConsumer默认会自动提交位移。我们也可以关闭自动提交,
实现手动提交位移。这里先不做具体的讨论,后续会单独进行讨论。
对比KafkaProducer,KafkaConsumer是非线程安全的。但是并不意味着我们在消费消息的时候只能以单线程的方式进行。
我们可以通过多线程的方式进行消费。
即每个线程实例化一个KafkaConsumer对象,一个线程对应一个KafkaConsumer实例。
一个消费线程可以消费一个或者多个分区,所有的消费线程都属于同一个消费组。
当然还有其他的实现方式,后续继续讨论
KafkaConsumer在flume中的使用
在KafkaSource中我们可以看到比较完整的KafkaConsumer的使用
1-doConfigure
这里进行配置消费者客户端的参数
private void setConsumerProps(Context ctx) {
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
//Defaults overridden based on config
kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
//These always take precedence over config
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
if (groupId != null) {
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
}
2-doStart()
这里进行初始化消费者实例,订阅主题,拉取消息。
//initialize a consumer.
consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
// Subscribe for topics by already specified strategy
subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag));
// Connect to kafka. 1 second is optimal time.
it = consumer.poll(1000).iterator();
log.info("Kafka source {} started.", getName());
3-doProcess()
这里对拉取的消息进行消费处理。
// get next message
ConsumerRecord<String, byte[]> message = it.next();
kafkaKey = message.key();
kafkaMessage = message.value();
//可以直接获取到消息内容
eventBody = message.value();
headers.clear();
headers = new HashMap<String, String>(4);
//组装消息信息到event,加入到批次里面
event = EventBuilder.withBody(eventBody, headers);
eventList.add(event);
// 对于每个分区,存储将要读取的下一个偏移量
tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()),
new OffsetAndMetadata(message.offset() + 1, batchUUID));
//开始提交一批次到channel中,提交偏移量到kafka
if (eventList.size() > 0) {
counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
counter.addToEventReceivedCount((long) eventList.size());
//提交一批次到channel中,需要时间
//注意:如果新增了一个消费者,这里耗时过久的话,另一个消费者已经启动,可能会造成还没有提交到kafka,导致这些没有被提交的数据会被重复消费
getChannelProcessor().processEventBatch(eventList);
counter.addToEventAcceptedCount(eventList.size());
if (log.isDebugEnabled()) {
log.debug("Wrote {} events to channel", eventList.size());
}
eventList.clear();
//提交偏移量到kafka,同步提交 CommitFailedException
if (!tpAndOffsetMetadata.isEmpty()) {
long commitStartTime = System.nanoTime();
consumer.commitSync(tpAndOffsetMetadata);
long commitEndTime = System.nanoTime();
counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
tpAndOffsetMetadata.clear();
}
return Status.READY;
}
这里包含了手动消费位移的提交.在完整的flume代码中还涉及到了消费者的再均衡操作。
后续会详细的讲解。这里整体看一下在flume中是怎么使用KafkaConsumer的。
后续
消费位移的提交方式
多线程消费探讨
再均衡