kafka爬坑3:自己写一个生产者

运行kafka生产者客户端,接收用户终端输入。

./rdkafka_simple_producer 0 test

《kafka爬坑3:自己写一个生产者》
《kafka爬坑3:自己写一个生产者》

官方例子核心C代码解读:

  1. 参数校验,需要传入<broker> <topic>
  2. 创建kafka客户端配置
  3. 配置kafka各项参数
  4. 设置发送回调函数
  5. 创建生产者实例
  6. 实例化主题对象
  7. 关闭信号
  8. 打印最开始的提示信息
  9. 异步调用将消息发送到指定的主题对象
  10. 等待消息完成
  11. 销毁主题对象
  12. 销毁生产者实例
#include <stdio.h> #include <signal.h> #include <string.h> 
#include "rdkafka.h" 
static int run = 1;
static void stop (int sig) {
    run = 0;
    fclose(stdin);
}



static void dr_msg_cb (rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) // 打印失败信息     fprintf(stderr, "%% Message delivery failed: %s\n",rd_kafka_err2str(rkmessage->err));
else  // 打印成功信息     fprintf(stderr,"%% Message delivered (%zd bytes, partition %d)\n",rkmessage->len, rkmessage->partition);
}



int main (int argc, char **argv) 
{
    rd_kafka_t *rk; //生产者实例句柄     rd_kafka_topic_t *rkt; //主题对象     rd_kafka_conf_t *conf; //临时配置对象     char errstr[512];  //错误报告缓冲区     char buf[512]; //消息缓冲区     const char *brokers; //参数broker     const char *topic; //参数topic 
    if (argc != 3) {
        fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
        return 1;
    }

    brokers = argv[1];
    topic   = argv[2];

     //创建kafka客户端配置     conf = rd_kafka_conf_new();

     //配置kafka各项参数     if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        return 1;
    }

    //设置发送回调函数     rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    //创建生产者实例     rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr,"%% Failed to create new producer: %s\n", errstr);
        return 1;
    }

    //实例化主题对象     rkt = rd_kafka_topic_new(rk, topic, NULL);
    if (!rkt) {
        fprintf(stderr, "%% Failed to create topic object: %s\n",rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(rk);
        return 1;
    }

    signal(SIGINT, stop);
    //打印提示信息     fprintf(stderr,
            "%% Type some text and hit enter to produce message\n"
            "%% Or just hit enter to only serve delivery reports\n"
            "%% Press Ctrl-C or Ctrl-D to exit\n");

    while (run && fgets(buf, sizeof(buf), stdin)) 
    {
        size_t len = strlen(buf);

        if (buf[len-1] == '\n') /* Remove newline */
                buf[--len] = '\0';

        if (len == 0) 
        {
            rd_kafka_poll(rk, 0/*non-blocking */);
            continue;
        }

retry://异步调用将消息发送到指定的主题对象         if (rd_kafka_produce(rkt,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_COPY, buf, len, NULL, 0, NULL) == -1) 
        {
            fprintf(stderr,"%% Failed to produce to topic %s: %s\n",rd_kafka_topic_name(rkt),rd_kafka_err2str(rd_kafka_last_error()));

            if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) 
            { // 阻塞等待消息发送完成                 rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
                goto retry;
            }
        } 
        else // 打印成功信息         {
            fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",len, rd_kafka_topic_name(rkt));
        }
        rd_kafka_poll(rk, 0/*non-blocking*/);
    }

    fprintf(stderr, "%% Flushing final messages..\n");
    rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);
    rd_kafka_topic_destroy(rkt);//销毁主题对象     rd_kafka_destroy(rk);//销毁生产者实例 
    return 0;
}

官方例子原始C代码:

编译:

gcc -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align  -I../src rdkafka_simple_producer.c -o rdkafka_simple_producer  ../src/librdkafka.a -lm -lz   -ldl -lpthread -lrt

头文件5653行,文章中放不下,这里放一个原版.c文件。

/*  * librdkafka - Apache Kafka C library  *  * Copyright (c) 2017, Magnus Edenhill  * All rights reserved.  *  * Redistribution and use in source and binary forms, with or without  * modification, are permitted provided that the following conditions are met:  *  * 1. Redistributions of source code must retain the above copyright notice,  * this list of conditions and the following disclaimer.  * 2. Redistributions in binary form must reproduce the above copyright notice,  * this list of conditions and the following disclaimer in the documentation  * and/or other materials provided with the distribution.  *  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE  * POSSIBILITY OF SUCH DAMAGE.  */

/**  * Simple Apache Kafka producer  * using the Kafka driver from librdkafka  * (https://github.com/edenhill/librdkafka)  */

#include <stdio.h> #include <signal.h> #include <string.h> 

/* Typical include path would be <librdkafka/rdkafka.h>, but this program  * is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" 

static int run = 1;

/**  * @brief Signal termination of program  */
static void stop (int sig) {
        run = 0;
        fclose(stdin); /* abort fgets() */
}


/**  * @brief Message delivery report callback.  *  * This callback is called exactly once per message, indicating if  * the message was succesfully delivered  * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently  * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).  *  * The callback is triggered from rd_kafka_poll() and executes on  * the application's thread.  */
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "%% Message delivery failed: %s\n",
                        rd_kafka_err2str(rkmessage->err));
        else
                fprintf(stderr,
                        "%% Message delivered (%zd bytes, "
                        "partition %"PRId32")\n",
                        rkmessage->len, rkmessage->partition);

        /* The rkmessage is destroyed automatically by librdkafka */
}



int main (int argc, char **argv) {
        rd_kafka_t *rk;         /* Producer instance handle */
        rd_kafka_topic_t *rkt;  /* Topic object */
        rd_kafka_conf_t *conf;  /* Temporary configuration object */
        char errstr[512];       /* librdkafka API error reporting buffer */
        char buf[512];          /* Message value temporary buffer */
        const char *brokers;    /* Argument: broker list */
        const char *topic;      /* Argument: topic to produce to */

        /*  * Argument validation  */
        if (argc != 3) {
                fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
                return 1;
        }

        brokers = argv[1];
        topic   = argv[2];


        /*  * Create Kafka client configuration place-holder  */
        conf = rd_kafka_conf_new();

        /* Set bootstrap broker(s) as a comma-separated list of  * host or host:port (default port 9092).  * librdkafka will use the bootstrap brokers to acquire the full  * set of brokers from the cluster. */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        /* Set the delivery report callback.  * This callback will be called once per message to inform  * the application if delivery succeeded or failed.  * See dr_msg_cb() above. */
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);


        /*  * Create producer instance.  *  * NOTE: rd_kafka_new() takes ownership of the conf object  * and the application must not reference it again after  * this call.  */
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr,
                        "%% Failed to create new producer: %s\n", errstr);
                return 1;
        }


        /* Create topic object that will be reused for each message  * produced.  *  * Both the producer instance (rd_kafka_t) and topic objects (topic_t)  * are long-lived objects that should be reused as much as possible.  */
        rkt = rd_kafka_topic_new(rk, topic, NULL);
        if (!rkt) {
                fprintf(stderr, "%% Failed to create topic object: %s\n",
                        rd_kafka_err2str(rd_kafka_last_error()));
                rd_kafka_destroy(rk);
                return 1;
        }

        /* Signal handler for clean shutdown */
        signal(SIGINT, stop);

        fprintf(stderr,
                "%% Type some text and hit enter to produce message\n"
                "%% Or just hit enter to only serve delivery reports\n"
                "%% Press Ctrl-C or Ctrl-D to exit\n");

        while (run && fgets(buf, sizeof(buf), stdin)) {
                size_t len = strlen(buf);

                if (buf[len-1] == '\n') /* Remove newline */
                        buf[--len] = '\0';

                if (len == 0) {
                        /* Empty line: only serve delivery reports */
                        rd_kafka_poll(rk, 0/*non-blocking */);
                        continue;
                }

                /*  * Send/Produce message.  * This is an asynchronous call, on success it will only  * enqueue the message on the internal producer queue.  * The actual delivery attempts to the broker are handled  * by background threads.  * The previously registered delivery report callback  * (dr_msg_cb) is used to signal back to the application  * when the message has been delivered (or failed).  */
        retry:
                if (rd_kafka_produce(
                            /* Topic object */
                            rkt,
                            /* Use builtin partitioner to select partition*/
                            RD_KAFKA_PARTITION_UA,
                            /* Make a copy of the payload. */
                            RD_KAFKA_MSG_F_COPY,
                            /* Message payload (value) and length */
                            buf, len,
                            /* Optional key and its length */
                            NULL, 0,
                            /* Message opaque, provided in  * delivery report callback as  * msg_opaque. */
                            NULL) == -1) {
                        /**  * Failed to *enqueue* message for producing.  */
                        fprintf(stderr,
                                "%% Failed to produce to topic %s: %s\n",
                                rd_kafka_topic_name(rkt),
                                rd_kafka_err2str(rd_kafka_last_error()));

                        /* Poll to handle delivery reports */
                        if (rd_kafka_last_error() ==
                            RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                                /* If the internal queue is full, wait for  * messages to be delivered and then retry.  * The internal queue represents both  * messages to be sent and messages that have  * been sent or failed, awaiting their  * delivery report callback to be called.  *  * The internal queue is limited by the  * configuration property  * queue.buffering.max.messages */
                                rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
                                goto retry;
                        }
                } else {
                        fprintf(stderr, "%% Enqueued message (%zd bytes) "
                                "for topic %s\n",
                                len, rd_kafka_topic_name(rkt));
                }


                /* A producer application should continually serve  * the delivery report queue by calling rd_kafka_poll()  * at frequent intervals.  * Either put the poll call in your main loop, or in a  * dedicated thread, or call it after every  * rd_kafka_produce() call.  * Just make sure that rd_kafka_poll() is still called  * during periods where you are not producing any messages  * to make sure previously produced messages have their  * delivery report callback served (and any other callbacks  * you register). */
                rd_kafka_poll(rk, 0/*non-blocking*/);
        }


        /* Wait for final messages to be delivered or fail.  * rd_kafka_flush() is an abstraction over rd_kafka_poll() which  * waits for all messages to be delivered. */
        fprintf(stderr, "%% Flushing final messages..\n");
        rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);

        /* Destroy topic object */
        rd_kafka_topic_destroy(rkt);

        /* Destroy the producer instance */
        rd_kafka_destroy(rk);

        return 0;
}

    原文作者:ailx10
    原文地址: https://zhuanlan.zhihu.com/p/61953828
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞