Logstash – unable to read Kafka Avro schema messages

Is there any way to solve this problem? I am unable to read Kafka Avro schema messages. I am trying to send messages from Logstash to Kafka and on to HDFS.

Here is the technology stack:

> Logstash 2.3 – the current production version
> Confluent 3.0
> Plugins:
a. logstash-kafka output plugin
b. logstash-codec-avro
> ZooKeeper: 3.4.6
> Kafka: 0.10.0.0

The Logstash config file looks like this:

input {
  stdin {}
}

filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  kafka {
    topic_id => 'logstash_logs14'
    codec => avro {
      schema_uri => "/opt/logstash/bin/schema.avsc"
    }
  }
}

The schema.avsc file looks like this:

{
    "type": "record",
    "name": "myrecord",
    "fields": [
        {"name": "message", "type": "string"},
        {"name": "host", "type": "string"}
    ]
}
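
Note that logstash-codec-avro serializes each event as a plain Avro datum using this schema; it does not talk to the Schema Registry and does not prepend the Confluent <magic byte><schemaId> framing (which turns out to be the root of the error shown further down). As a sketch of reading such a payload outside the Confluent tooling — assuming the codec emitted raw Avro binary (some codec versions base64-encode the output, in which case decode that first); the class name here is made up for illustration:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

// Hypothetical helper, not from the original post: decodes one Kafka message
// value produced by logstash-codec-avro with the schema above.
public class LogstashAvroDecoder {
    public static GenericRecord decode(byte[] payload) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("/opt/logstash/bin/schema.avsc"));
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        // Plain Avro binary datum: no magic byte, no schema id prefix.
        return reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
    }
}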

Run the following commands:

1. Start ZooKeeper in its own terminal

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

2. Start Kafka in its own terminal

./bin/kafka-server-start ./etc/kafka/server.properties

3. Start the Schema Registry in its own terminal

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

4. From the Logstash directory, run the following command

bin/logstash -f ./bin/logstash.conf

5. After running the above command, type the log message to be sent to Kafka
   e.g.: "Hello World"

6. Consume the topic from Kafka

./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning
While consuming we get the following error:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
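
The "Unknown magic byte!" message means the first byte of the consumed value is not the 0x0 marker the Confluent deserializer requires. A small diagnostic sketch (class name and group id are made up for illustration) that reads the raw bytes with the Kafka 0.10 consumer API and reports whether the Confluent framing is present:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MagicByteCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "magic-byte-check");   // hypothetical group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("logstash_logs14"));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(5000);
        for (ConsumerRecord<byte[], byte[]> record : records) {
            byte[] v = record.value();
            if (v == null || v.length == 0) continue;  // skip empty values
            // Confluent-framed values start with magic byte 0x0, then a 4-byte schema id.
            System.out.printf("offset %d, first byte 0x%02x -> %s%n",
                    record.offset(), v[0],
                    v[0] == 0x0 ? "Confluent wire format" : "not Confluent-framed");
        }
        consumer.close();
    }
}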

Please let me know how to solve this problem.

Thanks,
Rep

Best answer: How are you writing/publishing to Kafka? You are seeing the SerializationException because the data was not written using the Schema Registry (i.e. the KafkaAvroSerializer), while kafka-avro-console-consumer internally uses the Schema Registry (i.e. the KafkaAvroDeserializer), which expects the data in a specific format (namely <magic byte><schemaId><data>). If you write the Avro data with kafka-avro-console-producer, you should not get this exception. Alternatively, you can set KafkaAvroSerializer as both the key and value serializer in your producer properties and set schema.registry.url:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Use the Confluent Avro serializer for both keys and values
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
// Tell the serializer where the Schema Registry (step 3 above) is running
props.put("schema.registry.url", "http://localhost:8081");
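For the kafka-avro-console-producer route mentioned above, the usual Confluent invocation passes the schema inline (topic name taken from the question); messages produced this way should then decode cleanly with the consumer command from step 6:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic logstash_logs14 \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"message","type":"string"},{"name":"host","type":"string"}]}'

Each input line is then a JSON-encoded record, e.g. {"message": "Hello World", "host": "localhost"}.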