运行kafka生产者客户端,接收用户终端输入。
./rdkafka_simple_producer 0 test
官方例子核心C代码解读:
- 参数校验,需要传入<broker> <topic>
- 创建kafka客户端配置
- 配置kafka各项参数
- 设置发送回调函数
- 创建生产者实例
- 实例化主题对象
- 关闭信号
- 打印最开始的提示信息
- 异步调用将消息发送到指定的主题对象
- 等待消息完成
- 销毁主题对象
- 销毁生产者实例
#include <stdio.h> #include <signal.h> #include <string.h>
#include "rdkafka.h"
static int run = 1;
static void stop (int sig) {
run = 0;
fclose(stdin);
}
static void dr_msg_cb (rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) // 打印失败信息 fprintf(stderr, "%% Message delivery failed: %s\n",rd_kafka_err2str(rkmessage->err));
else // 打印成功信息 fprintf(stderr,"%% Message delivered (%zd bytes, partition %d)\n",rkmessage->len, rkmessage->partition);
}
int main (int argc, char **argv)
{
rd_kafka_t *rk; //生产者实例句柄 rd_kafka_topic_t *rkt; //主题对象 rd_kafka_conf_t *conf; //临时配置对象 char errstr[512]; //错误报告缓冲区 char buf[512]; //消息缓冲区 const char *brokers; //参数broker const char *topic; //参数topic
if (argc != 3) {
fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
return 1;
}
brokers = argv[1];
topic = argv[2];
//创建kafka客户端配置 conf = rd_kafka_conf_new();
//配置kafka各项参数 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}
//设置发送回调函数 rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
//创建生产者实例 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr,"%% Failed to create new producer: %s\n", errstr);
return 1;
}
//实例化主题对象 rkt = rd_kafka_topic_new(rk, topic, NULL);
if (!rkt) {
fprintf(stderr, "%% Failed to create topic object: %s\n",rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(rk);
return 1;
}
signal(SIGINT, stop);
//打印提示信息 fprintf(stderr,
"%% Type some text and hit enter to produce message\n"
"%% Or just hit enter to only serve delivery reports\n"
"%% Press Ctrl-C or Ctrl-D to exit\n");
while (run && fgets(buf, sizeof(buf), stdin))
{
size_t len = strlen(buf);
if (buf[len-1] == '\n') /* Remove newline */
buf[--len] = '\0';
if (len == 0)
{
rd_kafka_poll(rk, 0/*non-blocking */);
continue;
}
retry://异步调用将消息发送到指定的主题对象 if (rd_kafka_produce(rkt,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_COPY, buf, len, NULL, 0, NULL) == -1)
{
fprintf(stderr,"%% Failed to produce to topic %s: %s\n",rd_kafka_topic_name(rkt),rd_kafka_err2str(rd_kafka_last_error()));
if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{ // 阻塞等待消息发送完成 rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
goto retry;
}
}
else // 打印成功信息 {
fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",len, rd_kafka_topic_name(rkt));
}
rd_kafka_poll(rk, 0/*non-blocking*/);
}
fprintf(stderr, "%% Flushing final messages..\n");
rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);
rd_kafka_topic_destroy(rkt);//销毁主题对象 rd_kafka_destroy(rk);//销毁生产者实例
return 0;
}
官方例子原始C代码:
编译:
gcc -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align -I../src rdkafka_simple_producer.c -o rdkafka_simple_producer ../src/librdkafka.a -lm -lz -ldl -lpthread -lrt
头文件5653行,文章中放不下,这里放一个原版.c
文件。
/* * librdkafka - Apache Kafka C library * * Copyright (c) 2017, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */
/** * Simple Apache Kafka producer * using the Kafka driver from librdkafka * (https://github.com/edenhill/librdkafka) */
#include <stdio.h> #include <signal.h> #include <string.h>
/* Typical include path would be <librdkafka/rdkafka.h>, but this program * is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"
static int run = 1;
/** * @brief Signal termination of program */
static void stop (int sig) {
run = 0;
fclose(stdin); /* abort fgets() */
}
/** * @brief Message delivery report callback. * * This callback is called exactly once per message, indicating if * the message was succesfully delivered * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR). * * The callback is triggered from rd_kafka_poll() and executes on * the application's thread. */
static void dr_msg_cb (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err)
fprintf(stderr, "%% Message delivery failed: %s\n",
rd_kafka_err2str(rkmessage->err));
else
fprintf(stderr,
"%% Message delivered (%zd bytes, "
"partition %"PRId32")\n",
rkmessage->len, rkmessage->partition);
/* The rkmessage is destroyed automatically by librdkafka */
}
int main (int argc, char **argv) {
rd_kafka_t *rk; /* Producer instance handle */
rd_kafka_topic_t *rkt; /* Topic object */
rd_kafka_conf_t *conf; /* Temporary configuration object */
char errstr[512]; /* librdkafka API error reporting buffer */
char buf[512]; /* Message value temporary buffer */
const char *brokers; /* Argument: broker list */
const char *topic; /* Argument: topic to produce to */
/* * Argument validation */
if (argc != 3) {
fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
return 1;
}
brokers = argv[1];
topic = argv[2];
/* * Create Kafka client configuration place-holder */
conf = rd_kafka_conf_new();
/* Set bootstrap broker(s) as a comma-separated list of * host or host:port (default port 9092). * librdkafka will use the bootstrap brokers to acquire the full * set of brokers from the cluster. */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}
/* Set the delivery report callback. * This callback will be called once per message to inform * the application if delivery succeeded or failed. * See dr_msg_cb() above. */
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
/* * Create producer instance. * * NOTE: rd_kafka_new() takes ownership of the conf object * and the application must not reference it again after * this call. */
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr,
"%% Failed to create new producer: %s\n", errstr);
return 1;
}
/* Create topic object that will be reused for each message * produced. * * Both the producer instance (rd_kafka_t) and topic objects (topic_t) * are long-lived objects that should be reused as much as possible. */
rkt = rd_kafka_topic_new(rk, topic, NULL);
if (!rkt) {
fprintf(stderr, "%% Failed to create topic object: %s\n",
rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(rk);
return 1;
}
/* Signal handler for clean shutdown */
signal(SIGINT, stop);
fprintf(stderr,
"%% Type some text and hit enter to produce message\n"
"%% Or just hit enter to only serve delivery reports\n"
"%% Press Ctrl-C or Ctrl-D to exit\n");
while (run && fgets(buf, sizeof(buf), stdin)) {
size_t len = strlen(buf);
if (buf[len-1] == '\n') /* Remove newline */
buf[--len] = '\0';
if (len == 0) {
/* Empty line: only serve delivery reports */
rd_kafka_poll(rk, 0/*non-blocking */);
continue;
}
/* * Send/Produce message. * This is an asynchronous call, on success it will only * enqueue the message on the internal producer queue. * The actual delivery attempts to the broker are handled * by background threads. * The previously registered delivery report callback * (dr_msg_cb) is used to signal back to the application * when the message has been delivered (or failed). */
retry:
if (rd_kafka_produce(
/* Topic object */
rkt,
/* Use builtin partitioner to select partition*/
RD_KAFKA_PARTITION_UA,
/* Make a copy of the payload. */
RD_KAFKA_MSG_F_COPY,
/* Message payload (value) and length */
buf, len,
/* Optional key and its length */
NULL, 0,
/* Message opaque, provided in * delivery report callback as * msg_opaque. */
NULL) == -1) {
/** * Failed to *enqueue* message for producing. */
fprintf(stderr,
"%% Failed to produce to topic %s: %s\n",
rd_kafka_topic_name(rkt),
rd_kafka_err2str(rd_kafka_last_error()));
/* Poll to handle delivery reports */
if (rd_kafka_last_error() ==
RD_KAFKA_RESP_ERR__QUEUE_FULL) {
/* If the internal queue is full, wait for * messages to be delivered and then retry. * The internal queue represents both * messages to be sent and messages that have * been sent or failed, awaiting their * delivery report callback to be called. * * The internal queue is limited by the * configuration property * queue.buffering.max.messages */
rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
goto retry;
}
} else {
fprintf(stderr, "%% Enqueued message (%zd bytes) "
"for topic %s\n",
len, rd_kafka_topic_name(rkt));
}
/* A producer application should continually serve * the delivery report queue by calling rd_kafka_poll() * at frequent intervals. * Either put the poll call in your main loop, or in a * dedicated thread, or call it after every * rd_kafka_produce() call. * Just make sure that rd_kafka_poll() is still called * during periods where you are not producing any messages * to make sure previously produced messages have their * delivery report callback served (and any other callbacks * you register). */
rd_kafka_poll(rk, 0/*non-blocking*/);
}
/* Wait for final messages to be delivered or fail. * rd_kafka_flush() is an abstraction over rd_kafka_poll() which * waits for all messages to be delivered. */
fprintf(stderr, "%% Flushing final messages..\n");
rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);
/* Destroy topic object */
rd_kafka_topic_destroy(rkt);
/* Destroy the producer instance */
rd_kafka_destroy(rk);
return 0;
}