[TOC]
一、 Spark SQL入口
Spark 1.x中,Spark SQL入口是SQLContext
和HiveContext
;
Spark 2.x中,Spark SQL入口是SparkSession
,但为了向后兼容,旧的SQLContext和HiveContext入口仍然保留。
具体可参考官网
1、SQLContext<HiveContext<SparkSession
1.1 SQLContext
SQLContext是通往SparkSQL的入口
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
一旦有了SQLContext,就可以开始处理DataFrame、DataSet等。
1.1.1 查询语句
sqlCtx.table(库.表)
1.2 HiveContext
HiveContext是通往hive入口。HiveContext具有SQLContext的所有功能。 实际上,如果查看API文档,就会发现HiveContext扩展了SQLContext,这意味着它支持SQLContext支持的功能以及更多(Hive特定的功能)
HiveContext扩展SQL Context实现日志记录
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
1.2.1 查询语句
hive.table()
1.3 SparkSession
在py3/lib/python3.6/site-packages/pyspark/sql/session.py
中可以看到SparkSession
类的定义;建议使用SparkSession
SparkSession是在Spark 2.0中引入的, 它使开发人员可以轻松地使用它,这样我们就不用担心不同的上下文, 并简化了对不同上下文的访问。通过访问SparkSession,我们可以自动访问SparkContext。
下面是如何创建一个SparkSession
val spark = SparkSession
.builder()
.appName("hirw-test")
.config("spark.some.config.option", "some-value")
.getOrCreate()
SparkSession现在是Spark的新入口点,它替换了旧的SQLContext和HiveContext。注意,保留旧的SQLContext和HiveContext是为了向后兼容。
一旦我们访问了SparkSession,我们就可以开始使用DataFrame和Dataset了。
下面是我们如何使用Hive支持创建SparkSession。
val spark = SparkSession
.builder()
.appName("hirw-hive-test")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
前提:背景,技术,经验,软技能,硬技能
命令
spark-shell spark交互式启动 , 这个命令;http://192.168.0.103:4043
scala :Welcome to Scala 2.13.6 欢迎使用 Scala 2.13.6(Java HotSpot(TM) 64 位服务器 VM,Java 1.8.0_301)。
输入表达式进行评估。 或尝试:帮助。
spark : version 3.1.2 ;Using Python version 3.8.8
jupyter notebook: python
RDD
RDD 弹性分布式数据集,Spark 计算的基石,为用户屏蔽了底层对数据的复杂抽象和处理,为用户提供了一组方便的数据转换与求值方法。
RDD 是一个懒执行的不可变的可以支持 Lambda 表达式的并行数据集合。
RDD 的最大好处就是简单,API 的人性化程度很高。
RDD 的劣势是性能限制,它是一个 JVM 驻内存对象,这也就决定了存在 GC 的限制和数据增加时 Java 序列化成本的升高。
语法
1 随机的10条数据
sql server的随机函数newID()和RAND()
SELECT TOP 10 * FROM Northwind..Orders ORDER BY NEWID()
--从Orders表中随机取出10条记录
--随机排序
-----------------------------------------
关于随机取得表中任意N条记录的方法,很简单,就用newid():
select top N * from table_name order by newid() ----N是一个你指定的整数,表是取得记录的条数.
select top 10 *,newid() as Random from ywle where ywlename='001' ordey by Random
下者效率要高些
SELECT TOP 10 * FROM ywle order by newid()
第2章 执行 Spark SQL 查询
2.1 命令行查询流程
打开 spark-shell 例子:查询大于 30 岁的用户
SQL 在文件上。
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
sqlDF.show()
SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount
FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
GROUP BY a.dateid, a.ordernumber
) c
JOIN tbDate d ON c.dateid = d.dateid
GROUP BY theyear
ORDER BY theyear DESC
Spark-SQL解析
Spark SQL的数据抽象
生成 UnResolve LogicalPlan 取消解决逻辑计划
SELECT SUM(AGE)
FROM
(SELECT A.ID,
A.NAME,
CAST(B.AGE AS LONG) AS AGE
FROM
NAME A INNER JOIN AGE B
ON A.ID == B.ID)
WHERE AGE >20