1.kafka简介
Apache Kafka 是一个分布式的流平台。分布式流平台具有三项关键功能:
- 类似于消息队列的发布订阅能力
- 以容错持久的方式存储数据流
- 即时处理流中的记录
一般将kafka作为流处理系统数据流接收器和缓冲器,保证整个流处理的系统的稳定运行。业务处理部分一般使用storm或spark streaming完成。
本文主要讨论,python做为生产者如何将数据发布到kafka集群中、python作为消费者如何订阅kafka集群中的数据。kafka运行的原理和流处理平台搭建使用不在此进行讨论。
2.Kafka安装部署
2.1 下载Kafka
可以由此链接下载Kafka最新的版本:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz
2.2 安装Kafka
本文中kafka是安装到VirtualBox虚拟机中的,操作系统的版本为 CentOS 7(使用 cat /etc/os-release
查看版本)
我安装在了opt目录
2.2.1 解压
tar -xf kafka_2.12-1.1.0.tar
2.2.2启动zookeeper
kafka强依赖于zookeeper,启动kafka前必须先启动zookeeper。(同样强依赖zookeeper的还有hbase,storm)
/opt/kafka_2.12-1.1.0/bin/zookeeper-server-start.sh /opt/kafka_2.12-1.1.0/config/zookeeper.properties
2.2.3 启动kafka
/opt/kafka_2.12-1.1.0/bin/kafka-server-start.sh /opt/kafka_2.12-1.1.0/config/server.properties
这里出现了一个异常,解决这个异常详见: https://stackoverflow.com/questions/24061672/verifyerror-uninitialized-object-exists-on-backward-branch-jvm-spec-4-10-2-4。我的jdk版本是“Java(TM) SE Runtime Environment (build 1.8.0-b132)”,升级到jdk1.8.0_161后kafka正常启动
[2018-04-05 05:18:53,804] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.VerifyError: Uninitialized object exists on backward branch 209
2.2.4创建一个topic “test”
/opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2.2.4 查看topic
/opt/kafka_2.12-1.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
3.python访问Kafka
在这里使用了一个第三方包“kafka-python”,要求kafka版本大于0.9.
3.1 安装kafka-python
pip install kafka-python
3.2生产者
from kafka import KafkaProducer
from kafka.errors import KafkaError
#创建了一个生产者的对象
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# 异步发送一个条数据,topic和value,没有key
future = producer.send('test', b'123456')
# 获取发送记录的metadata
try:
record_metadata = future.get(timeout=10)
except KafkaError:
log.exception()
pass
# 发送记录的topic,分区,偏移量
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
# produce keyed messages to enable hashed partitioning
producer.send('test', key=b'foo', value=b'bar')
# 发送具有key和value的数据,使用msgpack序列化
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('test', {'key': 'value'})
# 生产json数据
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('test', {'key': 'value'})
# produce 异步发送100条数据
for _ in range(100):
producer.send('test', b'msg')
#定义一个发送成功的回调函数
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
#定义一个发送失败的回调函数
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
# handle exception
# produce asynchronously with callbacks
producer.send('test', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
# 把异步发送的数据全部发送出去
producer.flush()
# 定义重试的次数
producer = KafkaProducer(retries=5)
3.2.1 KafkaProducer的构造参数:
- bootstrap_servers :kafka节点或节点的列表,不一定需要罗列所有的kafka节点。格式为: ‘host[:port]’ 。默认值是:localhost:9092
- client_id (str) : 客户端id,默认值: ‘kafka-python-{version}’
- key_serializer (callable) :key序列化函数
- value_serializer (callable) :value序列化函数
- acks (0**, 1**, ‘all’) – 0不等待kafka服务端确认,1,等待leader确认,all等所有副本确认
- compression_type (str) :数据压缩类型,‘gzip’, ‘snappy’, ‘lz4’, 或者 None.
- retries (int) :设置重试次数
- batch_size (int):
- linger_ms (int)
- partitioner (callable) –
- buffer_memory (int)
- max_block_ms (int)
- max_request_size (int)
- metadata_max_age_ms (int)
- retry_backoff_ms (int)
- request_timeout_ms (int)
- receive_buffer_bytes (int)
- send_buffer_bytes (int)
- socket_options (list)
- reconnect_backoff_ms (int) –
- reconnect_backoff_max_ms (int)
- max_in_flight_requests_per_connection (int)
- security_protocol (str)
- ssl_context (ssl.SSLContext)
- ssl_check_hostname (bool)
- ssl_cafile (str) –
- ssl_certfile (str)
- ssl_keyfile (str)
- ssl_password (str)
- ssl_crlfile (str)
- api_version (tuple)
- api_version_auto_timeout_ms (int)
- metric_reporters (list)
- metrics_num_samples (int)
- metrics_sample_window_ms (int)
- selector (selectors.BaseSelector)
- sasl_mechanism (str)
- sasl_plain_username (str)
- sasl_plain_password (str)
- sasl_kerberos_service_name (str)
3.3.2 KafkaConsumer的函数
close
(timeout=None): 关闭生产者flush
(timeout=None):强制发送异步数据metrics
(raw=False):性能指标partitions_for
(topic):返回topic所有的分区send
(topic, value=None, key=None, partition=None, timestamp_ms=None):发送数据
3.3 消费者
KafkaConsumer是非线程安全的
from kafka import KafkaConsumer
#创建一个消费者,指定了topic,group_id,bootstrap_servers
#group_id:多个拥有相同group_id的消费者被判定为一组,一条数据记录只会被同一个组中的一个消费者消费
#bootstrap_servers:kafka的节点,多个节点使用逗号分隔
#这种方式只会获取新产生的数据
consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
#注意当没有数据可以消费时,循环会阻塞
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
3.3.1 KafkaConsumer的构造参数:
- *topics ,要订阅的主题
- bootstrap_servers :kafka节点或节点的列表,不一定需要罗列所有的kafka节点。格式为: ‘host[:port]’ 。默认值是:localhost:9092
- client_id (str) : 客户端id,默认值: ‘kafka-python-{version}’
- group_id (str or None):分组id
- key_deserializer (callable) :key反序列化函数
- value_deserializer (callable):value反序列化函数
- fetch_min_bytes:服务器应每次返回的最小数据量
- fetch_max_wait_ms (int): 服务器应每次返回的最大等待时间
- fetch_max_bytes (int) :服务器应每次返回的最大数据量
- max_partition_fetch_bytes (int) :
- request_timeout_ms (int)
- retry_backoff_ms (int)
- reconnect_backoff_ms (int)
- reconnect_backoff_max_ms (int)
- max_in_flight_requests_per_connection (int)
- auto_offset_reset (str)
- enable_auto_commit (bool)
- auto_commit_interval_ms (int)
- default_offset_commit_callback (callable)
- check_crcs (bool)
- metadata_max_age_ms (int)
- partition_assignment_strategy (list)
- max_poll_records (int)
- max_poll_interval_ms (int)
- session_timeout_ms (int)
- heartbeat_interval_ms (int)
- receive_buffer_bytes (int)
- send_buffer_bytes (int)
- socket_options (list)
- consumer_timeout_ms (int)
- skip_double_compressed_messages (bool)
- security_protocol (str)
- ssl_context (ssl.SSLContext)
- ssl_check_hostname (bool)
- ssl_cafile (str) –
- ssl_certfile (str)
- ssl_keyfile (str)
- ssl_password (str)
- ssl_crlfile (str)
- api_version (tuple)
3.3.2 KafkaConsumer的函数
assign
(partitions):手动为该消费者分配一个topic分区列表。assignment
():获取当前分配给该消费者的topic分区。beginning_offsets
(partitions):获取给定分区的第一个偏移量。close
(autocommit=True):关闭消费者commit
(offsets=None):提交偏移量,直到成功或错误为止。commit_async
(offsets=None, callback=None):异步提交偏移量。committed
(partition):获取给定分区的最后一个提交的偏移量。end_offsets
(partitions):获取分区的最大偏移量highwater
(partition):分区最大的偏移量metrics
(raw=False):返回消费者性能指标- next():返回下一条数据
offsets_for_times
(timestamps):根据时间戳获取分区偏移量partitions_for_topic
(topic):返回topic的partition列表,返回一个set集合pause
(*partitions):停止获取数据paused
():返回停止获取的分区poll
(timeout_ms=0, max_records=None):获取数据position
(partition):获取分区的偏移量resume
(*partitions):恢复抓取指定的分区seek
(partition, offset):seek偏移量seek_to_beginning
(*partitions):搜索最旧的偏移量seek_to_end
(*partitions):搜索最近可用的偏移量subscribe
(topics=(), pattern=None, listener=None):订阅topicssubscription
():返回当前消费者消费的所有topictopics
():返回当前消费者消费的所有topic,返回的是unicodeunsubscribe
():取消订阅所有的topic