Building a Flume + Kafka + Storm Real-Time Stream Processing Platform

1. System Requirements

Linux 6+

Java 7+

ZooKeeper

2. Software Versions

Flume: 1.7.0

Storm: 1.1.0

Kafka: 2.10-0.9.0.1

ZooKeeper: 3.4.10

Redis: 3.2.8

3. Downloading the Software

Flume: wget -c http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

Storm: wget -c http://mirror.bit.edu.cn/apache/storm/apache-storm-1.0.3/apache-storm-1.0.3.tar.gz (note: this link fetches 1.0.3; adjust the version in the URL if you want the 1.1.0 listed above)

Kafka: wget -c http://mirror.bit.edu.cn/apache/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

Redis: wget -c http://download.redis.io/releases/redis-3.2.8.tar.gz

4. Installation

Java environment

Passwordless SSH login setup (mainly needed for clusters):

# Create the account (create the group first, then add the user to it)
groupadd hadoop
useradd -g hadoop hadoop
passwd hadoop
# enter the password...
su hadoop
cd ~
ssh-keygen -t rsa
cd ~/.ssh
touch authorized_keys
chmod 700 ~/.ssh && chmod 600 authorized_keys
# Append the key from id_rsa.pub to authorized_keys and save, e.g.:
cat id_rsa.pub >> authorized_keys
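To confirm the key setup worked, a quick check using the hadoop account created above (it should log in without prompting for a password):

# should print the hostname without asking for a password
ssh hadoop@localhost hostname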

5. Installing ZooKeeper

tar -zxvf /full-path-zookeeper-version.tar.gz -C /usr/local

ln -s /usr/local/zookeeper-version /usr/local/zookeeper

6. Configuring ZooKeeper

a. Modify the defaults in the ZooKeeper configuration file:

cd /usr/local/zookeeper/conf

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg

Set the dataDir location, for example:

dataDir=/usr/zookeeper-3.4.8/data

Client port:

clientPort=2181

Transaction log path:

dataLogDir=/var/log/zookeeper/

Cluster nodes (the first port is for inter-node communication, the second for leader election):

server.1=hquc.hqucdomain.com:2888:3888


In a ZooKeeper cluster, create a myid file under the dataDir on each host; its content is the x from that host's server.x entry, so that each node is uniquely identified.
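For example, on the host configured as server.1 (assuming the dataDir set above):

echo 1 > /usr/zookeeper-3.4.8/data/myid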

Start command:

/usr/local/zookeeper/bin/zkServer.sh start /usr/local/zookeeper/conf/zoo.cfg
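To verify the server came up, either of these checks works (the nc probe assumes the clientPort configured above):

# reports standalone, leader, or follower
/usr/local/zookeeper/bin/zkServer.sh status
# a healthy server answers "imok"
echo ruok | nc localhost 2181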

7. Installing Storm

tar -zxvf /full-path-storm-version.tar.gz -C /usr/local/

ln -s /usr/local/storm-version /usr/local/storm

vim /etc/profile

Add the Storm environment variables:

export STORM_HOME=/usr/local/storm

export PATH=$PATH:$STORM_HOME/bin

Save and apply:

source /etc/profile
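With the PATH updated, a quick sanity check:

storm version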

8. Configuring Storm

vim /usr/local/storm/conf/storm.yaml

The main parameters to configure are:

storm.zookeeper.servers:
  - "IP of the ZooKeeper host"

If ZooKeeper is not using the default port 2181, specify the port explicitly:

storm.zookeeper.port: 2000

storm.local.dir: "path where Storm stores its data"

Worker nodes:

supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

Master (control) node cluster:

nimbus.seeds: ["host1", "host2", "host3"]

Port for the Storm web UI; optional, defaults to 8080:

ui.port: 8082
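Putting these together, a minimal single-node storm.yaml sketch (the hostname and local dir are placeholder assumptions; adjust to your cluster):

storm.zookeeper.servers:
  - "hquc.hqucdomain.com"
storm.local.dir: "/data/storm"
nimbus.seeds: ["hquc.hqucdomain.com"]
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
ui.port: 8082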


Startup:

Start nimbus on the master control nodes:

storm nimbus &

Start supervisor on the worker nodes:

storm supervisor &
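To confirm the daemons are running (jps on the master should show nimbus, on the workers supervisor):

jps | grep -iE 'nimbus|supervisor'
# once nimbus is up, this lists deployed topologies (empty at this point)
storm list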

9. Installing Kafka

tar -zxvf /full-path-kafka-version.tgz -C /usr/local

ln -s /usr/local/kafka-version /usr/local/kafka

10. Configuring Kafka

The main file to edit is server.properties in Kafka's config directory:

vim /usr/local/kafka/config/server.properties

Modify or add the following parameters:

# unique ID of this broker within the cluster
broker.id=0

listeners=PLAINTEXT://hquc.hqucdomain.com:9092

port=9092

host.name=hquc.hqucdomain.com

advertised.host.name=hquc.hqucdomain.com

advertised.port=9092

log.dirs=/data/real-time-frame/kafka_2.10-0.9.0.1/kafka-logs

zookeeper.connect=hquc.hqucdomain.com:2181

Startup:

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
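After startup, the broker should have registered itself under /brokers/ids in ZooKeeper (broker.id 0 as configured above):

/usr/local/zookeeper/bin/zkCli.sh -server hquc.hqucdomain.com:2181 ls /brokers/ids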

Overview of the main Kafka commands:

A summary of frequently used Kafka command-line tools (run from the kafka bin directory):

1. Show details of a topic:
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic testKJ1

2. Increase replicas for a topic:
./kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file json/partitions-to-move.json --execute

3. Create a topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1

4. Add partitions to a topic:
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partitions 20 --topic testKJ1

5. Console producer:
./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1

6. Console consumer:
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic testKJ1

7. Start the Kafka server:
./kafka-server-start.sh -daemon ../config/server.properties

8. Take a broker offline:
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60

9. Delete a topic:
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1

10. Check a consumer group's offsets:
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic testKJ1
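As a quick smoke test tying these commands together (topic name testKJ1 as in the examples above): create the topic, type messages into the producer, and watch them arrive in a consumer running in a second terminal:

cd /usr/local/kafka/bin
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1
./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1
# in another terminal:
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic testKJ1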

11. Installing Flume (used here to monitor file changes)

tar -zxvf /full-path-flume-version.tar.gz -C /usr/local

ln -s /usr/local/flume-version /usr/local/flume

12. Flume Architecture
A common architecture: multiple agents aggregate into one collector, which writes to HDFS and Kafka.

[Figure: multi-agent aggregation into HDFS + Kafka]

13. Configuring Flume

cd /usr/local/flume/conf

cp ./flume-env.sh.template ./flume-env.sh

vim flume-env.sh

Set the JAVA_HOME location:

JAVA_HOME=/usr/local/java
JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseG1GC -XX:-UseGCOverheadLimit"

cp ./flume-conf.properties.template ./flume-conf.properties

Add the Flume environment variables to /etc/profile:

export FLUME_HOME=/usr/local/flume

export PATH=$PATH:$FLUME_HOME/bin
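After re-sourcing /etc/profile, verify the CLI is available:

source /etc/profile
flume-ng version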

On the aggregation (collector) node, create the Flume server configuration file:

touch /usr/local/flume/conf/flume-master-conf.properties
vim /usr/local/flume/conf/flume-master-conf.properties
# add the following content

collectorMainAgent.channels = channel_kafka channel_hdfs
collectorMainAgent.sources  = s2
collectorMainAgent.sinks    = k1 k2
# collectorMainAgent AvroSource
collectorMainAgent.sources.s2.type = avro
collectorMainAgent.sources.s2.bind = hquc.hqucdomain.com
collectorMainAgent.sources.s2.port = 41415
collectorMainAgent.sources.s2.channels = channel_kafka channel_hdfs

collectorMainAgent.channels.channel_kafka.type=memory
collectorMainAgent.channels.channel_kafka.capacity=10000
collectorMainAgent.channels.channel_kafka.transactionCapacity=100

# Kafka sink
collectorMainAgent.sinks.k1.channel= channel_kafka
collectorMainAgent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
collectorMainAgent.sinks.k1.brokerList=hquc.hqucdomain.com:9092
# Kafka topic to write to
collectorMainAgent.sinks.k1.topic=test
# serialization
collectorMainAgent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# collectorMainAgent FileChannel
collectorMainAgent.channels.channel_hdfs.type = file
collectorMainAgent.channels.channel_hdfs.checkpointDir =/data/real-time-frame/apache-flume-1.7.0-bin/master/checkpoint
collectorMainAgent.channels.channel_hdfs.dataDirs = /data/real-time-frame/apache-flume-1.7.0-bin/master/data
collectorMainAgent.channels.channel_hdfs.capacity = 200000000
collectorMainAgent.channels.channel_hdfs.transactionCapacity=6000
collectorMainAgent.channels.channel_hdfs.checkpointInterval=60000
# collectorMainAgent hdfsSink
collectorMainAgent.sinks.k2.type = hdfs
collectorMainAgent.sinks.k2.channel = channel_hdfs
collectorMainAgent.sinks.k2.hdfs.path = hdfs://hquc.hqucdomain.com:9000/user/flume/%Y%m%d/
collectorMainAgent.sinks.k2.hdfs.filePrefix =log%Y-%m-%d
collectorMainAgent.sinks.k2.hdfs.inUsePrefix =_
collectorMainAgent.sinks.k2.hdfs.inUseSuffix =.tmp
# roll at 1 MiB; property values are not evaluated, so spell out the byte count
collectorMainAgent.sinks.k2.hdfs.rollSize = 1048576
collectorMainAgent.sinks.k2.hdfs.rollCount = 0
collectorMainAgent.sinks.k2.hdfs.rollInterval = 0
collectorMainAgent.sinks.k2.hdfs.writeFormat = Text
collectorMainAgent.sinks.k2.hdfs.fileType = DataStream
collectorMainAgent.sinks.k2.hdfs.batchSize = 6000
collectorMainAgent.sinks.k2.hdfs.callTimeout = 60000
collectorMainAgent.sinks.k2.hdfs.useLocalTimeStamp=true
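The KafkaSink above writes to the topic test; if automatic topic creation is disabled on the broker, create it beforehand:

/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper hquc.hqucdomain.com:2181 --replication-factor 1 --partitions 1 --topic test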

On the client node, create the Flume client configuration:

touch /usr/local/flume/conf/flume-client-conf.properties
vim /usr/local/flume/conf/flume-client-conf.properties
# append the following content
# Flume client configuration
clientMainAgent.channels = channel_main
clientMainAgent.sources  = source_main
clientMainAgent.sinks    = sink_k1
# clientMainAgent sinks group
#clientMainAgent.sinkgroups = g1
# clientMainAgent exec source (tails the log file)
clientMainAgent.sources.source_main.type=exec
clientMainAgent.sources.source_main.command=tail -F /data/real-time-frame/log
clientMainAgent.sources.source_main.channels=channel_main
# clientMainAgent FileChannel
# (the original also defined channel_main as a memory channel; since the same
# keys were set twice, the file-channel values below are the ones that apply)
clientMainAgent.channels.channel_main.type = file
clientMainAgent.channels.channel_main.checkpointDir = /data/real-time-frame/apache-flume-1.7.0-bin/data/checkpoint
clientMainAgent.channels.channel_main.dataDirs = /data/real-time-frame/apache-flume-1.7.0-bin/data/data
clientMainAgent.channels.channel_main.capacity = 200000000
clientMainAgent.channels.channel_main.transactionCapacity = 100
clientMainAgent.channels.channel_main.keep-alive = 30
clientMainAgent.channels.channel_main.write-timeout = 30
clientMainAgent.channels.channel_main.checkpoint-timeout=600
clientMainAgent.sinks.sink_k1.channel = channel_main
clientMainAgent.sinks.sink_k1.type = avro
# connect to collectorMainAgent
clientMainAgent.sinks.sink_k1.hostname = hquc.hqucdomain.com
clientMainAgent.sinks.sink_k1.port = 41415

Startup (the -n agent name must match the name used in the config file; start the collector first, then the client):

cd /usr/local/flume/bin
# on the aggregation node:
./flume-ng agent -n collectorMainAgent -c ../conf -f ../conf/flume-master-conf.properties \
-Dflume.root.logger=INFO,console &
# on the client node:
./flume-ng agent -n clientMainAgent -c ../conf -f ../conf/flume-client-conf.properties \
-Dflume.root.logger=INFO,console &
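With both agents up, a rough end-to-end check (paths and topic as configured above): append a line to the tailed file on the client node and watch it arrive on the Kafka topic:

echo "hello pipeline" >> /data/real-time-frame/log
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper hquc.hqucdomain.com:2181 --topic test --from-beginning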

14. Installing Redis

tar -zxvf /full-path-redis-version -C /usr/local/
ln -s /usr/local/redis-version /usr/local/redis
cd /usr/local/redis
make
cd src
make install PREFIX=/usr/local/redis

# install dependencies for the Redis cluster tooling
yum -y install ruby ruby-devel rubygems rpm-build
gem install redis

15. Configuring Redis

# create the Redis cluster configuration files
cd /usr/local/redis
mkdir etc
cd etc
# create one cluster config file per node; this article uses a single-machine pseudo-cluster
touch 6379
touch 7000
touch 7001
touch 7002
touch 7003

Contents of each Redis config file:

# adding the following settings is enough
# port this node listens on
port 7000
# directory where Redis stores its data
dir path
bind ip
cluster-enabled yes
cluster-config-file nodes.port.conf
cluster-node-timeout 5000
appendonly yes
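A filled-in sketch for the node on port 7000 (the data directory and bind address are assumptions; the IP matches the pseudo-cluster host used below):

port 7000
dir /usr/local/redis/data/7000
bind 192.168.0.77
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes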

Start the Redis service on each node:

/usr/local/redis/bin/redis-server /usr/local/redis/etc/....
# create the cluster

/usr/local/redis/src/redis-trib.rb create \
 --replicas 1 192.168.0.77:6379 192.168.0.77:7000 192.168.0.77:7001 ...

# check cluster status
/usr/local/redis/bin/redis-cli -c -h 192.168.0.77
cluster info


(Running the redis-trib.rb script may fail because of missing Ruby dependencies; if so:)
yum install ruby rubygems
gem install redis
Original author: 发光的鱼
Original source: https://www.jianshu.com/p/0f91b9c82415