RDD(Resilient Distributed Datasets
)之前的计算框架没有很好地复用计算阶段之间的数据。例如MapReduce框架计算完一个阶段的数据将结果输出到外部的静态储存系统,这造成数据复制、磁盘I/O、序列化的开销,并且占据了程序运行的大部分时间。而且这些框架的容错是通过不同机器之间同步数据/更新日志来实现的。对于数据密集型的应用,这些数据同步的操作使用更多的网络带宽,速度远慢于内存读取。
RDD是一个只读的、分片的记录集合。创建RDD的操作被称为Transformation
,例如map
, filter
。
RDD通过SparkContext
创建,当我们以sc.textFile("hdfs://path/to/file")
形式生成RDD时,spark就已经算好了数据的各个切片(也叫分区),并把分区信息放在了一个列表(名单)里,这个名单就属于RDD自带的其中一个属性。所以RDD不包含实际要处理的数据,而是在RDD中的分区名单中载明切片的信息。因为数据已经在Hadoop的数据节点上了, 只要在RDD中标明分区对应的数据所在位置、偏移量、数据长度即可,就类似元数据。
RDD在被分发到每个执行计算的任务节点后,每个任务节点会根据元数据信息获取自身节点负责计算的分区数据,并把数据放到本节点的内存当中,然后对数据进行计算。每个分区由一个节点来计算,换句话说就是每个任务只计算RDD的其中一个分区。
DataFrame是一个以命名列方式组织的分布式数据集。在概念上,它跟关系型数据库中的一张表或者Python中的DataFrame一样,但是比他们更优化。
我们可以从不同的数据源构建DataFrame,例如结构化数据文件、Hive中的表、外部数据库或现有的RDDs。DataFrame的API可以在各种语言中使用,包括Scala、Java、Python和R。
总结:
[Person]
虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。登录到上一节创建的spark集群,在examples/src/main/resources
目录下,有一些sample数据文件:
我们将加载该路径下的people.csv
到Spark的DataFrame。 进入bin目录,运行spark-shell
, 将该文件加载:
val people = spark.read.option("inferSchema","true").option("header","true").option("delimiter", ";").csv("/home/ec2-user/spark-3.5.0-bin-hadoop3/examples/src/main/resources/people.csv")
返回一个DataFrame对象:
可以对这个DataFrame对象执行各种操作:
people.count
people.printSchema
people.createOrReplaceTempView("people")
spark.sql("select * from people").show()
上面是scala的写法,如果用pyspark
,语法有所差异但基本类似:
retailDF = spark.read.option("inferSchema","true").option("header","true").option("delimiter", ";").csv("/home/ec2-user/spark-3.5.0-bin-hadoop3/examples/src/main/resources/people.csv")
retailDF.createOrReplaceTempView("people")
spark.sql("select * from people").show()
DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。
Dataframe是Dataset的特列,DataFrame=Dataset[Row]
,所以可以通过as方法将Dataframe转换为Dataset。
DataFrame 一行记录中没有指定特定的数据类型
Dataset 一行记录中每个对象有明确类型
DataSet在DataFrame基础上支持更强的数据类型控制,更精细化操作数据,避免因数据类型异常,造成的程序运行异常。
在bin下运行spark-shell
,加载数据,并将DataFrame转换成DataSet:
val peopleDF = spark.read.option("inferSchema","true").option("header","true").option("delimiter", ";").csv("/home/ec2-user/spark-3.5.0-bin-hadoop3/examples/src/main/resources/people.csv")
case class PeopleRow(name: String,age: Integer,job: String)
var peopleDS = peopleDF.as[PeopleRow]
peopleDF.take(2)
peopleDS.take(2)