Deploying and performance-testing a single-node spark+kafka+Elasticsearch environment

Version selection

spark 1.5.2 + kafka 0.9.0.1 + Elasticsearch 2.2.1

Installation and deployment

1. Installation scripts and files (password: 4m7l)

The install script and service are simplified single-node versions with no safety checks. Anyone interested is welcome to collaborate on a cluster version of the install script and service.
http://pan.baidu.com/s/1jIuezxO
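
If the network-disk link is unavailable, the packages the script expects under npminstall/files can also be fetched from the upstream archives. A hedged sketch; the URLs follow the usual archive layout but may have moved:

cd npminstall/files
wget https://archive.apache.org/dist/hadoop/common/hadoop-2.4.1/hadoop-2.4.1.tar.gz
wget https://archive.apache.org/dist/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.4.tgz
wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz
wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-2.2.1.tar.gz
wget https://download.elastic.co/kibana/kibana/kibana-4.4.2-linux-x64.tar.gz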

2. Using the scripts

  • vi /etc/hosts
    and add the entry 127.0.0.1 <hostname> (see the sketch after this list)
  • cd npminstall
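
A minimal sketch of that hosts entry, assuming the machine's hostname is not already mapped:

echo "127.0.0.1 $(hostname)" >> /etc/hosts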

install.sh

#!/bin/sh

DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`
upperhome=`cd "$DIRNAME/.."; pwd`

export username=root
export installpath=/data/mdware

export hadoopFileName=hadoop-2.4.1
export hadoopPackageName=hadoop-2.4.1.tar.gz

export sparkFileName=spark-1.5.2-bin-hadoop2.4
export sparkPackageName=spark-1.5.2-bin-hadoop2.4.tgz

export kafkaFileName=kafka_2.10-0.9.0.1
export kafkaPackageName=kafka_2.10-0.9.0.1.tgz

export elasticFileName=elasticsearch-2.2.1
export elasticPackageName=elasticsearch-2.2.1.tar.gz

export kibanaFileName=kibana-4.4.2-linux-x64
export kibanaPackageName=kibana-4.4.2-linux-x64.tar.gz

export installServices="hadoop
spark
kafka
elastic
kibana"

mkdir -p $installpath

for com in $installServices ; do

        # Hadoop: unpack, copy the prepared configs, format the NameNode, symlink
        if [ "$com" = "hadoop" ] ; then
                cp $localhome/files/$hadoopPackageName $installpath
                cd $installpath && tar -zxf $hadoopPackageName
                \cp -r $localhome/conf/hadoop/* $installpath/$hadoopFileName/etc/hadoop/
                sh $installpath/$hadoopFileName/bin/hdfs namenode -format
                rm -rf $installpath/$hadoopPackageName
                ln -s $installpath/$hadoopFileName/ $installpath/hadoop
        fi

        # Spark: unpack, install spark-env.sh, symlink
        if [ "$com" = "spark" ] ; then
                cp $localhome/files/$sparkPackageName $installpath
                cd $installpath && tar -zxf $sparkPackageName
                \cp -r $localhome/conf/spark-env.sh $installpath/$sparkFileName/conf/
                rm -rf $installpath/$sparkPackageName
                ln -s $installpath/$sparkFileName/ $installpath/spark
        fi

        # Kafka: unpack, install server.properties, symlink
        if [ "$com" = "kafka" ] ; then
                cp $localhome/files/$kafkaPackageName $installpath
                cd $installpath && tar -zxf $kafkaPackageName
                \cp $localhome/conf/server.properties $installpath/$kafkaFileName/config/
                rm -rf $installpath/$kafkaPackageName
                ln -s $installpath/$kafkaFileName/ $installpath/kafka
        fi

        # Elasticsearch: unpack, install elasticsearch.yml, symlink, add the head and kopf plugins
        if [ "$com" = "elastic" ] ; then
                cp $localhome/files/$elasticPackageName $installpath
                cd $installpath && tar -zxf $elasticPackageName
                \cp $localhome/conf/elasticsearch.yml $installpath/$elasticFileName/config/
                rm -rf $installpath/$elasticPackageName
                ln -s $installpath/$elasticFileName/ $installpath/es
                $installpath/es/bin/plugin install mobz/elasticsearch-head/2.2.1
                $installpath/es/bin/plugin install lmenezes/elasticsearch-kopf/2.2.1
        fi

        # Kibana: unpack and symlink
        if [ "$com" = "kibana" ] ; then
                cp $localhome/files/$kibanaPackageName $installpath
                cd $installpath && tar -zxf $kibanaPackageName
                rm -rf $installpath/$kibanaPackageName
                ln -s $installpath/$kibanaFileName/ $installpath/kibana
        fi
done
# register manage.sh as the "npm" init service and enable it at boot
chmod +x $localhome/manage.sh
cp $localhome/manage.sh /etc/init.d/npm
chkconfig npm on

Then make the installer executable and run it:

chmod +x install.sh
./install.sh
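
When the installer finishes, /data/mdware should hold the unpacked distributions plus the short symlinks the script creates. A quick sanity check:

ls -l /data/mdware
# expected symlinks (given the defaults above):
# hadoop -> hadoop-2.4.1   spark -> spark-1.5.2-bin-hadoop2.4
# kafka -> kafka_2.10-0.9.0.1   es -> elasticsearch-2.2.1   kibana -> kibana-4.4.2-linux-x64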

3. Start the processes

service npm start
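
Once start returns, each component can be probed individually. A hedged sketch of the checks (default ports for this stack; a JDK and nc are assumed to be installed):

jps                                # expect NameNode, DataNode, Master, Worker, QuorumPeerMain, Kafka, Elasticsearch
echo ruok | nc localhost 2181      # ZooKeeper should answer imok
curl -s localhost:9200             # Elasticsearch should answer with its cluster banner
curl -s -o /dev/null -w "%{http_code}\n" localhost:8080                 # Spark master UI, expect 200
curl -s -o /dev/null -w "%{http_code}\n" localhost:9200/_plugin/head/   # head plugin installed by install.sh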

The npm service script (npm here is the name of this init service, not the Node package manager)

#!/bin/bash
# chkconfig: 2345 20 81
# description: start and stop npm service
# processname: npm

. /etc/rc.d/init.d/functions
prog="npm"

DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`

# size the Elasticsearch heap to roughly a quarter of physical memory
memSize=`free -g | awk 'NR==2{print $2}'`
mem_size=`expr ${memSize} + 1`
heap_size=`expr ${mem_size} / 4`

export installpath=/data/mdware

start(){
        # raise the open-file limit for Elasticsearch and Kafka
        ulimit -n 655360
        # HDFS daemons; leave safe mode so writes are accepted immediately
        sh $installpath/hadoop/sbin/hadoop-daemon.sh start namenode
        sh $installpath/hadoop/sbin/hadoop-daemon.sh start datanode
        $installpath/hadoop/bin/hdfs dfsadmin -safemode leave
        # Spark standalone master plus one local worker
        sh $installpath/spark/sbin/start-master.sh
        sh $installpath/spark/sbin/start-slave.sh spark://localhost:7077
        # ZooKeeper must be up before Kafka, hence the sleep
        nohup $installpath/kafka/bin/zookeeper-server-start.sh $installpath/kafka/config/zookeeper.properties >> $installpath/kafka/zookeeper.log &
        sleep 60
        nohup $installpath/kafka/bin/kafka-server-start.sh $installpath/kafka/config/server.properties >> $installpath/kafka/kafka.log &
        # Elasticsearch, daemonized; -Des.insecure.allow.root permits running as root on this test box
        export ES_HEAP_SIZE=${heap_size}g
        $installpath/es/bin/elasticsearch -Des.insecure.allow.root=true -d
}

stop(){
        sh $installpath/hadoop/sbin/hadoop-daemon.sh stop namenode
        sh $installpath/hadoop/sbin/hadoop-daemon.sh stop datanode

        sh $installpath/spark/sbin/stop-master.sh
        sh $installpath/spark/sbin/stop-slave.sh

        # graceful shutdown first: send SIGTERM to ZooKeeper, Kafka and Elasticsearch
        zookeeper_id=`ps -ef | grep -i zookeeper.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $zookeeper_id ]];then
                echo "zookeeper is not running"
        else
                kill ${zookeeper_id}
        fi

        kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $kafka_id ]];then
                echo "kafka is not running"
        else
                kill ${kafka_id}
        fi

        es_id=`ps -ef | grep -i elasticsearch | grep -v grep | awk '{print $2}'`
        if [[ -z $es_id ]];then
                echo "elasticsearch is not running"
        else
                kill ${es_id}
        fi

        # give the processes a grace period, then force-kill whatever is still alive
        sleep 20

        zookeeper_id=`ps -ef | grep -i zookeeper.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $zookeeper_id ]];then
                echo "zookeeper has stopped"
        else
                kill -9 ${zookeeper_id}
        fi

        kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
        if [[ -z $kafka_id ]];then
                echo "kafka has stopped"
        else
                kill -9 ${kafka_id}
        fi

        es_id=`ps -ef | grep -i elasticsearch | grep -v grep | awk '{print $2}'`
        if [[ -z $es_id ]];then
                echo "elasticsearch has stopped"
        else
                kill -9 ${es_id}
        fi
}

case "$1" in
        start)
                start
    ;;
  stop)
        stop
        ;;
  *)
    echo $"Usage: $0 {start|stop}"
    exit 2
esac
exit $?
Note: the processes are configured to start automatically at boot.

Test code

https://github.com/engimatic/effectivejava/tree/master/sparkanalysis
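
KafkaDataProducer below replays a sample data file into the topic test at a configurable interval. If the broker is not set to auto-create topics, create it first; a hedged sketch (the partition count is an arbitrary choice):

/data/mdware/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 2 --topic test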

public class KafkaDataProducer implements Runnable{
    private static Logger log = Logger.getLogger(KafkaDataProducer.class);

    // one producer per instance; the original static field would be shared and
    // reassigned when several producers are constructed (see main below)
    private Producer<String, String> producer;

    private String topic;

    private String path;

    // no-arg constructor kept from the original; topic and path stay null,
    // so only instances built with the two-arg constructor can run()
    public KafkaDataProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public KafkaDataProducer(String topic, String path) {
        this.path = path;
        this.topic = topic;

        Properties props = new Properties();
        props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws Exception {
        KafkaDataProducer kafkaDataProducer1 = new KafkaDataProducer("test","datafile");
        new Thread(kafkaDataProducer1).start();

//        KafkaDataProducer kafkaDataProducer2 = new KafkaDataProducer("tcptest","tcp.file");
//        new Thread(kafkaDataProducer2).start();
//
//        KafkaDataProducer kafkaDataProducer3 = new KafkaDataProducer("httptest","http.file");
//        new Thread(kafkaDataProducer3).start();

    }

    @Override
    public void run() {
        try {
            while (true) {
                // re-read the sample file on every pass and replay it into Kafka;
                // try-with-resources closes the reader the original code leaked
                try (BufferedReader br = new BufferedReader(
                        new FileReader(Config.getConfig("database.cnf").getProperty(path)))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        if (!"".equals(line.trim())) {
                            // note: a constant key routes every record to one
                            // partition; a null or varying key would spread the load
                            producer.send(new ProducerRecord<>(topic, "", line));
                        }
                    }
                }
                // pause between replays; sleep.time comes from database.cnf
                Thread.sleep(Long.valueOf(Config.getConfig("database.cnf").getProperty("sleep.time")));
            }
        } catch (Exception e) {
            log.error("The read streaming error: ", e);
        }
    }
}
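
To confirm events are actually flowing before attaching the Spark job, Kafka's bundled console consumer can tail the topic (default paths from the installer assumed):

/data/mdware/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test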
public class SSDPerformanceTest extends Analysis {
    public static final Logger LOG = LoggerFactory.getLogger(SSDPerformanceTest.class);

    protected static final Pattern TAB = Pattern.compile("\t");

    // note: SimpleDateFormat is not thread-safe; static instances shared across
    // Spark tasks are kept here only to match the original code
    private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm", Locale.CHINA);

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd", Locale.CHINA);

    public static void main(String[] args) throws IOException {

        String configfile = "database.cnf";

        Properties config = Config.getConfig(configfile);

        JavaPairReceiverInputDStream<String, byte[]> rawStream = setupRawStreamFromKafka(
                config, config.getProperty("group.id"));

        LOG.info("database config:" + config.toString());

        rawStream.foreachRDD(new Function<JavaPairRDD<String, byte[]>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception {
                JavaRDD<Map<String, ?>> es = stringJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, byte[]>, DBKey, DBData>() {
                    public Tuple2<DBKey, DBData> call(Tuple2<String, byte[]> stringTuple2) throws Exception {
                        String[] database = TAB.split(new String(stringTuple2._2));

                        DBKey dbKey = new DBKey();
                        DBData dbData = new DBData();
                        String sqlString = new String(Base64.decodeBase64(database[10].trim()));
                        String storageSql;
                        if(sqlString.length() > 1000){
                            storageSql = sqlString.substring(0,1000);
                        }else{
                            storageSql = sqlString;
                        }

                        //DBKey
                        dbKey.setProbeName(database[0].trim());
                        dbKey.setCustomService(database[1].trim());
                        dbKey.setIpClient(database[2].trim());
                        dbKey.setIpServer(database[3].trim());
                        dbKey.setPortServer(database[5].trim());
                        dbKey.setTimeStart(format.format(new Date().getTime()));
                        dbKey.setOperateType(storageSql.split(" ")[0]);   //Select, Insert, Update, Drop, Procedure
                        dbKey.setDbType(database[8].trim());

                        dbKey.setResponseCode(database[9].trim());
                        dbKey.setUser(database[2].trim());
                        dbKey.setSqlString(storageSql);

                        if(!database[12].trim().equals("-")) {
                            dbData.setOperateTime(Double.parseDouble(database[12].trim()));
                        }else if(!database[7].trim().equals("-")){
                            dbData.setOperateTime(Double.parseDouble(database[7].trim()) - Double.parseDouble(database[6].trim()));
                        }else{
                            dbData.setOperateTime(0);
                        }

                        if(!database[13].trim().equals("-")) {
                            dbData.setReqTransTime(Double.parseDouble(database[13].trim()));
                        }else{
                            dbData.setReqTransTime(0);
                        }

                        if(!database[14].trim().equals("-")) {
                            dbData.setRespTransTime(Double.parseDouble(database[14].trim()));
                        }else{
                            dbData.setRespTransTime(0);
                        }

                        if(!database[15].trim().equals("-")) {
                            dbData.setRespPayload(Integer.parseInt(database[15].trim()));
                        }else{
                            dbData.setRespPayload(0);
                        }

                        dbData.setCount(1);

                        dbData.setSlowCount(1);

                        return new Tuple2<>(dbKey,dbData);

                    }
                }).filter(new Function<Tuple2<DBKey, DBData>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<DBKey, DBData> v1) throws Exception {
                        return v1 != null;
                    }
                }).reduceByKey(new Function2<DBData, DBData, DBData>() {
                    public DBData call(DBData v1, DBData v2) throws Exception {
                        DBData result = new DBData();
                        result.setOperateTime(v1.getOperateTime() + v2.getOperateTime());
                        result.setReqTransTime(v1.getReqTransTime() + v2.getReqTransTime());
                        result.setRespTransTime(v1.getRespTransTime() + v2.getRespTransTime());
                        result.setRespPayload(v1.getRespPayload() + v2.getRespPayload());
                        result.setCount(v1.getCount() + v2.getCount());
                        result.setSlowCount(v1.getSlowCount() + v2.getSlowCount());
                        return result;
                    }
                }).map(new Function<Tuple2<DBKey,DBData>, Map<String, ?>>() {
                    public Map<String, ?> call(Tuple2<DBKey, DBData> v1) throws Exception {
                        DBKey dbKey = v1._1;
                        DBData dbData = v1._2;
                        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
                        builder.put("index_name", sdf.format(format.parse(dbKey.getTimeStart())));
                        builder.put("probeName",dbKey.getProbeName());
                        builder.put("customService",dbKey.getCustomService());
                        builder.put("ipClient",dbKey.getIpClient());
                        builder.put("ipServer",dbKey.getIpServer());
                        builder.put("portServer",dbKey.getPortServer());
                        builder.put("operateType",dbKey.getOperateType());
                        builder.put("timeStart",format.parse(dbKey.getTimeStart()));
                        builder.put("dbType",dbKey.getDbType());
                        builder.put("user",dbKey.getUser());
                        builder.put("responseCode",dbKey.getResponseCode());
                        builder.put("sqlString",dbKey.getSqlString());
                        builder.put("operateTime",dbData.getOperateTime());
                        builder.put("reqTransTime",dbData.getReqTransTime());
                        builder.put("respTransTime",dbData.getRespTransTime());
                        builder.put("respPayload",dbData.getRespPayload());
                        builder.put("count",dbData.getCount());
                        builder.put("slowCount",dbData.getSlowCount());
                        return builder.build();
                    }
                }).cache();

                if (es != null) {
                    JavaEsSpark.saveToEs(es, "ni-database-{index_name}/database", ImmutableMap.of
                            (ConfigurationOptions.ES_MAPPING_EXCLUDE, "index_name"));
                }
                return null;
            }
        });

        rawStream.context().start();
        rawStream.context().awaitTermination();

    }

}
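
The repo does not show how the job is submitted; a hedged sketch matching the 2G / 2-core allocation used in test environment four below (the class name and jar path are assumptions, substitute the actual build artifact):

# --class and the jar name are placeholders for the real build output
/data/mdware/spark/bin/spark-submit \
    --master spark://localhost:7077 \
    --executor-memory 2G \
    --total-executor-cores 2 \
    --class SSDPerformanceTest \
    sparkanalysis.jar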

Test environments

Test environment 1: virtual machine (8 GB RAM, 2 cores, non-SSD)

  1. Data volume written per minute
  2. Events written per minute (these figures can be measured as in the sketch below)
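
The events-per-minute figure can be measured by sampling the document count of the indices the job writes (ni-database-*) once a minute; the difference between consecutive samples is the per-minute rate. A minimal sketch, assuming the default Elasticsearch port:

# timestamped document count every 60s; successive deltas = events per minute
while true; do
    echo "$(date '+%H:%M') $(curl -s 'localhost:9200/_cat/count/ni-database-*?h=count')"
    sleep 60
done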

Test environment 2: virtual machine (8 GB RAM, 2 cores, SSD)

  1. Data volume written per minute
  2. Events written per minute

    (chart: events written per minute)

Test environment 3: IBM server (126 GB RAM, 16 cores, non-SSD)

  1. Data volume written per minute
  2. Events written per minute

Test environment 4: IBM server (126 GB RAM, 16 cores, ~160 GB SSD)

Job resource allocation: 2 GB memory, 2 cores (as in the spark-submit sketch above)

  1. Data volume written per minute
    (forgot to record)
  2. Events written per minute
  • Writing database data alone

    (chart: events written per minute, database only)

  • Writing database and tcp data together

    (chart: events written per minute, database)
    (chart: events written per minute, tcp)

PS: follow-up tests and configuration changes will be posted here as they happen.
For a cluster installation, see http://www.jianshu.com/p/654b5fd42a5d
