Flink - FlinkKafkaConsumer08

First, let's look at

AbstractFetcher

This can be understood as the thread inside the consumer that actually reads data from Kafka; a single fetcher can read data from multiple partitions at the same time.


/**
 * Base class for all fetchers, which implement the connections to Kafka brokers and
 * pull records from Kafka partitions.
 * 
 * <p>This fetcher base class implements the logic around emitting records and tracking offsets,
 * as well as around the optional timestamp assignment and watermark generation. 
 * 
 * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into
 *            the Flink data streams.
 * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
 */
public abstract class AbstractFetcher<T, KPH> {
    
    /** The source context to emit records and watermarks to */
    private final SourceContext<T> sourceContext; // the context used to emit records and watermarks

    /** All partitions (and their state) that this fetcher is subscribed to */
    private final KafkaTopicPartitionState<KPH>[] allPartitions; // tracks the state of each topic partition, e.g. its current offset


    // ------------------------------------------------------------------------
    
    protected AbstractFetcher(
            SourceContext<T> sourceContext,
            List<KafkaTopicPartition> assignedPartitions,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext) throws Exception
    {
        // create our partition state according to the timestamp/watermark mode 
        this.allPartitions = initializePartitions(
                assignedPartitions,
                timestampWatermarkMode,
                watermarksPeriodic, watermarksPunctuated,
                runtimeContext.getUserCodeClassLoader());
        
        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
            KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = 
                    (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
            
            PeriodicWatermarkEmitter periodicEmitter = 
                    new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext);
            periodicEmitter.start();  // periodically emits watermarks
        }
    }

    // ------------------------------------------------------------------------
    //  Core fetcher work methods
    // ------------------------------------------------------------------------

    public abstract void runFetchLoop() throws Exception; // the core method; each Kafka-version-specific fetcher overrides it
    
    // ------------------------------------------------------------------------
    //  Kafka version specifics
    // ------------------------------------------------------------------------
    
    /**
     * Creates the Kafka version specific representation of the given
     * topic partition.
     * 
     * @param partition The Flink representation of the Kafka topic partition.
     * @return The specific Kafka representation of the Kafka topic partition.
     */
    public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition); // creates the KafkaPartitionHandle, i.e. Kafka's own representation of the partition

    /**
     * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for
     * older Kafka versions).
     * 
     * @param offsets The offsets to commit to Kafka.
     * @throws Exception This method forwards exceptions.
     */
    public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception; // how to commit the given Kafka offsets, e.g. by writing them to ZooKeeper

    // ------------------------------------------------------------------------
    //  snapshot and restore the state
    // ------------------------------------------------------------------------

    /**
     * Takes a snapshot of the partition offsets.
     * 
     * <p>Important: This method must be called under the checkpoint lock.
     * 
     * @return A map from partition to current offset.
     */
    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() { // takes a snapshot of all partitions, essentially their current offsets
        // this method assumes that the checkpoint lock is held
        assert Thread.holdsLock(checkpointLock);

        HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
        for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
            if (partition.isOffsetDefined()) {
                state.put(partition.getKafkaTopicPartition(), partition.getOffset());
            }
        }
        return state;
    }

    /**
     * Restores the partition offsets.
     * 
     * @param snapshotState The offsets for the partitions 
     */
    public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) { // restores offsets from a checkpoint
        for (KafkaTopicPartitionState<?> partition : allPartitions) {
            Long offset = snapshotState.get(partition.getKafkaTopicPartition());
            if (offset != null) {
                partition.setOffset(offset);
            }
        }
    }
    
    // ------------------------------------------------------------------------
    //  emitting records
    // ------------------------------------------------------------------------

    /**
     * Emits a record and updates the offset of the partition it was fetched from.
     * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
     * That makes the fast path efficient, the extended paths are called as separate methods.
     * 
     * @param record The record to emit
     * @param partitionState The state of the Kafka partition from which the record was fetched
     * @param offset The offset from which the record was fetched
     */
    protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) { // the actual record emission
        if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
            // fast path logic, in case there are no watermarks

            // emit the record, using the checkpoint lock to guarantee
            // atomicity of record emission and offset state update
            synchronized (checkpointLock) {
                sourceContext.collect(record); // emit the record
                partitionState.setOffset(offset); // update the local offset
            }
        }
        else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { // if periodic watermarks are configured
            emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset);
        }
        else {
            emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset);
        }
    }

    /**
     * Record emission, if a timestamp will be attached from an assigner that is
     * also a periodic watermark generator.
     */
    private void emitRecordWithTimestampAndPeriodicWatermark(
            T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
    {

        // extract timestamp - this accesses/modifies the per-partition state inside the
        // watermark generator instance, so we need to lock the access on the
        // partition state. concurrent access can happen from the periodic emitter
        final long timestamp;
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (withWatermarksState) {
            timestamp = withWatermarksState.getTimestampForRecord(record); // calls the assigner's extractTimestamp to get the record's event time
        }

        // emit the record with timestamp, using the usual checkpoint lock to guarantee
        // atomicity of record emission and offset state update 
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, timestamp); // this emit variant attaches the event time to the record
            partitionState.setOffset(offset);
        }
    }

 
    
    // ------------------------------------------------------------------------
    
    /**
     * The periodic watermark emitter. In its given interval, it checks all partitions for
     * the current event time watermark, and possibly emits the next watermark.
     */
    private static class PeriodicWatermarkEmitter implements Triggerable {

        private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
        
        private final SourceContext<?> emitter;
        
        private final StreamingRuntimeContext triggerContext;

        private final long interval;
        
        private long lastWatermarkTimestamp;
        
        //-------------------------------------------------

        PeriodicWatermarkEmitter(
                KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
                SourceContext<?> emitter,
                StreamingRuntimeContext runtimeContext)
        {
            this.allPartitions = checkNotNull(allPartitions);
            this.emitter = checkNotNull(emitter);
            this.triggerContext = checkNotNull(runtimeContext);
            this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
            this.lastWatermarkTimestamp = Long.MIN_VALUE;
        }

        //-------------------------------------------------
        
        public void start() {
            triggerContext.registerTimer(System.currentTimeMillis() + interval, this); // register the timer
        }
        
        @Override
        public void trigger(long timestamp) throws Exception {
            // sanity check
            assert Thread.holdsLock(emitter.getCheckpointLock());
            
            long minAcrossAll = Long.MAX_VALUE;
            for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
                
                // we access the current watermark for the periodic assigners under the state
                // lock, to prevent concurrent modification to any internal variables
                final long curr;
                //noinspection SynchronizationOnLocalVariableOrMethodParameter
                synchronized (state) {
                    curr = state.getCurrentWatermarkTimestamp(); // get the current watermark for this partition
                }
                
                minAcrossAll = Math.min(minAcrossAll, curr);
            }
            
            // emit next watermark, if there is one
            if (minAcrossAll > lastWatermarkTimestamp) {
                lastWatermarkTimestamp = minAcrossAll;
                emitter.emitWatermark(new Watermark(minAcrossAll)); // emit the watermark
            }
            
            // schedule the next watermark
            triggerContext.registerTimer(System.currentTimeMillis() + interval, this); // re-register the timer for the next interval
        }
    }
}
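
To make the per-partition watermark bookkeeping above more concrete, here is a minimal sketch of a periodic watermark assigner (the SensorReading event type is made up purely for illustration). The fetcher keeps one copy of such an assigner per partition: getTimestampForRecord ends up calling extractTimestamp, and the PeriodicWatermarkEmitter calls getCurrentWatermark on every partition and emits the minimum across all of them.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// "SensorReading" is a made-up event type, used only to illustrate the assigner
class SensorReading {
    final String sensorId;
    final long eventTimeMillis;

    SensorReading(String sensorId, long eventTimeMillis) {
        this.sensorId = sensorId;
        this.eventTimeMillis = eventTimeMillis;
    }
}

class SensorWatermarkAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L; // tolerate 5 seconds of out-of-order events

    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        // called for every record, under the per-partition lock inside the fetcher
        long ts = element.eventTimeMillis;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // called by the PeriodicWatermarkEmitter; it then takes the minimum
        // watermark across all partitions before emitting it downstream
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}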


 
Kafka08Fetcher

The fetcher for Kafka 0.8:

public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition>

The core method it overrides is

runFetchLoop


@Override
    public void runFetchLoop() throws Exception {
        // the map from broker to the thread that is connected to that broker
        final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>(); // caches the mapping from each broker node to the SimpleConsumerThread connected to it

        // the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
        final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig); // the ZooKeeper handler, used to read/write offsets from/to ZooKeeper
        this.zookeeperOffsetHandler = zookeeperOffsetHandler;

        PeriodicOffsetCommitter periodicCommitter = null;
        try {
            // read offsets from ZooKeeper for partitions that did not restore offsets
            {
                List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
                for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
                    if (!partition.isOffsetDefined()) { // for every partition whose offset is undefined, i.e. not restored from a checkpoint, add it to partitionsWithNoOffset
                        partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
                    }
                }
                
                // this step only applies to partitionsWithNoOffset, i.e. partitions whose offsets were not restored from a checkpoint
                Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset); // read the offsets of these partitions from ZooKeeper
                for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
                    Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
                    if (offset != null) {
                        partition.setOffset(offset); // set the offset read from ZooKeeper on the partition
                    }
                }
            }

            // start the periodic offset committer thread, if necessary
            if (autoCommitInterval > 0) { // periodically commit offsets, e.g. to ZooKeeper under /consumers/<group>/offsets/<topic>/<partition>
                periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler, 
                        subscribedPartitions(), errorHandler, autoCommitInterval);
                periodicCommitter.setName("Periodic Kafka partition offset committer");
                periodicCommitter.setDaemon(true);
                periodicCommitter.start();
            }

            // Main loop polling elements from the unassignedPartitions queue to the threads
            while (running) {
                
                // wait for max 5 seconds trying to get partitions to assign
                // if threads shut down, this poll returns earlier, because the threads inject the
                // special marker into the queue
                List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = 
                        unassignedPartitionsQueue.getBatchBlocking(5000);
                partitionsToAssign.remove(MARKER); // MARKER is a wake-up sentinel that the consumer threads inject into the queue (e.g. on shutdown) so that getBatchBlocking returns early; it is not a real partition, so drop it here

                if (!partitionsToAssign.isEmpty()) {
                    LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
                    Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = // look up the leader of each partition via the brokers; the result maps each leader node to its list of partitions
                            findLeaderForPartitions(partitionsToAssign, kafkaConfig);

                    // assign the partitions to the leaders (maybe start the threads)
                    for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader : 
                            partitionsWithLeaders.entrySet())
                    {
                        final Node leader = partitionsWithLeader.getKey(); // the leader node
                        final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue(); // the partitions to be read from this leader
                        SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader); // look up the consumer thread for this leader node

                        if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
                            // start new thread
                            brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler); // no usable consumer thread for this leader yet, so create and start a new one
                            brokerToThread.put(leader, brokerThread);
                        }
                        else {
                            // put elements into queue of thread
                            ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue =
                                    brokerThread.getNewPartitionsQueue();
                            
                            for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
                                if (!newPartitionsQueue.addIfOpen(fp)) {
                                    // we were unable to add the partition to the broker's queue
                                    // the broker has closed in the meantime (the thread will shut down)
                                    // create a new thread for connecting to this broker
                                    List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
                                    seedPartitions.add(fp);
                                    brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
                                    brokerToThread.put(leader, brokerThread);
                                    newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
                                }
                            }
                        }
                    }
                }
            }
        }
        catch (InterruptedException e) {
           //......
        }
        finally {
           //......
        }
    }
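
The MARKER handling above is essentially the classic sentinel trick on a blocking queue: a consumer thread that wants to wake the main loop early just enqueues a well-known dummy element, and the main loop filters it out again. A small standalone sketch of the idea with plain java.util.concurrent (Flink's ClosableBlockingQueue additionally tracks an open/closed state, which is left out here):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MarkerWakeupDemo {

    // the sentinel; compared by identity and never treated as a real work item
    private static final String MARKER = "<wake-up marker>";

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // a worker thread that "shuts down" and wakes the main loop immediately
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(500);   // pretend to do some work
                queue.add(MARKER);   // wake the main loop without waiting for its timeout
            } catch (InterruptedException ignored) {
            }
        });
        worker.start();

        // main loop step: wait up to 5 seconds for work, but return early when the marker arrives
        String first = queue.poll(5, TimeUnit.SECONDS);
        List<String> batch = new ArrayList<>();
        if (first != null) {
            batch.add(first);
            queue.drainTo(batch);            // grab whatever else is already queued
        }
        batch.removeIf(e -> e == MARKER);    // drop the sentinel, keep only real work
        System.out.println("real work items: " + batch);
        worker.join();
    }
}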


 

Some of the other interface implementations:


// ------------------------------------------------------------------------
    //  Kafka 0.8 specific class instantiation
    // ------------------------------------------------------------------------

    @Override
    public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
        return new TopicAndPartition(partition.getTopic(), partition.getPartition()); // for Kafka 0.8, the KafkaPartitionHandle is simply TopicAndPartition
    }

    // ------------------------------------------------------------------------
    //  Offset handling
    // ------------------------------------------------------------------------

    @Override
    public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
        ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
        if (zkHandler != null) {
            zkHandler.writeOffsets(offsets); // offsets are committed by writing them to ZooKeeper
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
            List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
            Node leader,
            ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
    {
        // each thread needs its own copy of the deserializer, because the deserializer is
        // not necessarily thread safe
        final KeyedDeserializationSchema<T> clonedDeserializer =
                InstantiationUtil.clone(deserializer, userCodeClassLoader);

        // seed thread with list of fetch partitions (otherwise it would shut down immediately again)
        SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
                this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue, 
                clonedDeserializer, invalidOffsetBehavior);

        brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
                taskName, leader.id(), leader.host(), leader.port()));
        brokerThread.setDaemon(true);
        brokerThread.start(); // create and start the SimpleConsumerThread

        LOG.info("Starting thread {}", brokerThread.getName());
        return brokerThread;
    }
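
commitSpecificOffsetsToKafka above delegates to ZookeeperOffsetHandler, which stores offsets under the same ZooKeeper path layout that Kafka 0.8's high-level consumer uses. The following is only a rough sketch of that idea with Curator; the path /consumers/<group>/offsets/<topic>/<partition> is the standard layout, but the exact node handling in Flink's handler may differ:

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkOffsetSketch {

    private final CuratorFramework client;
    private final String groupId;

    public ZkOffsetSketch(String zkConnect, String groupId) {
        this.client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3));
        this.client.start();
        this.groupId = groupId;
    }

    private String offsetPath(String topic, int partition) {
        // the standard Kafka 0.8 consumer offset path in ZooKeeper
        return String.format("/consumers/%s/offsets/%s/%d", groupId, topic, partition);
    }

    public void writeOffset(String topic, int partition, long offset) throws Exception {
        String path = offsetPath(topic, partition);
        byte[] data = Long.toString(offset).getBytes(StandardCharsets.UTF_8);
        if (client.checkExists().forPath(path) == null) {
            client.create().creatingParentsIfNeeded().forPath(path, data);
        } else {
            client.setData().forPath(path, data);
        }
    }

    public Long readOffset(String topic, int partition) throws Exception {
        String path = offsetPath(topic, partition);
        if (client.checkExists().forPath(path) == null) {
            return null;   // no offset committed yet
        }
        return Long.valueOf(new String(client.getData().forPath(path), StandardCharsets.UTF_8));
    }
}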


 

Next, let's look at SimpleConsumerThread

class SimpleConsumerThread<T> extends Thread

Its core method is run, which essentially keeps fetching data in a loop:


// these are the actual configuration values of Kafka + their original default values.
    this.soTimeout = getInt(config, "socket.timeout.ms", 30000); // some Kafka configuration values, with their original default values
    this.minBytes = getInt(config, "fetch.min.bytes", 1);
    this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
    this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
    this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
    this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
   
    // ------------------------------------------------------------------------
    //  main work loop
    // ------------------------------------------------------------------------
    
    @Override
    public void run() {
        try {
            // create the Kafka consumer that we actually use for fetching
            consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); // create the SimpleConsumer
            
            // make sure that all partitions have some offsets to start with
            // those partitions that do not have an offset from a checkpoint need to get
            // their start offset from ZooKeeper
            getMissingOffsetsFromKafka(partitions); // for partitions without an offset, reset it to latest or earliest (depending on the config)

            // Now, the actual work starts :-)
            int offsetOutOfRangeCount = 0; // counters for invalid offsets and reconnect attempts
            int reconnects = 0;
            while (running) {

                // ----------------------------------- partitions list maintenance ----------------------------

                // check queue for new partitions to read from:
                List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch(); // handle new partitions, essentially by adding them to this thread's partition list
                if (newPartitions != null) {
                    // found some new partitions for this thread's broker
                    
                    // check if the new partitions need an offset lookup
                    getMissingOffsetsFromKafka(newPartitions); // reset offsets for the new partitions that need one
                    
                    // add the new partitions (and check they are not already in there)
                    for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
                        partitions.add(newPartition);
                    }
                }

                if (partitions.size() == 0) { // no partitions left to consume
                    if (newPartitionsQueue.close()) { // if the queue can be closed, no new partitions can ever be added, so this thread has no reason to keep running
                        // close succeeded. Closing thread
                        running = false; // stop the thread
                        
                        LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", 
                                getName());

                        // add the wake-up marker into the queue to make the main thread
                        // immediately wake up and termination faster
                        unassignedPartitions.add(MARKER);

                        break;
                    } else { // the queue could not be closed, meaning new partitions were just added concurrently; loop again to pick them up
                        // close failed: fetcher main thread concurrently added new partitions into the queue.
                        // go to top of loop again and get the new partitions
                        continue; 
                    }
                }

                // ----------------------------------- request / response with kafka ----------------------------

                FetchRequestBuilder frb = new FetchRequestBuilder(); // create the FetchRequestBuilder
                frb.clientId(clientId);
                frb.maxWait(maxWait);
                frb.minBytes(minBytes);

                for (KafkaTopicPartitionState<?> partition : partitions) {
                    frb.addFetch(
                            partition.getKafkaTopicPartition().getTopic(),
                            partition.getKafkaTopicPartition().getPartition(),
                            partition.getOffset() + 1, // request the next record
                            fetchSize);
                }
                
                kafka.api.FetchRequest fetchRequest = frb.build(); // build the FetchRequest; a single request can read from multiple partitions, depending on the ratio of partitions to consumer threads

                FetchResponse fetchResponse;
                try {
                    fetchResponse = consumer.fetch(fetchRequest); // fetch from Kafka; the FetchResponse contains the data read from multiple partitions
                }
                catch (Throwable cce) {
                    //noinspection ConstantConditions
                    if (cce instanceof ClosedChannelException) { // connection to Kafka failed
                       
                        // we don't know if the broker is overloaded or unavailable.
                        // retry a few times, then return ALL partitions for new leader lookup
                        if (++reconnects >= reconnectLimit) { // the reconnect limit has been reached, so the broker really is unreachable
                            LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
                            for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
                                unassignedPartitions.add(fp); // return the partitions this thread was responsible for to unassignedPartitions, marking them as unhandled
                            }
                            this.partitions.clear(); // clear this thread's partition list
                            continue; // jump to top of loop: will close thread or subscribe to new partitions (this falls into the partitions.size() == 0 branch above)
                        }
                        try { // otherwise retry: close the consumer, recreate it, and continue
                            consumer.close();
                        } catch (Throwable t) {
                            LOG.warn("Error while closing consumer connection", t);
                        }
                        // delay & retry
                        Thread.sleep(100);
                        consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
                        continue; // retry
                    } else {
                        throw cce;
                    }
                }
                reconnects = 0;

                // ---------------------------------------- error handling ----------------------------

                if (fetchResponse == null) {
                    throw new IOException("Fetch from Kafka failed (request returned null)");
                }
                
                if (fetchResponse.hasError()) { // the fetch response reported errors
                    String exception = "";
                    List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
                    
                    // iterate over partitions to get individual error codes
                    Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
                    boolean partitionsRemoved = false;
                    
                    while (partitionsIterator.hasNext()) {
                        final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
                        short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition()); // the error code for this partition

                        if (code == ErrorMapping.OffsetOutOfRangeCode()) { // invalid offset, so this partition's offset needs to be re-initialized
                            // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
                            // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
                            partitionsToGetOffsetsFor.add(fp);
                        }
                        else if (code == ErrorMapping.NotLeaderForPartitionCode() || // the partition cannot be read from this broker due to some form of unavailability
                                code == ErrorMapping.LeaderNotAvailableCode() ||
                                code == ErrorMapping.BrokerNotAvailableCode() ||
                                code == ErrorMapping.UnknownCode())
                        {
                            // the broker we are connected to is not the leader for the partition.
                            LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
                            LOG.debug("Error code = {}", code);

                            unassignedPartitions.add(fp); // put the partition back into unassignedPartitions so that it gets reassigned

                            partitionsIterator.remove(); // unsubscribe the partition ourselves, i.e. remove it from this thread's partition list
                            partitionsRemoved = true;
                        }
                        else if (code != ErrorMapping.NoError()) {
                            exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
                                    StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
                        }
                    }
                    if (partitionsToGetOffsetsFor.size() > 0) {
                        // safeguard against an infinite loop.
                        if (offsetOutOfRangeCount++ > 3) { // if offsets are still invalid after resetting them three times, throw, to avoid an infinite loop
                            throw new RuntimeException("Found invalid offsets more than three times in partitions "
                                    + partitionsToGetOffsetsFor + " Exceptions: " + exception);
                        }
                        // get valid offsets for these partitions and try again.
                        LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
                        getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); // reset the offsets of these partitions to earliest or latest, depending on the config
                        
                        LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
                        continue; // jump back to create a new fetch request. The offset has not been touched.
                    }
                    else if (partitionsRemoved) {
                        continue; // create new fetch request
                    }
                    else {
                        // partitions failed on an error
                        throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
                    }
                } else {
                    // successful fetch, reset offsetOutOfRangeCount.
                    offsetOutOfRangeCount = 0;
                }

                // ----------------------------------- process fetch response ----------------------------

                int messagesInFetch = 0;
                int deletedMessages = 0;
                Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
                
                partitionsLoop:
                while (partitionsIterator.hasNext()) {
                    final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
                    
                    final ByteBufferMessageSet messageSet = fetchResponse.messageSet( // extract this partition's data from the fetch response as a ByteBufferMessageSet
                            currentPartition.getTopic(), currentPartition.getPartition());

                    for (MessageAndOffset msg : messageSet) { // for each message
                        if (running) {
                            messagesInFetch++;
                            final ByteBuffer payload = msg.message().payload(); // the message payload
                            final long offset = msg.offset(); // the message offset
                            
                            if (offset <= currentPartition.getOffset()) { // stale data, ignore it
                                // we have seen this message already
                                LOG.info("Skipping message with offset " + msg.offset()
                                        + " because we have seen messages until (including) "
                                        + currentPartition.getOffset()
                                        + " from topic/partition " + currentPartition.getTopic() + '/'
                                        + currentPartition.getPartition() + " already");
                                continue;
                            }

                            // If the message value is null, this represents a delete command for the message key.
                            // Log this and pass it on to the client who might want to also receive delete messages.
                            byte[] valueBytes;
                            if (payload == null) {
                                deletedMessages++;
                                valueBytes = null;
                            } else {
                                valueBytes = new byte[payload.remaining()];
                                payload.get(valueBytes); // copy the payload into valueBytes
                            }

                            // put key into byte array
                            byte[] keyBytes = null;
                            int keySize = msg.message().keySize();

                            if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
                                ByteBuffer keyPayload = msg.message().key();
                                keyBytes = new byte[keySize]; // copy the key into keyBytes
                                keyPayload.get(keyBytes);
                            }

                            final T value = deserializer.deserialize(keyBytes, valueBytes, // deserialize the message into an object
                                    currentPartition.getTopic(), currentPartition.getPartition(), offset);
                            
                            if (deserializer.isEndOfStream(value)) {
                                // remove partition from subscribed partitions.
                                partitionsIterator.remove();
                                continue partitionsLoop;
                            }
                            
                            owner.emitRecord(value, currentPartition, offset); // emit the record
                        }
                        else {
                            // no longer running
                            return;
                        }
                    }
                }
            } // end of fetch loop

        }
    }
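
The deserializer.deserialize(...) call above goes through a KeyedDeserializationSchema, which receives the raw key and value bytes together with topic, partition and offset. Below is a minimal sketch that ignores the key and decodes the value as a UTF-8 string (package names as in the Flink releases that ship this 0.8 connector); isEndOfStream always returns false here, so the partition is never dropped:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class StringValueDeserializationSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        // the value may be null for Kafka "delete" (tombstone) messages
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // returning true here would make the consumer thread drop the partition (see partitionsLoop above)
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}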


 

Finally, let's look at

FlinkKafkaConsumerBase


/**
 * Base class of all Flink Kafka Consumer data sources.
 * This implements the common behavior across all Kafka versions.
 * 
 * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
 * {@link AbstractFetcher}.
 * 
 * @param <T> The type of records produced by this data source
 */
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
        CheckpointListener,
        CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>,
        ResultTypeQueryable<T>


This is the abstraction shared across all Kafka versions.


@Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        
        // figure out which partitions this subtask should process
        final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions, // decide which partitions this subtask should handle, given the number of partitions and parallel subtasks; the logic is a simple modulo
                getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
        
        // we need only do work, if we actually have partitions assigned
        if (!thisSubtaskPartitions.isEmpty()) {

            // (1) create the fetcher that will communicate with the Kafka brokers
            final AbstractFetcher<T, ?> fetcher = createFetcher( // create the fetcher
                    sourceContext, thisSubtaskPartitions, 
                    periodicWatermarkAssigner, punctuatedWatermarkAssigner,
                    (StreamingRuntimeContext) getRuntimeContext());

            // (2) set the fetcher to the restored checkpoint offsets
            if (restoreToOffset != null) { // if offsets were restored from a checkpoint
                fetcher.restoreOffsets(restoreToOffset); // restore them into the fetcher
            }

            // publish the reference, for snapshot-, commit-, and cancel calls
            // IMPORTANT: We can only do that now, because only now will calls to
            //            the fetchers 'snapshotCurrentState()' method return at least
            //            the restored offsets
            this.kafkaFetcher = fetcher;
            if (!running) {
                return;
            }
            
            // (3) run the fetcher' main work method
            fetcher.runFetchLoop(); // start running the fetcher
        }
    }

    @Override
    public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        
        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;

        HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); // snapshot the current offsets

        // the map cannot be asynchronously updated, because only one checkpoint call can happen
        // on this function at a time: either snapshotState() or notifyCheckpointComplete()
        pendingCheckpoints.put(checkpointId, currentOffsets); // remember the offsets under this checkpoint id until the checkpoint completes
        
        // truncate the map, to prevent infinite growth
        while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) { // drop the oldest pending checkpoints
            pendingCheckpoints.remove(0);
        }

        return currentOffsets;
    }

    @Override
    public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
        LOG.info("Setting restore state in the FlinkKafkaConsumer");
        restoreToOffset = restoredOffsets;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception { // when a checkpoint completes, the Kafka consumer is notified, and only then are the offsets actually committed

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;

        try {
            final int posInMap = pendingCheckpoints.indexOf(checkpointId);

            @SuppressWarnings("unchecked")
            HashMap<KafkaTopicPartition, Long> checkpointOffsets = 
                    (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);

            // remove older checkpoints in map
            for (int i = 0; i < posInMap; i++) { // pending checkpoints older than this one are no longer useful, so drop them
                pendingCheckpoints.remove(0);
            }

            fetcher.commitSpecificOffsetsToKafka(checkpointOffsets); // the actual offset commit, via the version-independent interface; for Kafka 0.8 the fetcher also commits periodically on its own, but checkpoints (typically at second granularity) are usually more frequent than the periodic commit
        }
    }
    
    /**
     * Selects which of the given partitions should be handled by a specific consumer,
     * given a certain number of consumers.
     * 
     * @param allPartitions The partitions to select from
     * @param numConsumers The number of consumers
     * @param consumerIndex The index of the specific consumer
     * 
     * @return The sublist of partitions to be handled by that consumer.
     */
    protected static List<KafkaTopicPartition> assignPartitions(
            List<KafkaTopicPartition> allPartitions,
            int numConsumers, int consumerIndex)
    {
        final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
                allPartitions.size() / numConsumers + 1);

        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }
        
        return thisSubtaskPartitions;
    }
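
The modulo assignment is easiest to see with a small worked example: with 6 partitions p0..p5 and 4 parallel subtasks, subtask 0 gets {p0, p4}, subtask 1 gets {p1, p5}, subtask 2 gets {p2} and subtask 3 gets {p3}. A standalone re-implementation of the same loop, just for illustration:

import java.util.ArrayList;
import java.util.List;

public class AssignPartitionsDemo {

    // same modulo logic as assignPartitions above, re-implemented on plain strings
    static List<String> assign(List<String> allPartitions, int numConsumers, int consumerIndex) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                mine.add(allPartitions.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> partitions = new ArrayList<>();
        for (int p = 0; p < 6; p++) {
            partitions.add("p" + p);
        }
        for (int subtask = 0; subtask < 4; subtask++) {
            // prints: subtask 0 -> [p0, p4], subtask 1 -> [p1, p5], subtask 2 -> [p2], subtask 3 -> [p3]
            System.out.println("subtask " + subtask + " -> " + assign(partitions, 4, subtask));
        }
    }
}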


 

The consumer specific to Kafka 0.8:


public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {

    /**
     * Creates a new Kafka streaming source consumer for Kafka 0.8.x
     *
     * This constructor allows passing multiple topics and a key/value deserialization schema.
     * 
     * @param topics
     *           The Kafka topics to read from.
     * @param deserializer
     *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
     * @param props
     *           The properties that are used to configure both the fetcher and the offset handler.
     */
    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
        super(deserializer);

        // validate the zookeeper properties
        validateZooKeeperConfig(props);

        this.invalidOffsetBehavior = getInvalidOffsetBehavior(props); // where to reset when an offset is invalid; earliest or latest is supported
        this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000); // the offset commit interval, one minute by default

        // Connect to a broker to get the partitions for all topics
        List<KafkaTopicPartition> partitionInfos = 
                KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, props)); // only the partition info for these topics is kept; the leader data is dropped

        setSubscribedPartitions(partitionInfos); // record these partitions, i.e. the ones this consumer will read, as the subscribed partitions
    }
    
    /**
     * Send request to Kafka to get partitions for topic.
     * 
     * @param topics The name of the topics.
     * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
     */
    public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) { // how the topics' partition info is obtained from Kafka: the config holds a broker list rather than ZooKeeper, so a random broker is picked from the list each time to read the partition metadata
        String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
        final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);

        checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
        String[] seedBrokers = seedBrokersConfString.split(",");
        List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();

        final String clientId = "flink-kafka-consumer-partition-lookup";
        final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
        final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);

        Random rnd = new Random();
        retryLoop: for (int retry = 0; retry < numRetries; retry++) {
            // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
            // parallel source instances start. Still, we try all available brokers.
            int index = rnd.nextInt(seedBrokers.length);
            brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
                String seedBroker = seedBrokers[index];
                LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
                if (++index == seedBrokers.length) {
                    index = 0;
                }

                URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);

                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                    List<TopicMetadata> metaData = resp.topicsMetadata();

                    // clear in case we have an incomplete list from previous tries
                    partitions.clear();
                    for (TopicMetadata item : metaData) {
                        if (item.errorCode() != ErrorMapping.NoError()) {
                            // warn and try more brokers
                            LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " +
                                    "for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage());
                            continue brokersLoop;
                        }
                        if (!topics.contains(item.topic())) {
                            LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
                            continue brokersLoop;
                        }
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            Node leader = brokerToNode(part.leader());
                            KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId());
                            KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
                            partitions.add(pInfo);
                        }
                    }
                    break retryLoop; // leave the loop through the brokers
                } catch (Exception e) {
                    LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
                            "" + e.getClass() + ". Message: " + e.getMessage());
                    LOG.debug("Detailed trace", e);
                    // we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e1) {
                        // sleep shorter.
                    }
                } finally {
                    if (consumer != null) {
                        consumer.close();
                    }
                }
            } // brokers loop
        } // retries loop
        return partitions;
    }

    private static long getInvalidOffsetBehavior(Properties config) {
        final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
        if (val.equals("none")) {
            throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
                    + "' value 'none'. Possible values: 'latest', 'largest', or 'earliest'.");
        }
        else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
            return OffsetRequest.LatestTime();
        } else {
            return OffsetRequest.EarliestTime();
        }
    }
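
Finally, a minimal sketch of how this consumer is typically wired into a job (the hosts, group id and topic name below are placeholders; for the 0.8 connector both zookeeper.connect and bootstrap.servers are required, plus a group.id, and it is checkpointing that drives the offset commit in notifyCheckpointComplete above):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka08SourceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpointing is what triggers the offset commit in notifyCheckpointComplete()
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zk-host:2181");       // placeholder
        props.setProperty("bootstrap.servers", "kafka-broker:9092");  // placeholder, used for the partition lookup
        props.setProperty("group.id", "my-flink-consumer-group");     // placeholder
        props.setProperty("auto.offset.reset", "largest");            // where to start when no offset is found

        FlinkKafkaConsumer08<String> consumer =
                new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("Kafka 0.8 source example");
    }
}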

