本篇内容介绍了“Spark Core读取ES的分区问题案例分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1.Spark Core读取ESES官网直接提供的有elasticsearch-hadoop插件,对于ES 7.x,hadoop和Spark版本支持如下:浪尖这了采用的ES版本是7.1.1,测试用的Spark版本是2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:a,导入整个elasticsearch-hadoop包b,只导入spark模块的包浪尖这里为了测试方便,只是在本机起了一个单节点的ES实例,简单的测试代码如下:import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
object es2sparkrdd {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
conf.set(ConfigurationOptions.ES_PORT, "9200")
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
// conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
val sc = new SparkContext(conf)
import org.elasticsearch.spark._
sc.esRDD("posts").foreach(each=>{
each._2.keys.foreach(println)
})
sc.esJsonRDD("posts").foreach(each=>{
println(each._2)
})
sc.stop()
}
}
可以看到Spark Core读取RDD主要有两种形式的API:a,esRDD。这种返回的是一个tuple2的类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。b,esJsonRDD。这种返回的也是一个tuple2类型 香港云主机的RDD,第一个元素依然是id,第二个是json字符串。虽然是两种类型的RDD,但是RDD都是ScalaEsRDD类型。要分析Spark Core读取ES的并行度,只需要分析ScalaEsRDD的getPartitions函数即可。2.源码分析首先导入源码https://github.com/elastic/elasticsearch-hadoop这个是gradle工程,可以直接导入idea,然后切换到7.x版本即可。废话少说直接找到ScalaEsRDD,发现gePartitions是在其父类实现的,方法内容如下:override def getPartitions: Array[Partition] = {
esPartitions.zipWithIndex.map { case(esPartition, idx) =>
new EsPartition(id, idx, esPartition)
}.toArray
}
esPartitions是一个lazy型的变量:这种声明原因是什么呢?lazy+transient的原因大家可以考虑一下。RestService.findPartitions方法也是仅是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法。a).findSlicePartitions这个方法其实就是在5.x及以后的ES版本,同时配置了以后,才会执行,实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:实际上分片就是用游标的方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据的读取,组装过程是SearchRequestBuilder.assemble方法来实现的。这个其实个人觉得会浪费一定的性能,假如真的要ES结合Spark的话,建议合理设置分片数。b).findShardPartitions方法这个方法没啥疑问了就是一个RDD分区对应于ES index的一个分片。“Spark Core读取ES的分区问题案例分析”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注开发云网站,小编将为大家输出更多高质量的实用文章!
相关推荐: 怎么将Prometheus对应的exporter做成系统服务
本篇内容介绍了“怎么将Prometheus对应的exporter做成系统服务”的有关知识,在实际案例的操作过程中,不少人 香港云主机都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!这里以node_expor…
免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。