kafka生产者、消费者java示例

1. 生产者

《kafka生产者、消费者java示例》
《kafka生产者、消费者java示例》

import java.util.Properties; 
   
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
   
public class MyProducer {   
     
        public static void main(String[] args) {   
            Properties props = new Properties();   
            props.setProperty("metadata.broker.list","localhost:9092");   
            props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
            props.put("request.required.acks","1");   
            ProducerConfig config = new ProducerConfig(props);   
            //创建生产这对象
            Producer<String, String> producer = new Producer<String, String>(config);
            //生成消息
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka");
            try {   
                int i =1; 
                while(i < 100){    
                    //发送消息
                    producer.send(data);   
                } 
            } catch (Exception e) {   
                e.printStackTrace();   
            }   
            producer.close();   
        }   
}

View Code

2. 消费者

《kafka生产者、消费者java示例》
《kafka生产者、消费者java示例》

import java.util.HashMap; 
import java.util.List;   
import java.util.Map;   
import java.util.Properties;   
     
import kafka.consumer.ConsumerConfig;   
import kafka.consumer.ConsumerIterator;   
import kafka.consumer.KafkaStream;   
import kafka.javaapi.consumer.ConsumerConnector;  
   
public class MyConsumer extends Thread{ 
        //消费者连接
        private final ConsumerConnector consumer;   
        //要消费的话题
        private final String topic;   
     
        public MyConsumer(String topic) {   
            consumer =kafka.consumer.Consumer   
                    .createJavaConsumerConnector(createConsumerConfig());   
            this.topic =topic;   
        }   
     
    //配置相关信息
    private static ConsumerConfig createConsumerConfig() {   
        Properties props = new Properties();   
//        props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");
        //配置要连接的zookeeper地址与端口
        //The ‘zookeeper.connect’ string identifies where to find once instance of Zookeeper in your cluster.
        //Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Group
        props.put("zookeeper.connect","localhost:2181");
        
        //配置zookeeper的组id (The ‘group.id’ string defines the Consumer Group this process is consuming on behalf of.)
        props.put("group.id", "0");
        
        //配置zookeeper连接超时间隔
        //The ‘zookeeper.session.timeout.ms’ is how many milliseconds Kafka will wait for 
        //ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.
        props.put("zookeeper.session.timeout.ms","10000"); 
 
        //The ‘zookeeper.sync.time.ms’ is the number of milliseconds a ZooKeeper ‘follower’ can be behind the master before an error occurs.
        props.put("zookeeper.sync.time.ms", "200");

        //The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. 
        //Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);   
    }   
     
    public void run(){ 
        
        Map<String,Integer> topickMap = new HashMap<String, Integer>();   
        topickMap.put(topic, 1);   
        Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);   
        
        KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);   
        ConsumerIterator<byte[],byte[]> it =stream.iterator();   
        System.out.println("*********Results********");   
        while(true){   
            if(it.hasNext()){ 
                //打印得到的消息   
                System.err.println(Thread.currentThread()+" get data:" +new String(it.next().message()));   
            } 
            try {   
                Thread.sleep(1000);   
            } catch (InterruptedException e) {   
                e.printStackTrace();   
            }   
        }   
    }  
    
    
    public static void main(String[] args) {   
        MyConsumer consumerThread = new MyConsumer("mykafka");   
        consumerThread.start();   
    }   
}

View Code

3. 消费者的线程执行器实现

  首先建立一个处理消息的类Consumer

《kafka生产者、消费者java示例》
《kafka生产者、消费者java示例》

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class Consumer implements Runnable {
    
    private KafkaStream stream;
    private int threadNumber;
 
    public Consumer(KafkaStream a_stream, int a_threadNumber) {
        threadNumber = a_threadNumber;
        stream = a_stream;
    }
 
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}

View Code

  其次实现多线程的调用

《kafka生产者、消费者java示例》
《kafka生产者、消费者java示例》

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.concurrent.*; 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ConsumerGroup {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
 
    public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
 
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
   }
 
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);
 
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new Consumer(stream, threadNumber));
            threadNumber++;
        }
    }
 
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
 
        return new ConsumerConfig(props);
    }
 
    public static void main(String[] args) {
        String zooKeeper = "localhost:2181";
        String groupId = "0";
        String topic = "mykafka";
        int threads = 2;  //启动的线程数
 
        ConsumerGroup group = new ConsumerGroup(zooKeeper, groupId, topic);
        group.run(threads);
 
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        group.shutdown();
    }
}

View Code

 

    原文作者:~风轻云淡~
    原文地址: https://www.cnblogs.com/gaopeng527/p/4950232.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞