怎么自定义JDBCRDD的分区


这篇文章主要讲解了“怎么自定义JDBCRDD的分区”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么自定义JDBCRDD的分区”吧!1,JDBCRDD使用val data = new JdbcRDD(sc, getConnection, “SELECT id,aa FROM bbb where ?
参数解释:1,sparkcontext。2,一个创建链接的函数。3,sql。必须有?
4,要取数据的id最小行。5,要取数据的id最大行号。6,分区数。7,一个将ResultSet转化为需要类型的方法。2,JdbcRDD的getPartition方法override def getPartitions: Array[Partition] = {
// bounds are inclusive, hence the + 1 here and – 1 on end
val length = BigInt(1) + upperBound – lowerBound
(0 until numPartitions).map(i => {
val start = lowerBound + ((i * length) / numPartitions)
val end = lowerBound + (((i + 1) * length) / numPartitions) – 1
new JdbcPartition(i, start.toLong, end.toLong)
}).toArray
}3,JdbcRDD的compute方法就是一个通过jdbc获取指定范围数据的过程。val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()4,重写JDBC方法重写分区的方法即可。如:CustomizedJdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
getCustomizedPartitions: () => Array[Partition],
prepareStatement: (PreparedStatement, CustomizedJdbcPartition) => PreparedStatement,
mapRow: (ResultSet) => T = Customiz 香港云主机edJdbcRDD.resultSetToObjectArray _)同时把getPartition方法重写为:override def getPartitions: Array[Partition] = {
getCustomizedPartitions();
}感谢各位的阅读,以上就是“怎么自定义JDBCRDD的分区”的内容了,经过本文的学习后,相信大家对怎么自定义JDBCRDD的分区这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是开发云,小编将为大家推送更多相关知识点的文章,欢迎关注!

相关推荐: leecode如何实现卡牌分组功能

这篇文章给大家分享的是有关leecode如何实现卡牌分组功能的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。914. 卡牌分组https://leetcode-cn.com/problems/x-of-a-kind-in-a-deck…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

Like (0)
Donate 微信扫一扫 微信扫一扫
Previous 10/07 10:13
Next 10/07 10:13

相关推荐