Kafka 与 Java 的集成最容易实现了。
按照文档上面的例子,稍微改一改就能用。
kafka 是 LinkedIn 的开源产品
源代码托管在 https://github.com/apache/kafka 上面
因此需要下载源代码,才编译成想要的jar,供自己编写的 java 程序调用
当然也可以使用安装包里面的 kafka lib,里面包含了所有的 kafka api.
只要在新建的 java project 中引用安装包里面的 kafka library即可。
找到 examples 文件夹,修改 Demo 文件 /KafkaDemo/AppDemo/KafkaConsumerProducerDemo.java:
因为我们不需要模拟 consumer 的接收情况,所以注释掉 consumer 部分.
public class KafkaConsumerProducerDemo implements KafkaProperties
{
public static void main(String[] args)
{
Producer producerThread = new Producer(KafkaProperties.topic);
producerThread.start();
// Consumer consumerThread = new Consumer(KafkaProperties.topic);
// consumerThread.start();
//
}
}
找到 producer.java , 修改其代码,让它每1秒钟发送一次消息:
public void run() {
int messageNo = 1;
while (true) {
String messageStr = new String(“message from Java App: Message_” + messageNo);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
try {
this.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
还需要修改 properties.java 文件,使其适应我们本机的 zookeeper, kafka cluster 配置
public interface KafkaProperties
{
final static String zkConnect = “127.0.0.1:2181”;
final static String groupId = “group1”;
final static String topic = “ConsoleRealTimeMessage”;
final static String kafkaServerURL = “localhost”;
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64*1024;
final static int connectionTimeOut = 100000;
final static int reconnectInterval = 10000;
final static String topic2 = “topic2”;
final static String topic3 = “topic3”;
final static String clientId = “SimpleConsumerDemoClient”;
}
打开一个 terminal 窗口,接收消息:
>kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic ConsoleRealTimeMessage –from-beginning