python&Kafka

1.kafka简介

Apache Kafka 是一个分布式的流平台。分布式流平台具有三项关键功能:

  • 类似于消息队列的发布订阅能力
  • 以容错持久的方式存储数据流
  • 即时处理流中的记录

一般将kafka作为流处理系统数据流接收器和缓冲器,保证整个流处理的系统的稳定运行。业务处理部分一般使用storm或spark streaming完成。

本文主要讨论,python做为生产者如何将数据发布到kafka集群中、python作为消费者如何订阅kafka集群中的数据。kafka运行的原理和流处理平台搭建使用不在此进行讨论。

2.Kafka安装部署

2.1 下载Kafka

可以由此链接下载Kafka最新的版本:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz

2.2 安装Kafka

本文中kafka是安装到VirtualBox虚拟机中的,操作系统的版本为 CentOS 7(使用 cat /etc/os-release查看版本)

我安装在了opt目录

2.2.1 解压

tar -xf kafka_2.12-1.1.0.tar

2.2.2启动zookeeper

kafka强依赖于zookeeper,启动kafka前必须先启动zookeeper。(同样强依赖zookeeper的还有hbase,storm)

/opt/kafka_2.12-1.1.0/bin/zookeeper-server-start.sh /opt/kafka_2.12-1.1.0/config/zookeeper.properties

2.2.3 启动kafka

/opt/kafka_2.12-1.1.0/bin/kafka-server-start.sh /opt/kafka_2.12-1.1.0/config/server.properties

这里出现了一个异常,解决这个异常详见: https://stackoverflow.com/questions/24061672/verifyerror-uninitialized-object-exists-on-backward-branch-jvm-spec-4-10-2-4。我的jdk版本是“Java(TM) SE Runtime Environment (build 1.8.0-b132)”,升级到jdk1.8.0_161后kafka正常启动

[2018-04-05 05:18:53,804] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)

java.lang.VerifyError: Uninitialized object exists on backward branch 209

2.2.4创建一个topic “test”

/opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2.2.4 查看topic

/opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181

3.python访问Kafka

在这里使用了一个第三方包“kafka-python”,要求kafka版本大于0.9.

3.1 安装kafka-python

pip install kafka-python

3.2生产者

from kafka import KafkaProducer
from kafka.errors import KafkaError
#创建了一个生产者的对象
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# 异步发送一个条数据,topic和value,没有key
future = producer.send('test', b'123456')

# 获取发送记录的metadata
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    log.exception()
    pass

# 发送记录的topic,分区,偏移量
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

# produce keyed messages to enable hashed partitioning
producer.send('test', key=b'foo', value=b'bar')

# 发送具有key和value的数据,使用msgpack序列化
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('test', {'key': 'value'})

# 生产json数据
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('test', {'key': 'value'})

# produce 异步发送100条数据
for _ in range(100):
    producer.send('test', b'msg')
#定义一个发送成功的回调函数
def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)
#定义一个发送失败的回调函数
def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('test', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# 把异步发送的数据全部发送出去
producer.flush()

# 定义重试的次数
producer = KafkaProducer(retries=5)

3.2.1 KafkaProducer的构造参数:

  1. bootstrap_servers :kafka节点或节点的列表,不一定需要罗列所有的kafka节点。格式为: ‘host[:port]’ 。默认值是:localhost:9092
  2. client_id (str) : 客户端id,默认值: ‘kafka-python-{version}’
  3. key_serializer (callable) :key序列化函数
  4. value_serializer (callable) :value序列化函数
  5. acks (0**, 1**, ‘all’) – 0不等待kafka服务端确认,1,等待leader确认,all等所有副本确认
  6. compression_type (str) :数据压缩类型,‘gzip’, ‘snappy’, ‘lz4’, 或者 None.
  7. retries (int) :设置重试次数
  8. batch_size (int):
  9. linger_ms (int)
  10. partitioner (callable) –
  11. buffer_memory (int)
  12. max_block_ms (int)
  13. max_request_size (int)
  14. metadata_max_age_ms (int)
  15. retry_backoff_ms (int)
  16. request_timeout_ms (int)
  17. receive_buffer_bytes (int)
  18. send_buffer_bytes (int)
  19. socket_options (list)
  20. reconnect_backoff_ms (int) –
  21. reconnect_backoff_max_ms (int)
  22. max_in_flight_requests_per_connection (int)
  23. security_protocol (str)
  24. ssl_context (ssl.SSLContext)
  25. ssl_check_hostname (bool)
  26. ssl_cafile (str) –
  27. ssl_certfile (str)
  28. ssl_keyfile (str)
  29. ssl_password (str)
  30. ssl_crlfile (str)
  31. api_version (tuple)
  32. api_version_auto_timeout_ms (int)
  33. metric_reporters (list)
  34. metrics_num_samples (int)
  35. metrics_sample_window_ms (int)
  36. selector (selectors.BaseSelector)
  37. sasl_mechanism (str)
  38. sasl_plain_username (str)
  39. sasl_plain_password (str)
  40. sasl_kerberos_service_name (str)

3.3.2 KafkaConsumer的函数

    1. close(timeout=None): 关闭生产者
    2. flush(timeout=None):强制发送异步数据
    3. metrics(raw=False):性能指标
    4. partitions_for(topic):返回topic所有的分区
    5. send(topic, value=None, key=None, partition=None, timestamp_ms=None):发送数据

3.3 消费者

KafkaConsumer是非线程安全的

from kafka import KafkaConsumer

#创建一个消费者,指定了topic,group_id,bootstrap_servers
#group_id:多个拥有相同group_id的消费者被判定为一组,一条数据记录只会被同一个组中的一个消费者消费
#bootstrap_servers:kafka的节点,多个节点使用逗号分隔
#这种方式只会获取新产生的数据
consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
#注意当没有数据可以消费时,循环会阻塞
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

3.3.1 KafkaConsumer的构造参数:

  1. *topics ,要订阅的主题
  2. bootstrap_servers :kafka节点或节点的列表,不一定需要罗列所有的kafka节点。格式为: ‘host[:port]’ 。默认值是:localhost:9092
  3. client_id (str) : 客户端id,默认值: ‘kafka-python-{version}’
  4. group_id (str or None):分组id
  5. key_deserializer (callable) :key反序列化函数
  6. value_deserializer (callable):value反序列化函数
  7. fetch_min_bytes:服务器应每次返回的最小数据量
  8. fetch_max_wait_ms (int): 服务器应每次返回的最大等待时间
  9. fetch_max_bytes (int) :服务器应每次返回的最大数据量
  10. max_partition_fetch_bytes (int) :
  11. request_timeout_ms (int)
  12. retry_backoff_ms (int)
  13. reconnect_backoff_ms (int)
  14. reconnect_backoff_max_ms (int)
  15. max_in_flight_requests_per_connection (int)
  16. auto_offset_reset (str)
  17. enable_auto_commit (bool)
  18. auto_commit_interval_ms (int)
  19. default_offset_commit_callback (callable)
  20. check_crcs (bool)
  21. metadata_max_age_ms (int)
  22. partition_assignment_strategy (list)
  23. max_poll_records (int)
  24. max_poll_interval_ms (int)
  25. session_timeout_ms (int)
  26. heartbeat_interval_ms (int)
  27. receive_buffer_bytes (int)
  28. send_buffer_bytes (int)
  29. socket_options (list)
  30. consumer_timeout_ms (int)
  31. skip_double_compressed_messages (bool)
  32. security_protocol (str)
  33. ssl_context (ssl.SSLContext)
  34. ssl_check_hostname (bool)
  35. ssl_cafile (str) –
  36. ssl_certfile (str)
  37. ssl_keyfile (str)
  38. ssl_password (str)
  39. ssl_crlfile (str)
  40. api_version (tuple)

3.3.2 KafkaConsumer的函数

  1. assign(partitions):手动为该消费者分配一个topic分区列表。
  2. assignment():获取当前分配给该消费者的topic分区。
  3. beginning_offsets(partitions):获取给定分区的第一个偏移量。
  4. close(autocommit=True):关闭消费者
  5. commit(offsets=None):提交偏移量,直到成功或错误为止。
  6. commit_async(offsets=None, callback=None):异步提交偏移量。
  7. committed(partition):获取给定分区的最后一个提交的偏移量。
  8. end_offsets(partitions):获取分区的最大偏移量
  9. highwater(partition):分区最大的偏移量
  10. metrics(raw=False):返回消费者性能指标
  11. next():返回下一条数据
  12. offsets_for_times(timestamps):根据时间戳获取分区偏移量
  13. partitions_for_topic(topic):返回topic的partition列表,返回一个set集合
  14. pause(*partitions):停止获取数据
  15. paused():返回停止获取的分区
  16. poll(timeout_ms=0, max_records=None):获取数据
  17. position(partition):获取分区的偏移量
  18. resume(*partitions):恢复抓取指定的分区
  19. seek(partition, offset):seek偏移量
  20. seek_to_beginning(*partitions):搜索最旧的偏移量
  21. seek_to_end(*partitions):搜索最近可用的偏移量
  22. subscribe(topics=(), pattern=None, listener=None):订阅topics
  23. subscription():返回当前消费者消费的所有topic
  24. topics():返回当前消费者消费的所有topic,返回的是unicode
  25. unsubscribe():取消订阅所有的topic
    原文作者:老冯
    原文地址: https://zhuanlan.zhihu.com/p/35338462
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞