kafka 的集群搭建

启动zookeeper

在本地2181端口启动ZK。zookeeper集群启动参考
https://blog.csdn.net/qiushisoftware/article/details/79043379

bin/zookeeper-server-start.sh config/zookeeper.properties

如果你需要对zookeeper开启SASL认证,请在配置文件中加上

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=360000

并编写JAAS文件

Server {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret";
};

启动命令行如下

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/app/gdl/zookeeper-3.4.11/conf/zookeeper_jaas.conf"
nohup sh bin/zkServer.sh start conf/zoo.cfg >zoo.log&

如果你使用的是kafka自带的zookeeper,请参考
https://blog.csdn.net/geting/article/details/52044055

启动kafka

复制kafka的配置文件,两个配置文件的端口号应该不同

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

如果你在云环境部署,建议在listeners中配置上该主机的IP。否则,broker注册到zk的将是自己的主机名,如果云环境不能很好的解析主机名,就会导致问题。
如果你的主机内网访问IP和外网访问IP不同,listeners中应配置内网访问IP,advertised listeners应配置外网访问IP。

创建topic(使用命令行)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

//查看已经创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
//查看topic的具体信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

生产消息(使用命令行)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

消费消息(使用命令行)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

部署kafka监控

这里使用kafkamanager,这个使用play框架
下载地址
https://github.com/yahoo/kafka-manager/releases

首先,在用户目录的./sbt下建立如下repositories文件

[repositories]
local
aliyun: http://maven.aliyun.com/nexus/content/groups/public
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly

解压缩并进入上述文件夹

./sbt clean dist

kafka manager应配置登陆权限,避免配置被意外更改。kafka manager通过访问zookeeper来操纵kafka集群,即使没有broker存活,也可以对相关信息进行更改。

注意,现在kafka集群采取在集群中topic中存储offset的做法。但是kafka manager只支持PLAINTEXT的方式访问集群。所以必须为集群配置PLAINTEXT

启动前,需要对配置文件做如下修改

kafka-manager.zkhosts="10.237.64.46:2181"#这是你zookeeper的IP

运行启动脚本即可。

部署schema-registry的插件

使用请参考如下文档:
https://docs.confluent.io/current/schema-registry/docs/maven-plugin.html
启动前修改schema-registry.properties文件

listeners=http://10.237.64.46:9096 #对外提供服务的端口
kafkastore.connection.url=localhost:2181 #kafka zk的ip 用于持久化

运行启动脚本即可。

部署schema-registry ui

请参考如下文档
https://github.com/landoop/schema-registry-ui
使用UI一定要设置跨域访问。跨域访问除了服务端设置,还要在浏览器上设置跨域访问。

为kafka配置SSL

首先,如果要使用外网来传递消息,安全起见,需要加上SSL
在server.properties文件上加上如下配置:

##内网client和broker可以通过9092端口明文访问,外网client则通过9094端口SSL方式访问
listeners=PLAINTEXT://10.83.1.48:9092,SSL://10.83.1.48:9094
ssl.keystore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/server.truststore.jks
ssl.truststore.password=123456

server.keystore.jks和server.truststore.jks的取得请参考kafka官方文档(其实就是SSL的公钥库和信任库)。
http://kafka.apache.org/documentation/#security_overview
使用JDK自带的keytool工具。具体命令如下

#!/bin/bash
            #Step 1
            ##产生服务端的keystoreserver.keystore.jks文件(加密算法为RSA,有效期为36500,存储在server.keystore.jks,别名为localhost)
            keytool -keystore server.keystore.jks -alias localhost -validity 36500 -keyalg RSA -genkey
            ##产生客户端的keystore
            keytool -keystore client.keystore.jks -alias localhost -validity 36500 -keyalg RSA -genkey

            #Step 2
            ##产生ca-cert ca-key文件 就是一个密钥对 注意集群中的ca-key和ca-cert应该保持一致!(也就是说你配置除了第一台服务器外,其他服务器应该吧这两个文件和ca-cert.srl直接拷贝过去)
            openssl req -new -x509 -keyout ca-key -out ca-cert -days 36500
            ##添加server和client对CA的信任。
            keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
            keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert 

            #Step 3
            ##将第一步生成的localhost密钥导出(公钥)为cert-file文件
            keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
            ##对client进行同样的操作
            keytool -keystore client.keystore.jks -alias localhost -certreq -file client-cert-file

            ##使用ca-key(CA的私钥)对上一步导出的cert-file进行加密,加密结果为cert-signed文件
            openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 36500
            ##对于client做同样操作
            openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file -out client-cert-signed -days 36500 

            ##Step 4
            #将CA证书导入服务器keystore
            keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
            keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert

            #签名后的server认证加入server的keystore,client认证加到client的keystore
            keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
            keytool -keystore client.keystore.jks -alias localhost -import -file client-cert-signed

配置完成后可以作如下验证,验证SSL是否配置成功

openssl s_client -debug -connect localhost:9094 -tls1
  • 配置命令行producer
    在producer.properties文件下加入如下设置
bootstrap.servers=localhost:9094
security.protocol=SSL
ssl.truststore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/client.truststore.jks
ssl.truststore.password=123456

启动之

sh kafka-console-producer.sh --broker-list 10.221.198.126:9094 --topic test --producer.config ../config/producer.properties
  • 配置命令行consumer
    修改consumer.propeties文件,加入如上producer.properties的设置。启动之。
sh kafka-console-consumer.sh --bootstrap-server 10.221.198.126:9094 --topic test --consumer.config ../config/consumer.properties

访问控制

访问控制x联系我 yxxy1717 2317384986

即使在内网,我们也可能会面临员工误操作带来悲剧事件。所以,我们需要加上SASL_JAAS来进行访问控制。简单来说,就是明文校验用户名和密码。

首先在broker端的配置目录新建kafka_server_jaas.conf

KafkaServer {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="admin"
       password="admin"
       user_admin="admin"

    原文作者:小白中的搬运工
    原文地址: https://zhuanlan.zhihu.com/p/43263778
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞