- 上一节我们讲了librdkafka对topic-partition的封装, 任何一个partition都必须属于某一个topic;
- 这一节我们就来分析一下librdkafka对topic的封装:
`rd_kafka_itopic_s`
- 所在文件: src/rdkafka_topic.h
- 这里还有一个类型 `rd_kafka_topic_t`, 定义: `typedef struct rd_kafka_topic_s rd_kafka_topic_t;`. 这是个只有声明、没有实现的空定义, 其实就是 `rd_kafka_itopic_s`. 这个类型主要面向librdkafka的使用者, sdk里称作app topic, 它有自己的引用计数; 在librdkafka内部则使用 `rd_kafka_itopic_t`, 它也有自己的引用计数, 有点啰嗦啊~
- 定义:
/*
 * Internal topic object ("itopic").
 *
 * Applications use rd_kafka_topic_t handles (see rd_kafka_topic_keep_app()
 * and rd_kafka_topic_a2i()); the library itself works on this struct, which
 * carries its own internal reference count.
 */
struct rd_kafka_itopic_s {
        TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link;  /* Link in rk->rk_topics */

        rd_refcnt_t rkt_refcnt;                   /* Internal refcount */

        rwlock_t rkt_lock;                        /* Topic lock; presumably the
                                                   * one taken by
                                                   * rd_kafka_topic_{rd,wr}lock() */
        rd_kafkap_str_t *rkt_topic;               /* Topic name */

        shptr_rd_kafka_toppar_t *rkt_ua;          /* UA (unassigned) partition */
        shptr_rd_kafka_toppar_t **rkt_p;          /* Partitions, indexed by
                                                   * partition id; holds
                                                   * rkt_partition_cnt entries */
        int32_t rkt_partition_cnt;                /* Number of entries in rkt_p */

        rd_list_t rkt_desp;                       /* Desired partitions that are
                                                   * not (yet) in rkt_p */

        rd_ts_t rkt_ts_metadata;                  /* presumably: time of last
                                                   * metadata update — confirm */

        mtx_t rkt_app_lock;                       /* presumably protects the
                                                   * rkt_app_* fields — confirm */
        rd_kafka_topic_t *rkt_app_rkt;            /* Application topic handle */
        int rkt_app_refcnt;                       /* Application refcount */

        enum {
                RD_KAFKA_TOPIC_S_UNKNOWN,         /* No metadata known */
                RD_KAFKA_TOPIC_S_EXISTS,
                RD_KAFKA_TOPIC_S_NOTEXISTS,
        } rkt_state;

        int rkt_flags;
#define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL 0x1

        rd_kafka_t *rkt_rk;                       /* Owning client instance */

        shptr_rd_kafka_itopic_t *rkt_shptr_app;

        rd_kafka_topic_conf_t rkt_conf;           /* Private copy of the topic
                                                   * configuration */
};
- 创建一个 `rd_kafka_itopic_s` 对象: `rd_kafka_topic_new0`, 这是一个内部调用函数
/*
 * Internal constructor: looks up `topic` in `rk` and, if it is not known
 * yet, creates and registers a new internal topic object.
 *
 * @param rk        Client instance owning the topic list.
 * @param topic     Topic name; must be non-NULL and at most 512 bytes long.
 * @param conf      Topic configuration, or NULL to use a copy of the client's
 *                  default topic configuration (or a fresh default one).
 *                  Ownership is always taken: it is destroyed on the
 *                  invalid-name and already-existing paths, and otherwise
 *                  struct-copied into the new topic and its container freed.
 * @param existing  Optional out-param: set to 1 if the topic was already
 *                  known or its metadata was found in the metadata cache,
 *                  else 0. Not written on the invalid-name path.
 * @param do_lock   If non-zero, the rk write lock is held for the call.
 *
 * @returns a shared pointer to the topic, or NULL on an invalid name
 *          (last error set to RD_KAFKA_RESP_ERR__INVALID_ARG / EINVAL).
 */
shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
                                              const char *topic,
                                              rd_kafka_topic_conf_t *conf,
                                              int *existing,
                                              int do_lock) {
        rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_itopic_t *s_rkt;
        const struct rd_kafka_metadata_cache_entry *rkmce;

        /* Reject missing or over-long (> 512 bytes) topic names. */
        if (!topic || strlen(topic) > 512) {
                if (conf)
                        rd_kafka_topic_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                        EINVAL);
                return NULL;
        }

        if (do_lock)
                rd_kafka_wrlock(rk);

        /* Topic already known: return the existing object. */
        if ((s_rkt = rd_kafka_topic_find(rk, topic, 0))) {
                if (do_lock)
                        rd_kafka_wrunlock(rk);
                if (conf)
                        rd_kafka_topic_conf_destroy(conf);
                if (existing)
                        *existing = 1;
                return s_rkt;
        }

        if (existing)
                *existing = 0;

        rkt = rd_calloc(1, sizeof(*rkt));

        rkt->rkt_topic = rd_kafkap_str_new(topic, -1);
        rkt->rkt_rk = rk;

        /* No explicit config: fall back to the client's default topic
         * config, or a brand new default one. */
        if (!conf) {
                if (rk->rk_conf.topic_conf)
                        conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
                else
                        conf = rd_kafka_topic_conf_new();
        }
        /* Struct-copy the config into the topic; only the container is
         * freed here (conf_destroy is deliberately not called, so the
         * copied members now live in rkt->rkt_conf). */
        rkt->rkt_conf = *conf;
        rd_free(conf);

        /* Apply defaults that depend on the client. */
        if (!rkt->rkt_conf.partitioner)
                rkt->rkt_conf.partitioner = rd_kafka_msg_partitioner_consistent_random;

        if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
                rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;

        rd_list_init(&rkt->rkt_desp, 16, NULL);
        /* Refcount starts at 0; the keep below is the caller's reference. */
        rd_refcnt_init(&rkt->rkt_refcnt, 0);

        s_rkt = rd_kafka_topic_keep(rkt);

        rwlock_init(&rkt->rkt_lock);
        mtx_init(&rkt->rkt_app_lock, mtx_plain);

        /* Every topic gets an unassigned (UA) partition. */
        rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);

        /* Register the topic with the client. */
        TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
        rk->rk_topic_cnt++;

        /* Apply cached metadata, if any. Reporting *existing = 1 here lets
         * the caller skip its own leader query. */
        if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1))) {
                if (existing)
                        *existing = 1;

                rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic,
                                               rkmce->rkmce_ts_insert);
        }

        if (do_lock)
                rd_kafka_wrunlock(rk);

        return s_rkt;
}
- 创建 `rd_kafka_topic_t` 对象, 对外的接口: `rd_kafka_topic_new`
/*
 * Public constructor: returns an application topic handle for `topic`,
 * creating the internal topic object if needed.
 *
 * Ownership of `conf` passes to this call (it is handed to
 * rd_kafka_topic_new0()). Returns NULL if the topic could not be created.
 */
rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
                                      rd_kafka_topic_conf_t *conf) {
        int was_existing;
        shptr_rd_kafka_itopic_t *s_itopic =
                rd_kafka_topic_new0(rk, topic, conf, &was_existing, 1);
        rd_kafka_itopic_t *itopic;
        rd_kafka_topic_t *handle;

        if (s_itopic == NULL)
                return NULL;

        itopic = rd_kafka_topic_s2i(s_itopic);
        handle = rd_kafka_topic_keep_app(itopic);

        /* Newly created topic with no cached metadata: look up its leaders. */
        if (was_existing == 0)
                rd_kafka_topic_leader_query(rk, itopic);

        /* Drop the internal reference obtained from rd_kafka_topic_new0();
         * the caller keeps the application handle. */
        rd_kafka_topic_destroy0(s_itopic);

        return handle;
}
- 获取当前 `rd_kafka_t` 对象持有的所有topic的名字, 保存在一个 `rd_list` 中
/*
 * Appends a freshly rd_strdup()ed copy of the name of every topic known
 * to `rk` to `topics`. The list is grown up-front by the current topic
 * count. The rk read lock is held for the whole walk.
 */
void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) {
        rd_kafka_itopic_t *it;

        rd_kafka_rdlock(rk);

        rd_list_grow(topics, rk->rk_topic_cnt);

        TAILQ_FOREACH(it, &rk->rk_topics, rkt_link) {
                char *name_copy = rd_strdup(it->rkt_topic->str);
                rd_list_add(topics, name_copy);
        }

        rd_kafka_rdunlock(rk);
}
- 判断partition是否可用, 即判断它当前是否有leader broker
int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
int32_t partition) {
int avail;
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
rd_kafka_broker_t *rkb;
s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt),
partition, 0);
if (unlikely(!s_rktp))
return 0;
rktp = rd_kafka_toppar_s2i(s_rktp);
rkb = rd_kafka_toppar_leader(rktp, 1);
avail = rkb ? 1 : 0;
if (rkb)
rd_kafka_broker_destroy(rkb);
rd_kafka_toppar_destroy(s_rktp);
return avail;
}
- 扫描所有topic的partitions:
  - 筛选出已超时的kafka message, 通过回调通知application层;
  - 找出需要刷新metadata的topic, 发送metadata request
/*
 * Periodic scan over all topics of `rk`, using `now` as the current time.
 *
 * For every topic:
 *  - if its state is known (not UNKNOWN) but the metadata cache no longer
 *    has an entry for it, the state is reset to UNKNOWN and the topic is
 *    scheduled for a metadata query;
 *  - the topic is also scheduled for a query if it has no partitions, or
 *    if any regular (non-UA) partition has no leader, has the internal
 *    broker as leader, or has a leader whose state is below UP;
 *  - the rktp_xmit_msgq and rktp_msgq of every partition (UA included) are
 *    age-scanned against `now`; the timed-out messages are collected and
 *    failed through rd_kafka_dr_msgq() with RD_KAFKA_RESP_ERR__MSG_TIMED_OUT.
 *
 * After the rk read lock is released, one metadata refresh is issued for
 * all scheduled topics (deduplicated by name).
 *
 * Returns the total number of timed-out messages.
 */
int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) {
rd_kafka_itopic_t *rkt;
rd_kafka_toppar_t *rktp;
shptr_rd_kafka_toppar_t *s_rktp;
int totcnt = 0;
/* Names of topics needing a metadata refresh; the entries are rd_strdup()ed
 * below and presumably freed by the rd_free destructor on destroy. */
rd_list_t query_topics;
rd_list_init(&query_topics, 0, rd_free);
rd_kafka_rdlock(rk);
TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
int p;
/* NOTE(review): tpcnt (partitions with timeouts) is accumulated but never
 * read in this excerpt. */
int cnt = 0, tpcnt = 0;
rd_kafka_msgq_t timedout;
int query_this = 0;
rd_kafka_msgq_init(&timedout);
/* The topic state may be reset, so the topic write lock is needed here. */
rd_kafka_topic_wrlock(rkt);
if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
!rd_kafka_metadata_cache_topic_get(
rk, rkt->rkt_topic->str, 1)) {
rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);
query_this = 1;
}
rd_kafka_topic_wrunlock(rkt);
/* The partition scan below only needs the read lock. */
rd_kafka_topic_rdlock(rkt);
/* No partitions known yet: metadata is needed. */
if (rkt->rkt_partition_cnt == 0) {
query_this = 1;
}
/* Starting at RD_KAFKA_PARTITION_UA makes the UA partition part of the
 * scan (presumably UA is a negative id, -1 upstream — confirm). */
for (p = RD_KAFKA_PARTITION_UA ;
p < rkt->rkt_partition_cnt ; p++) {
int did_tmout = 0;
if (!(s_rktp = rd_kafka_toppar_get(rkt, p, 0)))
continue;
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_lock(rktp);
/* A regular partition without a usable leader triggers a query. */
if (p != RD_KAFKA_PARTITION_UA &&
(!rktp->rktp_leader ||
rktp->rktp_leader->rkb_source ==
RD_KAFKA_INTERNAL ||
rd_kafka_broker_get_state(rktp->rktp_leader) <
RD_KAFKA_BROKER_STATE_UP)) {
query_this = 1;
}
/* Move timed-out messages from both queues onto `timedout`. */
if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq,
&timedout, now) > 0)
did_tmout = 1;
if (rd_kafka_msgq_age_scan(&rktp->rktp_msgq,
&timedout, now) > 0)
did_tmout = 1;
tpcnt += did_tmout;
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(s_rktp);
}
rd_kafka_topic_rdunlock(rkt);
/* Fail the timed-out messages after the topic lock is released. */
if ((cnt = rd_atomic32_get(&timedout.rkmq_msg_cnt)) > 0) {
totcnt += cnt;
rd_kafka_dr_msgq(rkt, &timedout,
RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
}
/* Schedule the topic for a refresh, once per name.
 * NOTE(review): passing strcmp through a (void *) cast relies on a
 * function-pointer <-> void * conversion that ISO C leaves undefined;
 * it works on POSIX platforms. */
if (query_this &&
!rd_list_find(&query_topics, rkt->rkt_topic->str,
(void *)strcmp))
rd_list_add(&query_topics,
rd_strdup(rkt->rkt_topic->str));
}
rd_kafka_rdunlock(rk);
/* The metadata request is issued outside the rk lock. */
if (!rd_list_empty(&query_topics))
rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics,
1,
"refresh unavailable topics");
rd_list_destroy(&query_topics);
return totcnt;
}
- 更新topic的partition个数 `rd_kafka_topic_partition_cnt_update`, partition个数可能增加, 也可能减少. 简单讲:
  - 新增的partition, 创建 (若已在desired列表中, 则直接复用);
  - 超出新数量的老partition, 删除 (若是desired partition, 则移回desired列表);
/*
 * Resizes the topic's partition array to `partition_cnt` entries.
 *
 *  - Partitions [old_cnt, partition_cnt) are taken from the desired list
 *    if present there (F_UNKNOWN cleared, unlinked from the list),
 *    otherwise newly created.
 *  - Partitions [0, min(old_cnt, partition_cnt)) are carried over.
 *  - Every partition still on the desired list gets an
 *    RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION error enqueued.
 *  - Partitions [partition_cnt, old_cnt) are dropped. Desired ones are moved
 *    back onto the desired list (F_UNKNOWN set, error enqueued unless the
 *    client is terminating, broker delegation cleared). The others are
 *    flagged F_REMOVE and passed to rd_kafka_toppar_broker_leave_for_remove().
 *
 * Returns 0 if the count is unchanged (nothing is done), 1 otherwise.
 *
 * NOTE(review): presumably must be called with the topic write lock held —
 * confirm against callers.
 */
static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
                                                int32_t partition_cnt) {
        shptr_rd_kafka_toppar_t **rktps;
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        int32_t i;

        /* Nothing to do if the partition count did not change. */
        if (likely(rkt->rkt_partition_cnt == partition_cnt))
                return 0;

        /* New partition array; NULL when the topic has no partitions. */
        if (partition_cnt > 0)
                rktps = rd_calloc(partition_cnt, sizeof(*rktps));
        else
                rktps = NULL;

        for (i = 0 ; i < partition_cnt ; i++) {
                if (i >= rkt->rkt_partition_cnt) {
                        /* New partition: reuse the desired-list entry if
                         * there is one, otherwise create it. */
                        s_rktp = rd_kafka_toppar_desired_get(rkt, i);
                        rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL;
                        if (rktp) {
                                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;
                                rd_kafka_toppar_desired_unlink(rktp);
                                rd_kafka_toppar_unlock(rktp);
                        } else
                                s_rktp = rd_kafka_toppar_new(rkt, i);
                        rktps[i] = s_rktp;
                } else {
                        /* Existing partition: move it to the new array
                         * (take a new reference, release the old one). */
                        rktps[i] = rd_kafka_toppar_keep(
                                rd_kafka_toppar_s2i(rkt->rkt_p[i]));
                        rd_kafka_toppar_destroy(rkt->rkt_p[i]);
                }
        }

        /* NOTE: the original fetched the UA partition here with
         * rd_kafka_toppar_get() but never used or released it, leaking a
         * reference; that dead acquisition has been removed. */

        /* Partitions still on the desired list are not part of the topic. */
        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
                rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp),
                                          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
        }

        /* Drop the partitions beyond the new count. */
        for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) {
                s_rktp = rkt->rkt_p[i];
                rktp = rd_kafka_toppar_s2i(s_rktp);

                rd_kafka_toppar_lock(rktp);

                if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
                        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
                                     "Topic %s [%"PRId32"] is desired "
                                     "but no longer known: "
                                     "moving back on desired list",
                                     rkt->rkt_topic->str, rktp->rktp_partition);

                        rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
                        rd_kafka_toppar_desired_link(rktp);

                        if (!rd_kafka_terminating(rkt->rkt_rk))
                                rd_kafka_toppar_enq_error(
                                        rktp,
                                        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);

                        rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
                } else {
                        rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
                        rd_kafka_toppar_broker_leave_for_remove(rktp);
                }

                rd_kafka_toppar_unlock(rktp);

                /* Release the reference held by the old array. */
                rd_kafka_toppar_destroy(s_rktp);
        }

        if (rkt->rkt_p)
                rd_free(rkt->rkt_p);

        rkt->rkt_p = rktps;
        rkt->rkt_partition_cnt = partition_cnt;

        return 1;
}
- `rd_kafka_topic_assign_uas`: 将UA partition上待发送的kafka message重新分配到有效的partition上; 无法分配的message以失败回调通知application层:
/*
 * Producer only: re-partitions the messages queued on the topic's UA
 * (unassigned) partition.
 *
 * All messages are moved off the UA partition's rktp_msgq (under the UA
 * partition lock) and each one is run through rd_kafka_msg_partitioner().
 * A message is failed instead when:
 *  - it carries an explicit partition >= rkt_partition_cnt while the topic
 *    state is known (not UNKNOWN), or
 *  - the partitioner returns non-zero.
 * Failed messages are reported through rd_kafka_dr_msgq() with `err` if the
 * topic state is RD_KAFKA_TOPIC_S_NOTEXISTS, otherwise with
 * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
 *
 * NOTE(review): rkt_partition_cnt and rkt_state are read without taking
 * the topic lock here; presumably the caller holds it — confirm.
 */
static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
                                       rd_kafka_resp_err_t err) {
        shptr_rd_kafka_toppar_t *s_rktp_ua;
        rd_kafka_toppar_t *rktp_ua;
        rd_kafka_msg_t *rkm, *tmp;
        rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
        rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);

        if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
                return;

        s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
        if (unlikely(!s_rktp_ua))
                return;

        rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua);

        /* Take every queued UA message in one go; the partitioner then runs
         * without the UA partition lock held. */
        rd_kafka_toppar_lock(rktp_ua);
        rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
        rd_kafka_toppar_unlock(rktp_ua);

        TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
                /* Fast path: explicit partition that does not exist
                 * (only decided once the topic state is known). */
                if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
                    rkm->rkm_partition >= rkt->rkt_partition_cnt &&
                    rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) {
                        rd_kafka_msgq_enq(&failed, rkm);
                        continue;
                }

                /* Partitioner could not place the message. */
                if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0))
                        rd_kafka_msgq_enq(&failed, rkm);
        }

        if (rd_atomic32_get(&failed.rkmq_msg_cnt) > 0) {
                rd_kafka_dr_msgq(rkt, &failed,
                                 rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ?
                                 err :
                                 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
        }

        /* Release the reference taken by rd_kafka_toppar_get(). */
        rd_kafka_toppar_destroy(s_rktp_ua);
}
- 关于metadata相关的操作, 我们在介绍metadata时再来分析