如何解析SparkSQL外部数据源


这期内容当中小编将会给大家带来有关如何解析SparkSQL外部数据源,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。大数据MapReduce,Hive,Spark作业,首先需要加载数据,数据的存放源可能是HDFS、HBase、S3、OSS mongoDB;数据格式也可能为json、text、csv、parquet、jdbc..或者数据格式经过压缩,不同格式文件需要不同的解析方式,如果需要HDFS关联MySQL数据,可以通过sqoop进行一些列转换到,如果使用External Data Source API直接加载为DF拿到数据,简单的说可以通过SparkSQL拿到外部数据源数据加载成DF。加载方式:build-in :内置加载外部数据如 json、text、parquet、jdbc、HDFS;third-party:第三方加载外部数据如HBase、S3、OSS mongoDB 第三方JAR地址:https://spark-packages.org/
Maven工程需要导入gav spark-shell:需要外部导入–package 香港云主机 g:a:v SPARK_HOME/bin/spark-shell –packages com.databricks:spark-csv_2.11:1.5.0
优势:下载依赖包到本地缺点:内网环境没有网络无法下载Spark context Web UI available at http://hadoop001:4040Spark context available as ‘sc’ (master = local[2], app id = local-1536244013147).Spark session available as ‘spark’.Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ `/ __/ ‘_/ /___/ .__/_,_/_/ /_/_ version 2.3.1 /_/Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)Type in expressions to have them evaluated.Type :help for more information.scala> spark.read.load(“file:///home/hadoop/app/spark–bin-2.6.0-cdh6.7.0/examples/src/main/resources/people.txt”).show提示错误:/people.txt is not a Parquet file注意:spark.read.load()底层默认读取Parquet filescala> spark.read.load(“file:///home/hadoop/app/spark–bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet”).show18/09/06 10:32:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectExceptionscala> val users = spark.read.load(“file:///home/hadoop/app/spark–bin-2.6.0-cdh6.7.0/examples/src/main/resources/users.parquet”)users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string … 1 more field]scala> users.printSchemaroot|– name: string (nullable = true)|– favorite_color: string (nullable = true)|– favorite_numbers: array (nullable = true)| |– element: integer (containsNull = true)scala> users.show– 查看列,常规操作scala> users.select(“name”).show二、转换操作– 转成json格式输出scala> users.select(“name”,”favorite_color”).write.format(“json”).save(“file:////home/hadoop/data/parquet/”)– 不采取压缩.option(“compression”,”none”) — 转成text格式输出scala> users.select(“name”).write.format(“text”).save(“file:////home/hadoop/data/parquet2/”)[hadoop@hadoop001 parquet2]$ cat *Alyssa— Save Modes用法:.mode(“”)1、default — 目标目录存在,抛出异常2、append
— 目标目录存在,重跑数据+1,无法保证数据幂等
3、overwrite– 目标目录存在,覆盖原文件4、ignore– 忽略你的模式,目标纯在将不保存— 读取外部MySQL数据为DFval jdbcDF = spark.read.format(“jdbc”).option(“url”, “jdbc:mysql://hadoop001:3306/ruozedata”).option(“driver”,”com.mysql.jdbc.Driver”).option(“dbtable”, “city_info”).option(“user”,”root”).option(“password”, “root”).load()– 查看表信息jdbcDF.show()– 获取本地数据val deptDF = spark.table(“dept”)– join关联使用deptDF.join(jdbcDF,deptDF.col(“deptid”) === jdbcDF.col(“deptid”))– DF写入MySQL本地,数据类型有变化,重复写入需要加上.mode(“overwrite”)jdbcDF.write.format(“jdbc”).option(“url”, “jdbc:mysql://hadoop001:3306/hive_data”).option(“driver”,”com.mysql.jdbc.Driver”).option(“dbtable”, “city_info_bak”).option(“user”,”root”).option(“password”, “root”).save()mysql> show tables– 如果想类型不发生变化指定option指定字段类型.option(“createTableColumnTypes”, “name CHAR(64), comments VARCHAR(1024)”)— SQL创建临时表视图,单sessionshow tbales;INSERT INTO TABLE emp_sqlSELECT * FROM emp_sql disk
network CPU外部数据外(1T)——->获取本地磁盘(1T)———->提交到集群(1T)———>结果(1G) disk
network CPU外部数据外(1T)——->经过列裁剪(10G)———–>提交到集群(10G)———–>传结果(1g) disk
CPU network外部数据外(1T)——->经过列裁剪(10G)———->进过计算(1G)———–>传输结果– 0.有效的读取外部数据源的数据的– 1.buildScan扫描整张表,变成一个RDD[ROW]trait
TableScan
{def
buildScan(): RDD[Row] }– 2.PrunedScan获取表的裁剪列trait
PrunedScan
{def
buildScan(requiredColumns: Array[String]): RDD[Row]}– 3.PrunedFilteredScan列裁剪,行过滤trait
PrunedFilteredScan
{def
buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]}– 4.加载外部数据源的数据,定义数据的schema信息abstract class
BaseRelation{}– 5.Relation产生BaseRelation使用trait
RelationProvider
{}总归:— 查询类操作trait
PrunedScan
{ def
buildScan(requiredColumns: Array[String]): RDD[Row]} — 列裁剪,行过滤trait
PrunedFilteredScan
{ def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]} — 写入类操作trait
InsertableRelation
{ def
insert(data: DataFrame, overwrite: Boolean): Unit}上述就是小编为大家分享的如何解析SparkSQL外部数据源了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注开发云行业资讯频道。

相关推荐: Mysql中有哪些日志

这篇文章将为大家详细讲解有关Mysql中有哪些日志,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。redo log重做日志,是在binlog之上,一条update语句执行时,首先会记录一条redo log,然后再…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 09/23 17:44
下一篇 09/23 17:44

相关推荐