RDD、DataFrame、DataSet

RDD

RDD(Resilient Distributed Datasets)之前的计算框架没有很好地复用计算阶段之间的数据。例如MapReduce框架计算完一个阶段的数据将结果输出到外部的静态储存系统,这造成数据复制、磁盘I/O、序列化的开销,并且占据了程序运行的大部分时间。而且这些框架的容错是通过不同机器之间同步数据/更新日志来实现的。对于数据密集型的应用,这些数据同步的操作使用更多的网络带宽,速度远慢于内存读取。

RDD是一个只读的、分片的记录集合。创建RDD的操作被称为Transformation,例如map, filter

RDD通过SparkContext创建,当我们以sc.textFile("hdfs://path/to/file")形式生成RDD时,spark就已经算好了数据的各个切片(也叫分区),并把分区信息放在了一个列表(名单)里,这个名单就属于RDD自带的其中一个属性。所以RDD不包含实际要处理的数据,而是在RDD中的分区名单中载明切片的信息。因为数据已经在Hadoop的数据节点上了, 只要在RDD中标明分区对应的数据所在位置、偏移量、数据长度即可,就类似元数据。

RDD在被分发到每个执行计算的任务节点后,每个任务节点会根据元数据信息获取自身节点负责计算的分区数据,并把数据放到本节点的内存当中,然后对数据进行计算。每个分区由一个节点来计算,换句话说就是每个任务只计算RDD的其中一个分区。

image-20240303105127197

DataFrame

DataFrame是一个以命名列方式组织的分布式数据集。在概念上,它跟关系型数据库中的一张表或者Python中的DataFrame一样,但是比他们更优化。

我们可以从不同的数据源构建DataFrame,例如结构化数据文件、Hive中的表、外部数据库或现有的RDDs。DataFrame的API可以在各种语言中使用,包括Scala、Java、Python和R。

总结:

  1. DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
  2. DataFrame与RDD的主要区别在于,DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
  3. 同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从 API 易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API 要更加友好,门槛更低。
  4. 左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。

DataFrame测试

登录到上一节创建的spark集群,在examples/src/main/resources目录下,有一些sample数据文件:

image-20240303111213837

我们将加载该路径下的people.csv到Spark的DataFrame。 进入bin目录,运行spark-shell, 将该文件加载:

val people = spark.read.option("inferSchema","true").option("header","true").option("delimiter", ";").csv("/home/ec2-user/spark-3.5.0-bin-hadoop3/examples/src/main/resources/people.csv")

返回一个DataFrame对象:

image-20240303114212260

可以对这个DataFrame对象执行各种操作:

people.count
people.printSchema

people.createOrReplaceTempView("people")
spark.sql("select * from people").show()

image-20240303114306597

上面是scala的写法,如果用pyspark,语法有所差异但基本类似:

retailDF = spark.read.option("inferSchema","true").option("header","true").option("delimiter", ";").csv("/home/ec2-user/spark-3.5.0-bin-hadoop3/examples/src/main/resources/people.csv")

retailDF.createOrReplaceTempView("people")
spark.sql("select * from people").show()

image-20240303114612838

DataSet

DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。

Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。

DataFrame和DataSet的区别

  • DataFrame 一行记录中没有指定特定的数据类型

  • Dataset 一行记录中每个对象有明确类型

  • DataSet在DataFrame基础上支持更强的数据类型控制,更精细化操作数据,避免因数据类型异常,造成的程序运行异常。

DataSet测试

在bin下运行spark-shell,加载数据,并将DataFrame转换成DataSet:

val peopleDF = spark.read.option("inferSchema","true").option("header","true").option("delimiter", ";").csv("/home/ec2-user/spark-3.5.0-bin-hadoop3/examples/src/main/resources/people.csv")

case class PeopleRow(name: String,age: Integer,job: String)

var peopleDS = peopleDF.as[PeopleRow]

peopleDF.take(2)
peopleDS.take(2)

image-20240303114452596