RDD操作

本节我们将介绍PySpark中RDD的操作


导入数据集至RDD

下载数据集并上传到S3:

wget https://pingfan.s3.amazonaws.com/files/retailstore.csv
aws s3 cp retailstore.csv s3://<bucket-name>/files/

image-20240307095206117

导入Spark依赖,使用parallelize()方法创建一个RDD测试:

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession \
       .builder \
       .appName("FirstApp") \
       .getOrCreate()

sc = spark.sparkContext

sampleRDD = sc.parallelize([1,2,3,4,5])
type(sampleRDD)
sampleRDD.collect()

image-20220604154447804

导入retailstore.csv文件至RDD:

customerData = sc.textFile("s3://emr-workshop-kpf/files/retailstore.csv")
type(customerData)

customerData.collect() # 输出所有记录
customerData.count() # 统计记录数量
customerData.first() # 输出第一行记录
customerData.take(3) # 输出前三行记录

image-20240307095333858

Transformation与Action

RDD是不可变更的(immutable), 但是可以对RDD做转换,并将结果保存到其他的RDD当中

Spark的操作有两种:

  • Transformation(根据已有的RDD,生成新的RDD - 如map、filter、flatMap等)
  • Action(返回RDD计算的结果 - 如collect、count、take操作)

Tranformation操作 - Map

Map是最基础的Transformation操作,它将某个函数作用到当前的RDD上,并生成新的RDD,形式如下:

newRDD = oldRDD.map(function)

新的RDD行数与原来的RDD一样多

例如我们将上面的customerData中所有的Male替换成M

customerData2 = customerData.map(lambda x: x.replace("Male", "M"))
customerData2.collect()

image-20220604155257354

Map的参数可以是lambda表达式,也可以是python中def定义的函数:

def replaceWord(word):
  return word.replace("Male", "M")
customerData3 = customerData.map(replaceWord)
customerData3.collect()

image-20220604155432566

Transformation操作 - Filter

和map操作一样,filter将某个函数作用到当前的RDD上,并生成新的RDD,形式如下:

newRDD = oldRDD.map(function)

例如将含有England关键字的记录筛选出来:

customerData3 = customerData.filter(lambda x: "England" in x)
customerData3.collect()

image-20220604155727382