Librdkafka对Kafka Message的封装和相关操作

  • struct rd_kafka_message_t
  • struct rd_kafka_msg_t
  • struct rd_kafka_msgq_t
  • kafka message的协议格式可参考 官网

struct rd_kafka_message_s

  • 所在文件: src/rdkafka.h
  • 生产的数据在application层调用接口后最终会将数据封装成这个结构, 从broker消费下来的数据回调给application层时也会封装成这个结构;
  • 定义:
/**
 * Application-facing Kafka message.
 *
 * Data handed to the produce API ends up wrapped in this structure, and
 * data consumed from a broker is delivered to the application in this
 * structure as well. The meaning of several fields depends on \c err.
 */
typedef struct rd_kafka_message_s {
    rd_kafka_resp_err_t err;   /**< Non-zero for error signaling. */
    rd_kafka_topic_t *rkt;     /**< Topic */
    int32_t partition;         /**< Partition */
    void   *payload;           /**< Producer: original message payload.
                                *   Consumer: depends on the value of \c err :
                                *   - \c err==0: Message payload.
                                *   - \c err!=0: Error string */
    size_t  len;               /**< Depends on the value of \c err :
                                *   - \c err==0: Message payload length
                                *   - \c err!=0: Error string length */
    void   *key;               /**< Depends on the value of \c err :
                                *   - \c err==0: Optional message key */
    size_t  key_len;           /**< Depends on the value of \c err :
                                *   - \c err==0: Optional message key length */
    int64_t offset;            /**< Consume:
                                *   - Message offset (or offset for error
                                *     if \c err!=0 if applicable).
                                *   dr_msg_cb:
                                *   - Message offset assigned by broker.
                                *     If \c produce.offset.report is set then
                                *     each message will have this field set,
                                *     otherwise only the last message in
                                *     each produced internal batch will
                                *     have this field set, otherwise 0. */
    void  *_private;           /**< Consume:
                                *   - rdkafka private pointer: DO NOT MODIFY
                                *   dr_msg_cb:
                                *   - msg_opaque from produce() call */
} rd_kafka_message_t;

struct rd_kafka_msg_t

  • 所在文件: src/rdkafka_msg.h
  • 封装了上面的 struct rd_kafka_message_s
  • 定义:
/**
 * Internal message object: wraps the public rd_kafka_message_t and adds
 * queue linkage, flags, timestamp information and producer timing data.
 */
typedef struct rd_kafka_msg_s {
    rd_kafka_message_t rkm_rkmessage;  /* MUST be first field */

        /* Link entry that makes this message an element of a TAILQ. */
    TAILQ_ENTRY(rd_kafka_msg_s)  rkm_link;

    int        rkm_flags;  /* Message flags (RD_KAFKA_MSG_F_..) */

        /* Message timestamp. Two kinds exist: the time the message was
         * created on the client, and the time the broker appended it to
         * its log. rkm_tstype tells which one is stored here. */
    int64_t    rkm_timestamp;  
    rd_kafka_timestamp_type_t rkm_tstype; /* rkm_timestamp type */

        /* Role-specific data; only a producer variant is declared here. */
        union {
                struct {
                        rd_ts_t ts_timeout; /* Message timeout */
                        rd_ts_t ts_enq;     /* Enqueue/Produce time */
                } producer;
        } rkm_u;
} rd_kafka_msg_t;
  • rd_kafka_message_t类型转化为rd_kafka_msg_t: 因为rd_kafka_message_t rkm_rkmessage是struct rd_kafka_msg_s结构的第一个字段, 两者起始地址相同, 所以可以直接对指针作强制类型转换:
/**
 * Reinterpret a public message pointer as a pointer to the internal
 * message type by a plain pointer cast.
 *
 * @param rkmessage  Public message pointer.
 * @returns the same address typed as rd_kafka_msg_t *.
 */
rd_kafka_msg_t *rd_kafka_message2msg (rd_kafka_message_t *rkmessage) {
    rd_kafka_msg_t *rkm = (rd_kafka_msg_t *)rkmessage;

    return rkm;
}
  • 发送kafka message前的审计: librdkafka支持异步发送, 本地有发送缓冲区, 因此在发送前需要作check, 看发送队列是否已满; 如果设置了block发送, 在发送队列满的情况下要一直阻塞wait, 直到被signal
/**
 * Reserve room for \p cnt messages totalling \p size bytes in the
 * producer's current-message accounting (rk_curr_msgs).
 *
 * Nothing is accounted for handles that are not producers.
 *
 * @param rk     Client handle.
 * @param cnt    Number of messages to add.
 * @param size   Number of bytes to add.
 * @param block  Non-zero: wait on the condition variable until the
 *               reservation fits. Zero: fail immediately when it does not.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success (or for non-producers),
 *          RD_KAFKA_RESP_ERR__QUEUE_FULL if the limits would be exceeded
 *          and \p block is zero.
 */
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
                        int block) {

    if (rk->rk_type != RD_KAFKA_PRODUCER)
        return RD_KAFKA_RESP_ERR_NO_ERROR;

    mtx_lock(&rk->rk_curr_msgs.lock);

    /* Re-evaluate both limits (count and bytes) after every wake-up. */
    while (unlikely(rk->rk_curr_msgs.cnt + cnt >
                    rk->rk_curr_msgs.max_cnt ||
                    (unsigned long long)(rk->rk_curr_msgs.size + size) >
                    (unsigned long long)rk->rk_curr_msgs.max_size)) {
        if (block) {
            /* Blocking mode: sleep until signalled, then check again. */
            cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
            continue;
        }

        /* Non-blocking mode: give up right away. */
        mtx_unlock(&rk->rk_curr_msgs.lock);
        return RD_KAFKA_RESP_ERR__QUEUE_FULL;
    }

    /* The reservation fits: commit it while still holding the lock. */
    rk->rk_curr_msgs.size += size;
    rk->rk_curr_msgs.cnt  += cnt;

    mtx_unlock(&rk->rk_curr_msgs.lock);

    return RD_KAFKA_RESP_ERR_NO_ERROR;
}
  • 创建rd_kafka_msg_t, 内部接口 rd_kafka_msg_t *rd_kafka_msg_new00
/**
 * Allocate and initialise a rd_kafka_msg_t (internal, no checks).
 *
 * A single allocation holds the message struct, followed by a copy of the
 * payload (only when RD_KAFKA_MSG_F_COPY is set) and a copy of the key.
 *
 * @param rkt         Topic; a reference is taken via rd_kafka_topic_keep_a().
 * @param partition   Partition stored in the message.
 * @param msgflags    RD_KAFKA_MSG_F_.. flags. With F_COPY set, F_FREE is
 *                    cleared.
 * @param payload     Message payload, may be NULL.
 * @param len         Payload length.
 * @param key         Optional key, may be NULL. Copied when non-NULL.
 * @param keylen      Key length.
 * @param msg_opaque  Opaque pointer stored in the message.
 *
 * @returns the newly allocated message.
 */
rd_kafka_msg_t *rd_kafka_msg_new00 (rd_kafka_itopic_t *rkt,
                                    int32_t partition,
                                    int msgflags,
                                    char *payload, size_t len,
                                    const void *key, size_t keylen,
                                    void *msg_opaque) {
    rd_kafka_msg_t *msg;
    size_t alloc_size = sizeof(*msg) + keylen;
    char *tail;

    /* When the payload is to be copied it lives in the same allocation,
     * right behind the struct, so reserve space for it as well. */
    if (msgflags & RD_KAFKA_MSG_F_COPY) {
        msgflags &= ~RD_KAFKA_MSG_F_FREE;
        alloc_size += len;
    }

    /* rd_malloc (not rd_calloc) is used, so each field below must be
     * assigned explicitly. */
    msg = rd_malloc(alloc_size);

    msg->rkm_err           = 0;
    msg->rkm_flags         = RD_KAFKA_MSG_F_FREE_RKM | msgflags;
    msg->rkm_len           = len;
    msg->rkm_opaque        = msg_opaque;
    msg->rkm_rkmessage.rkt = rd_kafka_topic_keep_a(rkt);
    msg->rkm_partition     = partition;
    msg->rkm_offset        = RD_KAFKA_OFFSET_INVALID;
    msg->rkm_timestamp     = 0;
    msg->rkm_tstype        = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;

    /* First byte past the struct: start of the trailing data area. */
    tail = (char *)(msg + 1);

    if (payload && (msgflags & RD_KAFKA_MSG_F_COPY)) {
        /* Copy the payload into the trailing area. */
        msg->rkm_payload = tail;
        memcpy(msg->rkm_payload, payload, len);
        tail += len;
    } else {
        /* Reference the caller's buffer directly. */
        msg->rkm_payload = payload;
    }

    if (!key) {
        msg->rkm_key     = NULL;
        msg->rkm_key_len = 0;
    } else {
        /* The key is always copied, after the (optional) payload copy. */
        msg->rkm_key     = tail;
        msg->rkm_key_len = keylen;
        memcpy(msg->rkm_key, key, keylen);
    }

    return msg;
}
  • 创建rd_kafka_msg_t, 创建之前增加check, 内部接口 rd_kafka_msg_t *rd_kafka_msg_new0
/**
 * Validate, account for and create a producer message (internal).
 *
 * Steps:
 *  1. Reject messages whose payload + key size exceeds the configured
 *     maximum message size (or whose key exceeds INT32_MAX).
 *  2. Reserve room in the producer's current-message accounting via
 *     rd_kafka_curr_msgs_add(), blocking if RD_KAFKA_MSG_F_BLOCK is set.
 *  3. Create the message with rd_kafka_msg_new00().
 *  4. Set timestamp, enqueue time and timeout, then run the on_send
 *     interceptors.
 *
 * @param rkt              Topic to produce to.
 * @param force_partition  Partition stored in the message.
 * @param msgflags         RD_KAFKA_MSG_F_.. flags.
 * @param payload          Payload, may be NULL (then \p len is ignored).
 * @param len              Payload length.
 * @param key              Optional key, may be NULL (then \p keylen is
 *                         ignored).
 * @param keylen           Key length.
 * @param msg_opaque       Opaque pointer stored in the message.
 * @param errp             Out: error code. Always written once the size
 *                         check has passed, and on size failure.
 * @param errnop           Out, optional: errno-style code on failure.
 * @param timestamp        Message timestamp, or 0 to use the current time.
 * @param now              Current clock value used for enqueue time and
 *                         timeout calculation.
 *
 * @returns the new message, or NULL on failure with \p *errp set to
 *          RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE (errno EMSGSIZE) or the
 *          error from rd_kafka_curr_msgs_add() (errno ENOBUFS).
 */
static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
                                          int32_t force_partition,
                                          int msgflags,
                                          char *payload, size_t len,
                                          const void *key, size_t keylen,
                                          void *msg_opaque,
                                          rd_kafka_resp_err_t *errp,
                                          int *errnop,
                                          int64_t timestamp,
                                          rd_ts_t now) {
    rd_kafka_msg_t *rkm;
    size_t max_size;

    if (unlikely(!payload))
        len = 0;
    if (!key)
        keylen = 0;

    /* Check the message against the configured maximum message size.
     * The comparison is written without computing len + keylen so that
     * a wrapped size_t sum cannot slip past the limit. */
    max_size = (size_t)rkt->rkt_rk->rk_conf.max_msg_size;
    if (unlikely(keylen > INT32_MAX ||
                 len > max_size ||
                 keylen > max_size - len)) {
        *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
        if (errnop)
            *errnop = EMSGSIZE;
        return NULL;
    }

    /* Check whether the send queue is full. With RD_KAFKA_MSG_F_BLOCK
     * set this waits until room becomes available (until signalled). */
    *errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len,
                                   msgflags & RD_KAFKA_MSG_F_BLOCK);
    if (unlikely(*errp)) {
        if (errnop)
            *errnop = ENOBUFS;
        return NULL;
    }

    /* Create the rd_kafka_msg_t. F_ACCOUNT records that this message
     * was counted by curr_msgs_add() above. */
    rkm = rd_kafka_msg_new00(rkt, force_partition,
                             msgflags|RD_KAFKA_MSG_F_ACCOUNT,
                             payload, len, key, keylen, msg_opaque);

    /* Use the caller's timestamp if given, otherwise the current time
     * (rd_uclock() scaled down by 1000). */
    if (timestamp)
        rkm->rkm_timestamp = timestamp;
    else
        rkm->rkm_timestamp = rd_uclock()/1000;
    rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;

    rkm->rkm_ts_enq = now;

    if (rkt->rkt_conf.message_timeout_ms == 0) {
        /* A timeout of 0 means the message never times out. */
        rkm->rkm_ts_timeout = INT64_MAX;
    } else {
        /* Widen before scaling so the multiplication is carried out in
         * 64 bits and cannot overflow a narrower integer type. */
        rkm->rkm_ts_timeout = now +
            (int64_t)rkt->rkt_conf.message_timeout_ms * 1000;
    }

    /* Call interceptor chain for on_send: lets interceptors apply
     * custom processing to rkm->rkm_rkmessage. */
    rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);

    return rkm;
}
  • 创建rd_kafka_msg_t, 并放入选定的topic-partition的队列
/**
 * Create a message and enqueue it on the selected topic-partition.
 *
 * The message is built with rd_kafka_msg_new0() and then handed to
 * rd_kafka_msg_partitioner(), which picks the partition and enqueues it.
 * The outcome is recorded with rd_kafka_set_last_error().
 *
 * @returns 0 on success, -1 on failure. On partitioner failure the
 *          message is destroyed but the caller's payload is not freed.
 */
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
                      int msgflags,
                      char *payload, size_t len,
                      const void *key, size_t keylen,
                      void *msg_opaque) {
    rd_kafka_resp_err_t rc;
    rd_kafka_msg_t *msg;
    int sys_errno;

    /* Build the message; on failure rc and sys_errno describe why. */
    msg = rd_kafka_msg_new0(rkt, force_partition, msgflags,
                            payload, len, key, keylen, msg_opaque,
                            &rc, &sys_errno, 0, rd_clock());
    if (unlikely(msg == NULL)) {
        rd_kafka_set_last_error(rc, sys_errno);
        return -1;
    }

    /* Select the topic-partition and enqueue the message on it. */
    rc = rd_kafka_msg_partitioner(rkt, msg, 1);
    if (likely(rc == 0)) {
        rd_kafka_set_last_error(0, 0);
        return 0;
    }

    /* Failure path: clean up and report the error. */

    /* Interceptor: unroll failing messages by triggering on_ack.. */
    msg->rkm_err = rc;
    rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
                                             &msg->rkm_rkmessage);

    /* The partitioner only fails when the application forces a
     * destination partition that does not exist in the cluster.
     * RD_KAFKA_MSG_F_FREE is cleared first because the contract says
     * the payload is not freed on failure. */
    msg->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
    rd_kafka_msg_destroy(rkt->rkt_rk, msg);

    /* Translate the error code to an errno value. */
    switch (rc) {
    case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
        sys_errno = ESRCH;
        break;
    case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
        sys_errno = ENOENT;
        break;
    default:
        sys_errno = EINVAL; /* NOTREACHED */
        break;
    }
    rd_kafka_set_last_error(rc, sys_errno);

    return -1;
}

struct rd_kafka_msgq_t

  • 所在文件: src/rdkafka_msg.h
  • 其实就是简单封装的rd_kafka_msg_t队列
  • 定义:
/**
 * Message queue: a TAILQ of rd_kafka_msg_t together with running totals
 * of the number of queued messages and their size in bytes.
 */
typedef struct rd_kafka_msgq_s {
    TAILQ_HEAD(, rd_kafka_msg_s) rkmq_msgs;  /* Queued messages */

        /* Number of Kafka messages in the queue */
    rd_atomic32_t rkmq_msg_cnt;

        /* Total size of the Kafka messages in the queue */
    rd_atomic64_t rkmq_msg_bytes;
} rd_kafka_msgq_t;
  • 合并两个rd_kafka_msgq_t:
void rd_kafka_msgq_concat (rd_kafka_msgq_t *dst,
                           rd_kafka_msgq_t *src)
  • 使用一个rd_kafka_msgq_t覆盖另一个rd_kafka_msgq_t
void rd_kafka_msgq_move (rd_kafka_msgq_t *dst,
                         rd_kafka_msgq_t *src)
  • 从队列里删除一个rd_kafka_msg_t
rd_kafka_msg_t *rd_kafka_msgq_deq (rd_kafka_msgq_t *rkmq,
                   rd_kafka_msg_t *rkm,
                   int do_count)
  • 出队列
/**
 * Pop the first message off the queue.
 *
 * @param rkmq  Queue to pop from.
 * @returns the message that was at the head of the queue (after passing
 *          it to rd_kafka_msgq_deq()), or NULL if the queue was empty.
 */
rd_kafka_msg_t *rd_kafka_msgq_pop (rd_kafka_msgq_t *rkmq) {
    rd_kafka_msg_t *head = TAILQ_FIRST(&rkmq->rkmq_msgs);

    if (!head)
        return NULL;

    rd_kafka_msgq_deq(rkmq, head, 1);
    return head;
}
  • 入队列, 插入到队尾
/**
 * Append a message to the tail of the queue and update the queue's
 * message count and byte total (payload length + key length).
 *
 * @param rkmq  Queue to append to.
 * @param rkm   Message to enqueue.
 */
static RD_INLINE RD_UNUSED void rd_kafka_msgq_enq (rd_kafka_msgq_t *rkmq,
                                                   rd_kafka_msg_t *rkm) {
    const size_t msg_bytes = rkm->rkm_len + rkm->rkm_key_len;

    TAILQ_INSERT_TAIL(&rkmq->rkmq_msgs, rkm, rkm_link);

    rd_atomic32_add(&rkmq->rkmq_msg_cnt, 1);
    rd_atomic64_add(&rkmq->rkmq_msg_bytes, msg_bytes);
}
  • 扫描队列, 将超时的加入到超时队列
/**
 * Scan the queue from the head and move every message whose timeout is
 * not later than \p now onto the \p timedout queue. The scan stops at the
 * first message that has not yet timed out.
 *
 * @param rkmq      Queue to scan.
 * @param timedout  Queue receiving the timed-out messages.
 * @param now       Current time to compare message timeouts against.
 *
 * @returns the growth of \p timedout's message count during the scan.
 */
int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
                            rd_kafka_msgq_t *timedout,
                            rd_ts_t now) {
    const int cnt_before = rd_atomic32_get(&timedout->rkmq_msg_cnt);
    rd_kafka_msg_t *cur, *next;

    /* Assume messages are added in time sequential order, so the first
     * live message ends the scan. */
    TAILQ_FOREACH_SAFE(cur, &rkmq->rkmq_msgs, rkm_link, next) {
        const int still_alive = cur->rkm_ts_timeout > now;

        if (likely(still_alive))
            break;

        /* Timed out: move from the scanned queue to the timeout queue. */
        rd_kafka_msgq_deq(rkmq, cur, 1);
        rd_kafka_msgq_enq(timedout, cur);
    }

    return rd_atomic32_get(&timedout->rkmq_msg_cnt) - cnt_before;
}
  • rd_kafka_msg_partitioner是很重要的一个函数, 作两件事: 确定一个topic-partition, 然后把这个rd_kafka_msg_t放到这个topic-partition对应的队列里
/**
 * Select the destination partition for a message and enqueue the message
 * on that partition's queue.
 *
 * Partition selection depends on the topic state (rkt_state):
 *  - UNKNOWN:   the message goes to the unassigned (UA) partition.
 *  - NOTEXISTS: fails with RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC.
 *  - EXISTS:    UA if the topic has no partitions; otherwise the topic's
 *               configured partitioner is called when the message has no
 *               partition assigned, else the message's own partition is
 *               used. A partition >= rkt_partition_cnt fails with
 *               RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
 *
 * @param rkt      Topic.
 * @param rkm      Message to partition and enqueue.
 * @param do_lock  Non-zero: take the topic read lock for the duration of
 *                 the call and release it on every exit path.
 *
 * @returns 0 on success, otherwise an rd_kafka_resp_err_t error code
 *          (the message is not enqueued in that case).
 */
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
                  int do_lock) {
    int32_t partition;
    rd_kafka_toppar_t *rktp_new;
        shptr_rd_kafka_toppar_t *s_rktp_new;
    rd_kafka_resp_err_t err;

    if (do_lock)
        rd_kafka_topic_rdlock(rkt);

       /* Handle each possible current state of the topic separately. */
        switch (rkt->rkt_state)
        {
        case RD_KAFKA_TOPIC_S_UNKNOWN:
                /* No metadata received from cluster yet.
                 * Put message in UA partition and re-run partitioner when
                 * cluster comes up. */
        partition = RD_KAFKA_PARTITION_UA;
                break;

        case RD_KAFKA_TOPIC_S_NOTEXISTS:
                /* Topic not found in cluster.
                 * Fail message immediately. */
                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
        if (do_lock)
            rd_kafka_topic_rdunlock(rkt);
                return err;

        case RD_KAFKA_TOPIC_S_EXISTS:
                /* Topic exists in cluster. */

                /* Topic exists but has no partitions.
                 * This is usually an transient state following the
                 * auto-creation of a topic. */
                if (unlikely(rkt->rkt_partition_cnt == 0)) {
                        partition = RD_KAFKA_PARTITION_UA;
                        break;
                }

                /* Partition not assigned, run partitioner. */
                /* If rkm->rkm_partition == RD_KAFKA_PARTITION_UA, call the
                 * configured partitioner function to pick a partition. */
                if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
                        rd_kafka_topic_t *app_rkt;
                        /* Provide a temporary app_rkt instance to protect
                         * from the case where the application decided to
                         * destroy its topic object prior to delivery completion
                         * (issue #502). */
                        app_rkt = rd_kafka_topic_keep_a(rkt);
                        partition = rkt->rkt_conf.
                                partitioner(app_rkt,
                                            rkm->rkm_key,
                        rkm->rkm_key_len,
                                            rkt->rkt_partition_cnt,
                                            rkt->rkt_conf.opaque,
                                            rkm->rkm_opaque);
                        /* Drop the temporary reference taken above. */
                        rd_kafka_topic_destroy0(
                                rd_kafka_topic_a2s(app_rkt));
                } else
                        partition = rkm->rkm_partition;

                /* Check that partition exists. */
                if (partition >= rkt->rkt_partition_cnt) {
                        err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        if (do_lock)
                                rd_kafka_topic_rdunlock(rkt);
                        return err;
                }
                break;

        default:
                rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
                break;
        }

    /* Get new partition */
    s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);

    if (unlikely(!s_rktp_new)) {
        /* Unknown topic or partition */
        if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
            err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
        else
            err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

        if (do_lock)
            rd_kafka_topic_rdunlock(rkt);

        return  err;
    }

        rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
        /* Bump the partition's message counter. */
        rd_atomic64_add(&rktp_new->rktp_c.msgs, 1);

        /* Update message partition */
        if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
                rkm->rkm_partition = partition;

    /* Partition is available: enqueue msg on partition's queue */
        /* Append to the tail of the partition's queue. */
    rd_kafka_toppar_enq_msg(rktp_new, rkm);
    if (do_lock)
        rd_kafka_topic_rdunlock(rkt);
    rd_kafka_toppar_destroy(s_rktp_new); /* from _get() */
    return 0;
}

Librdkafka源码分析-Content Table

    原文作者:扫帚的影子
    原文地址: https://zhuanlan.zhihu.com/p/54793287
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞