先看源码:
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
/**
* Created by magneto on 16-6-29.
*/
object Upsert4 extends App{
val index = "test"
val target = s"$index/docs"
val props = Map("es.write.operation" -> "upsert",
//"es.input.json" -> "true",
"es.mapping.id" -> "id",
"es.update.script.lang" -> "groovy"
)
val conf = new SparkConf().setAppName("read for elasticsearch").setMaster("local")
conf.set("es.nodes", "172.24.63.14")//修改成自己使用的es服务器地址
val sc = new SparkContext(conf)
val name= Map("id" -> 3, "name" -> Set("sxl1989","sxl9199"))
val lines = sc.makeRDD(Seq(name))
val up_params = "new_name:name"
//val up_script = "if (ctx._source.containsKey(\"name\")) {ctx._source.name += new_name;} else {ctx._source.name = [new_name];}"
val up_script = "if (ctx._source.containsKey('name')) {ctx._source.name += new_name;} else {ctx._source.name = [new_name];}"
lines.saveToEs(target, props + ("es.update.script.params" -> up_params) + ("es.update.script" -> up_script))
}
其中:es.input.json控制你输入的是否是json格式的,这里的lines用的是Map所以注释掉。
另外一处注释是注释掉的脚本,此脚本在es客户端是可以运行的,但是使用es-spark不能运行,必须将双引号改成单引号才行。
name这个其实代表一个文档,id这列因为“es.mapping.id”->”id”被选做meta数据。
注意,此处的index可以使用别名。
es.write.operation -> upsert
es.update.script.lang -> groovy
es.update.script.params -> up_params
es.update.script -> up_script
是核心配置。
更多请参考:
es官方文档
elasticsearch-hadoop源码