Chapter5 Spark SQL:结构化数据文件处理
5.1 认识Spark SQL
5.2 掌握DataFrame基础操作
5.2.1 创建DataFrame对象
1.结构化数据文件创建
from HDFS / Parquest创建: val dfUsers=sqlContext.read.load("user/root/sparkSql/...")
JSON: val dfPeople = sqlContext.read.format("json").load("...")
2.外部数据库创建
val url="jdbc:mysql://193.168.128.130/test"
val jdbcDF=sqlContext.read.format("jdbc").options(Map("url" -> url, "user" -> "root", "password" -> "root", "dbtable" -> "people")).load()
3.RDD创建
第一种方式,先定义case class,case class Person(name:String, age:Int);
再val data=sc.textFile("...").map(_.split(","));
再val people=data.map(p=>Person(p(0),p(1).trim.toInt)).toDF()
第二种方式,无法提前定义case class时,编程制定Schema将RDD转换城DF
4.Hive中的表创建
hiveContext.sql("use test");
val people=hiveContext.sql("select * from people")
5.2.2 DataFrame查看数据
函数或方法,例 movies.printSchema
printSchema,打印数据模式;
show,查看数据;
first/head/take/takeAsList,获取若干行数据;
collect/collectAsList,获取所有数据
5.2.3 DataFrame查询操作
1.将DF注册成临时表,peopleDF.registerTempTable("peopleTemTab");
用SQL语句查询,val personsRDD=sqlContext.sql("select ...").rdd;
personsRDD.collect
2.直接再DF对象上查询
1.条件查询
已有DF对象rating和user
(1) where
val userWhere=user.where("gender='F' and age=18")
(2) filter
val userFilter=user.filter("gender='F' and age=18")
2.查询指定字段的数据信息
(1) select
val userSelect=user.select(" ")
(2) selectExpr: 对制定字段特殊处理
val userSelectExpr=user.selectExpr(" ", "replace(gender) as sex", " ")
(3) col/apply
获取一个字段,返回Column类型
- limit
val userlimit=user.limit(3)
4.orderBy/sort
val userOrderBy=user.orderBy(desc(" "))
val userSort=user.sort(asc(" "))
5.groupBy
val userGroupBy=user.groupBy("gender")
返回GroupedData对象
常用方法:max(), min(), mean(), sum(), count()
val userGroupByCount=user.groupBy("gender").count
6.join
val dfJoin=user.join(rating, "userId"),根据userId字段连接rating, user
5.2.4 DataFrame输出操作
import spark.sql.SaveMode
people.save("...", "json", SaveMode.Overwrite)
将DF输出为表
people.saveAsTable(" ", SaveMode.Overwrite)