kafka 有两种序列化方式,以及一个序列化接口。
// 序列化方式
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.ByteArrayDeserializer
org.apache.kafka.common.serialization.StringDeserializer
org.apache.kafka.common.serialization.StringSerializer
//序列化接口
org.apache.kafka.common.serialization.Serializer<T>
org.apache.kafka.common.serialization.Deserializer<T>
因此,如果使用原生的序列化方式,就需要把传输内容拼接成字符串,或者转成字符数组的方式。好在kafka提供了序列化和反序列化的接口。可以自定义对象的序列化方式,达到传输对象的目的。
首先看看kafka 关于producer 和consumer的配置方式。
//producer 配置
public KafkaProducerHelper(String kafkaHosts) {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaHosts); // localhost:9092
props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
props.setProperty(ProducerConfig.RETRIES_CONFIG, "1");
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "120000");
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "8388608");
//(key,value)中关于value的序列化方式,这里将通过StringSerializer来序列化
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//(key,value)中关于key的序列化方式,这里将通过KafkaTestSeralizer来序列化
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"com.util.KafkaTestSeralizer");
kafkaProducer = new KafkaProducer<String, Object>(props);
}
//Consumer 配置
private static KafkaConsumer<String, Object> kc= null;
public KafkaConsumer<String, Object> getConsumer(String kafkaHost, String groupId) {
if(kc == null) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
//这个groupId会被用来作为offset等内容的记录标志
props.put("group.id", groupId);
//autoCommit可以自动记录offset,如果为false,每次启动都将从offset=0开始消费
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("receive.buffer.bytes", 10485760);
props.put("max.partition.fetch.bytes", 8*1024*1024);
//key的反序列化方式
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//value的反序列化方式
props.put("value.deserializer", "com.util.KafkaTestDeseralizer");
kc = new KafkaConsumer<String, Object>(props);
}
return kc;
}
上面的配置指定了key和value的序列化和反序列化方式。以及一些其他的配置。
这一步非常重要,特别是指定K/V的序列化方式。后面所有的工作都是为了去实现这个序列化和反序列化的功能。
本文采用java.io原生的序列化方式,所以需要对待传输的类添加可序列化接口。
接下来,把需要传输的类实现可序列化接口。
import java.io.Serializable;
public class KafkaTest implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String id;
private String name;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
然后添加一种序列化和反序列化工具。
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
public class SerializeUtil {
public static byte[] serialize(Object object) {
ObjectOutputStream oos = null;
ByteArrayOutputStream baos = null;
try {
// 序列化
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(object);
byte[] bytes = baos.toByteArray();
return bytes;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@SuppressWarnings("unchecked")
public static <T> Object deserialize(byte[] bytes,Class<T> className) {
ByteArrayInputStream bais = null;
T tmpObject = null;
try {
// 反序列化
bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais);
tmpObject = (T)ois.readObject();
return tmpObject;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
然后,实现consumer和producer配置里的序列化方式。
//KafkaTestSeralizer
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.chinaventure.webspider.model.KafkaTest;
public class KafkaTestSeralizer implements Serializer<KafkaTest>{
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub
}
@Override
public byte[] serialize(String topic, KafkaTest data) {
return SerializeUtil.serialize(data);
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
//KafkaTestDeseralizer
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.chinaventure.webspider.model.KafkaTest;
public class KafkaTestDeseralizer implements Deserializer<KafkaTest>{
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// TODO Auto-generated method stub
}
@Override
public KafkaTest deserialize(String topic, byte[] bytes) {
return (KafkaTest)SerializeUtil.deserialize(bytes,KafkaTest.class);
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
最后,可以试着使用了
//producer
public static void main(String[] args) throws InterruptedException, ExecutionException {
String topic = "test";
KafkaTest tmp = new KafkaTest();
tmp.setId("123");
tmp.setName("王者荣耀");
int count = 1;
while (true) {
KafkaProducerHelper.getInstance("192.168.0.25:9092").send(topic, "123456", tmp);
Thread.sleep(1000);
System.out.println("producer queue: " + count++);
}
}
//consumer
public static void main(String[] args) throws IOException, InterruptedException {
String topic = "test";
KafkaConsumer<String, Object> consumer = KafkaConsumerHelper.getInstance().getConsumer("192.168.0.25:9092", topic);
consumer.subscribe(Arrays.asList(topic));
int messagecounter = 0;
while(true) {
ConsumerRecords<String, Object> records = consumer.poll(100);
for(ConsumerRecord<String, Object> record : records) {
System.out.println("receive object, key:"+record.key());
KafkaTest tmp = (KafkaTest) record.value();
System.out.println("the game name: "+ tmp.getName());
messagecounter++;
}
System.out.println("received messages:"+messagecounter);
Thread.sleep(3000);
}
}
这就是kafka序列化传输对象的全过程,简单说就是把待传输对象用kafka提供的序列化接口封装一下。具体的序列化方式可以使用很多方式,比如一些第三方的工具都可以。然后把封装好的类配置到producer和consumer里面。这样就能实现对象的传输。