Kafka授权
如果没有开启Kafka认证(如Kerberos认证或者简单的用户名密码),即使开启了Kafka授权,用户也可以伪造身份访问服务。所以建议创建高安全模式(即支持Kerberos)的Kafka集群,详见Kerberos安全文档。
备注:
本文的权限配置只针对E-MapReduce的高安全模式集群
,即Kafka以Kerberos的方式启动。
1. 添加配置
在Kafka集群的配置管理->Kafka->配置->server.properties->自定义配置
添加如下几个参数:
key | value | 备注 |
---|---|---|
authorizer.class.name | kafka.security.auth.SimpleAclAuthorizer | |
super.users | User:kafka | User:kafka是必须的,可添加其它用户用分号(;)隔开 |
备注:
zookeeper.set.acl
用来设置kafka在zookeeper中数据的权限,E-MapReduce集群中已经设置为true,所以上述不需要再添加该配置
。该配置打开后,在Kerberos环境中,只有用户名称为kafka且通过Kerberos认证后才能执行kafka-topics.sh
命令(kafka-topics.sh会直接读写/修改zookeeper中的数据)。
2. 重启Kafka集群
在Kafka集群的配置管理->HBase->操作->RESTART All Components
3. 授权(ACL)
3.1 基本概念
Kafka官方文档定义:
Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R"
即ACL过程涉及Principal
Allowed/Denied
Operation
Host
Resource
- Principal:用户名
安全协议 | value | |
---|---|---|
PLAINTEXT | ANONYMOUS | |
SSL | ANONYMOUS | |
SASL_PLAINTEXT | mechanism为PLAIN时,用户名是client_jaas.conf指定的用户名,mechanism为GSSAPI时,用户名为client_jaas.conf指定的principal | |
SASL_SSL |
- Allowed/Denied: 允许/拒绝
- Operation: 操作
Read,Write,Create,DeleteAlter,Describe,ClusterAction,AlterConfigs,DescribeConfigs,IdempotentWrite,All
- Host: 针对的机器
- Resource: 权限作用的资源对象
Topic, Group, Cluster, TransactionalId
Operation/Resource的一些详细对应关系,如哪些Resource支持哪些Operation的授权,详见KIP-11 – Authorization Interface
3.2 授权命令
-使用脚本 kafka-acls.sh (/usr/lib/kafka-current/bin/kafka-acls.sh) 进行授权
具体是使用方式可以直接执行 kafka-acls.sh --help
进行查看。
4. 操作示例
在已经创建的E-MapReduce高安全Kafka集群
的master节点上进行相关示例操作。
4.1 新建用户test
useradd test
4.2 创建topic
第一节添加配置的备注中提到zookeeper.set.acl=true,kafka-topics.sh需要在kafka账号执行
,而且kafka账号下要通过Kerberos认证。
#kafka_client_jaas.conf中已经设置了kafka的Kerberos认证相关信息
export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
# zookeeper地址改成自己集群的对应地址(执行hostnamed的输出)
kafka-topics.sh --create --zookeeper emr-header-1:2181/kafka-1.0.0 --replication-factor 3 --partitions 1 --topic test
4.3 test用户执行kafka-console-producer.sh
创建test用户的keytab文件,用户zookeeper/kafka的认证
su root sh /usr/lib/has-current/bin/hadmin-local.sh /etc/ecm/has-conf -k /etc/ecm/has-conf/admin.keytab HadminLocalTool.local: #直接按回车可以看到一些命令的用法 HadminLocalTool.local: addprinc #输入命令按回车可以看到具体命令的用法 HadminLocalTool.local: addprinc -pw 123456 test #添加test的princippal,密码设置为123456 HadminLocalTool.local: ktadd -k /home/test/test.keytab test #导出keytab文件,后续可使用该文件
添加kafka_client_test.conf
如文件放到
/home/test/kafka_client_test.conf
,内容如下:KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true serviceName="kafka" keyTab="/home/test/test.keytab" principal="test"; }; // Zookeeper client authentication Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true useTicketCache=false serviceName="zookeeper" keyTab="/home/test/test.keytab" principal="test"; };
添加producer.conf
如文件放到
/home/test/producer.conf
,内容如下:security.protocol=SASL_PLAINTEXT sasl.mechanism=GSSAPI
执行kafka-console-producer.sh
su test export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf" kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092
由于没有设置ACL,所以上述会报错:
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
设置ACL
同样
kafka-acls.sh
也需要kafka账号执行su kafka export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Write --topic test
再执行kafka-console-producer.sh
su test export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf" kafka-console-producer.sh --producer.config /home/test/producer.conf --topic test --broker-list emr-worker-1:9092
正常:
[2018-02-28 22:25:36,178] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser) >alibaba >E-MapReduce >
4.4 test用户执行kafka-console-consumer.sh
上面成功执行kafka-console-producer.sh,并往topic里面写入一些数据后,就可以执行kafka-console-consumer.sh
进行消费测试.
添加consumer.conf
如文件放到/home/test/consumer.conf,内容如下:
security.protocol=SASL_PLAINTEXT sasl.mechanism=GSSAPI
执行kafka-console-consumer.sh
su test #kafka_client_test.conf跟上面producer使用的是一样的 export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf" kafka-console-consumer.sh --consumer.config consumer.conf --topic test --bootstrap-server emr-worker-1:9092 --group test-group --from-beginning
由于未设置权限,会报错:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
设置ACL
su kafka export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" #test-group权限 kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --group test-group # topic权限 kafka-acls.sh --authorizer-properties zookeeper.connect=emr-header-1:2181/kafka-1.0.0 --add --allow-principal User:test --operation Read --topic test
再执行kafka-console-consumer.sh
su test #kafka_client_test.conf跟上面producer使用的是一样的 export KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/home/test/kafka_client_test.conf" kafka-console-consumer.sh --consumer.config consumer.conf --topic test --bootstrap-server emr-worker-1:9092 --group test-group --from-beginning
正常输出:
alibaba E-MapReduce