各组件兼容性分析
本次实验使用的各组件版本为:hadoop2.7.3、hive1.2.2、scala2.11.8、spark2.1.1、zeppelin0.7.0
Hbase与Hadoop兼容性表
hadoop和hbase版本对照表
可选用的版本是1.2.x、1.3.x、2.0.x
Hbase与Hive兼容性分析
Hbase和Hive作为hadoop上数据存储、分析的组件,各有优势,有时候需要hbase和hive进行数据互通,这时候如果hive和hbase不兼容,会导致数据互通的时候出现问题,通过查询hive文档:
hive文档
发现我们需要重新编译hive1.2.2的源代码
hive兼容
Hbase与Zeppelin兼容性分析
note
zeppelin default hbase
可以看到,zeppelin默认支持1.0.x,最高支持1.2.0+,并给出了更改方案,所以我们选用hbase1.2.x版本
zookeeper的选取
分布式Apache HBase安装依赖于正在运行的ZooKeeper集群,同时可以配置hbase是否为你管理zookeeper集群,根据官方文档,选取较为新的zookeeper版本就好。
重新编译hive_hbase-handler.jar
根据前面的分析,如果需要hive和hbase数据互通,根据当前选取的各个组件版本,需要重新编译hive_hbase-handler.jar,编译方式,编译后获得的文件如下:
jars
然后将hive的相关jar包替换掉,同时将编译时依赖的jar包加入hive的lib文件夹。
Hbase与其他组件数据互通
主要是spark、hive、hbase的整合,然后最好能在zeppelin中进行调用,但是都失败了,不过还是做一个总结
与spark
spark可以通过api来访问spark的数据,编写scala程序实现数据访问需要用到相关的jar包,参考其他资料,通过在spark目录建立新的lib,将hbase相关jar包导入,然后将改lib加入spark的path路径,但是我失败了,只好使用笨办法,将hbase相关jar包直接仍到了spark的jar目录下,然后jar包读取居然没出错:
cd /usr/local/spark/jars/
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
使用的代码如下:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkOperateHBase {
def main(args: Array[String]) {
val conf = HBaseConfiguration.create()
val sc = new SparkContext(new SparkConf())
//设置查询的表名
conf.set(TableInputFormat.INPUT_TABLE, "student")
val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = stuRDD.count()
println("Students RDD Count:" + count)
stuRDD.cache()
//遍历输出
stuRDD.foreach({ case (_,result) =>
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
})
}
}
执行上面的代码,能够正确获取conf
scala> val conf = HBaseConfiguration.create()
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml
但是在执行
val sc = new SparkContext(new SparkConf())
会抛出错误
执行以下代码
scala> conf.set(TableInputFormat.INPUT_TABLE, "emp1")
scala> val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
| classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
| classOf[org.apache.hadoop.hbase.client.Result])
stuRDD: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = NewHadoopRDD[1] at newAPIHadoopRDD at <console>:40
然后后面就各种错误,spark访问就放弃了
hive
前面替换了包,hive与hbase好像可以通过建立外表的方式来获取hbase中的数据
zeppelin
网上资料还很少,官方文档说他们不支持hbase1.2.x的hbaseshell,有用点的资料是github上的一篇文章,但是他告诉我们需要重新编译zeppelin的代码好像,所以果断放弃。
总结
hadoop的搭建还是挺费时间的,处理组件的兼容问题更是有难度,这次趟的坑很多,加上知识不足,时间有限,遗留了很多问题,下次有时间,继续解决。