在你的 Java 程序里调用 Kafka 发起数据流

Kafka 与 Java 的集成最容易实现了。

按照文档上面的例子,稍微改一改就能用。

kafka 是 LinkedIn 的开源产品

源代码托管在 https://github.com/apache/kafka 上面

因此需要下载源代码,才编译成想要的jar,供自己编写的 java 程序调用

当然也可以使用安装包里面的 kafka lib,里面包含了所有的 kafka api.

只要在新建的 java project 中引用安装包里面的 kafka library即可。

找到 examples 文件夹,修改 Demo 文件 /KafkaDemo/AppDemo/KafkaConsumerProducerDemo.java:

因为我们不需要模拟 consumer 的接收情况,所以注释掉 consumer 部分.

public class KafkaConsumerProducerDemo implements KafkaProperties

{

public static void main(String[] args)

{

Producer producerThread = new Producer(KafkaProperties.topic);

producerThread.start();

// Consumer consumerThread = new Consumer(KafkaProperties.topic);

// consumerThread.start();

//

}

}

找到 producer.java , 修改其代码,让它每1秒钟发送一次消息:

public void run() {

int messageNo = 1;

while (true) {

String messageStr = new String(“message from Java App: Message_” + messageNo);

producer.send(new KeyedMessage<Integer, String>(topic, messageStr));

messageNo++;

try {

this.sleep(1000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

还需要修改 properties.java 文件,使其适应我们本机的 zookeeper, kafka cluster 配置

public interface KafkaProperties

{

final static String zkConnect = “127.0.0.1:2181”;

final static String groupId = “group1”;

final static String topic = “ConsoleRealTimeMessage”;

final static String kafkaServerURL = “localhost”;

final static int kafkaServerPort = 9092;

final static int kafkaProducerBufferSize = 64*1024;

final static int connectionTimeOut = 100000;

final static int reconnectInterval = 10000;

final static String topic2 = “topic2”;

final static String topic3 = “topic3”;

final static String clientId = “SimpleConsumerDemoClient”;

}

打开一个 terminal 窗口,接收消息:

>kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic ConsoleRealTimeMessage –from-beginning

    原文作者:黄赟
    原文地址: https://zhuanlan.zhihu.com/p/33973127
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞