本节我们将介绍PySpark中RDD的操作
下载数据集并上传到S3:
wget https://pingfan.s3.amazonaws.com/files/retailstore.csv
aws s3 cp retailstore.csv s3://<bucket-name>/files/
导入Spark依赖,使用parallelize()
方法创建一个RDD测试:
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("FirstApp") \
.getOrCreate()
sc = spark.sparkContext
sampleRDD = sc.parallelize([1,2,3,4,5])
type(sampleRDD)
sampleRDD.collect()
导入retailstore.csv
文件至RDD:
customerData = sc.textFile("s3://emr-workshop-kpf/files/retailstore.csv")
type(customerData)
customerData.collect() # 输出所有记录
customerData.count() # 统计记录数量
customerData.first() # 输出第一行记录
customerData.take(3) # 输出前三行记录
RDD是不可变更的(immutable
), 但是可以对RDD做转换,并将结果保存到其他的RDD当中
Spark的操作有两种:
Transformation
(根据已有的RDD,生成新的RDD - 如map、filter、flatMap
等)Action
(返回RDD计算的结果 - 如collect、count、take
操作)Map是最基础的Transformation
操作,它将某个函数作用到当前的RDD上,并生成新的RDD,形式如下:
newRDD = oldRDD.map(function)
新的RDD行数与原来的RDD一样多
例如我们将上面的customerData
中所有的Male替换成M
:
customerData2 = customerData.map(lambda x: x.replace("Male", "M"))
customerData2.collect()
Map的参数可以是lambda表达式,也可以是python中def
定义的函数:
def replaceWord(word):
return word.replace("Male", "M")
customerData3 = customerData.map(replaceWord)
customerData3.collect()
和map操作一样,filter
将某个函数作用到当前的RDD上,并生成新的RDD,形式如下:
newRDD = oldRDD.map(function)
例如将含有England
关键字的记录筛选出来:
customerData3 = customerData.filter(lambda x: "England" in x)
customerData3.collect()