Spark ETL

In this section we will use Spark to build an ETL job. Spark processes big data efficiently, offers a simple and easy-to-use API, and ships with solid connectors for a wide range of databases, all of which make it a good choice for developing ETL pipelines.

When we worked with the retailstore.csv data in the previous section, some of the records were not clean, a situation that comes up all the time in machine learning and big-data training:

  1. Age and Salary contain null values, which need to be replaced with that column's mean
  2. Country contains unknown values, and those records need to be filtered out

[Screenshot: the raw retailstore.csv data, showing null Age/Salary values and unknown Country records]

Developing the ETL code in Jupyter

Open the Jupyter notebook from before, create a SparkSession, and load the data:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("FirstApp") \
    .getOrCreate()

df = spark.read.csv("s3://emr-workshop-kpf/files/retailstore.csv", header=True, inferSchema=True)
df.show()

[Screenshot: df.show() output]
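Before moving on, it can be worth confirming what inferSchema actually produced, since a numeric column containing stray strings will silently fall back to string type. A quick check with the standard DataFrame API:

df.printSchema()   # expect integer types for age/salary and string for the rest
print(df.count())  # total number of rows loaded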

Use filter to drop the records whose Country is unknown:

df1 = df.filter(df['Country'] != 'unknown')
df1.show()

The new DataFrame no longer contains those records:

[Screenshot: df1.show() output, with the unknown-country rows gone]
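One subtlety worth knowing: df['Country'] != 'unknown' also drops rows whose Country is null, because the comparison evaluates to null and filter only keeps rows that evaluate to true. If you ever need to keep the null-country rows while still removing 'unknown', a sketch (df1_keep_nulls is just an illustrative name):

from pyspark.sql.functions import col

# Keep rows where Country differs from 'unknown' OR is null.
df1_keep_nulls = df.filter((col('Country') != 'unknown') | col('Country').isNull())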

Compute the means of Age and Salary:

from pyspark.sql.functions import mean

mean_age = df1.select(mean(df1['age'])).collect()
mean_age_val = mean_age[0][0]  # collect() returns a list of Row objects; [0][0] extracts the scalar

mean_salary = df1.select(mean(df1['salary'])).collect()
mean_salary_val = mean_salary[0][0]

[Screenshot: the computed mean values]
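Each select(...).collect() above launches a separate Spark job. Both means can be computed in a single pass with agg; a minimal equivalent sketch:

from pyspark.sql.functions import mean

# One aggregation job instead of two.
row = df1.agg(mean('age').alias('mean_age'), mean('salary').alias('mean_salary')).first()
mean_age_val = row['mean_age']
mean_salary_val = row['mean_salary']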

Replace the nulls with the corresponding means:

df2 = df1.na.fill(mean_age_val, ["age"])       # the float mean is cast to the integer column type
df3 = df2.na.fill(mean_salary_val, ["salary"])
df3.show()

In the new DataFrame the nulls now hold the replacement means:

[Screenshot: df3.show() output, with nulls replaced by the means]
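The two fill calls can also be collapsed into one by passing a dict to na.fill, which maps each column to its own replacement value; for production pipelines, pyspark.ml.feature.Imputer packages this same mean-imputation pattern as a reusable pipeline stage. The dict form, producing the same result as df3 above:

# Equivalent single call: fill each column with its own mean.
df3 = df1.na.fill({'age': mean_age_val, 'salary': mean_salary_val})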

Save the new DataFrame in CSV format:

df3.write.format('csv').save('retailstore_transformed')

Because the notebook submits its code to the cluster through Livy, the relative path is resolved on the cluster, so the files are written into HDFS under the current user's home directory.
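Note that save produces a directory of part files without a header row. If you want a header, or want the output back in S3 instead of HDFS, the writer accepts the usual options. A sketch, where the output prefix is an assumption reusing the workshop bucket from the read step:

# Hypothetical output location; any writable S3 prefix works.
df3.write \
    .option('header', True) \
    .mode('overwrite') \
    .csv('s3://emr-workshop-kpf/output/retailstore_transformed')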

Inspect the files in HDFS:

[Screenshots: the retailstore_transformed directory and its part file in HDFS]

With the steps above we have:

  1. Filtered the data
  2. Replaced null values with the column means
  3. Saved the cleaned data as a new set of files

Running the ETL job with spark-submit

Above we developed in the Jupyter environment. Once development is finished, the ETL code needs to run in production, which is where spark-submit comes in.

Log in to the EMR cluster and, in the current directory, save the following as etl.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import mean


spark = SparkSession \
    .builder \
    .appName("FirstApp") \
    .getOrCreate()

# Load the raw data and drop the unknown-country records.
df = spark.read.csv("data/retailstore.csv", header=True, inferSchema=True)
df1 = df.filter(df['Country'] != 'unknown')

# Compute the column means; collect() returns a list of Row objects.
mean_age = df1.select(mean(df1['age'])).collect()
mean_age_val = mean_age[0][0]
mean_salary = df1.select(mean(df1['salary'])).collect()
mean_salary_val = mean_salary[0][0]

# Replace nulls with the means and write the result out as CSV.
df2 = df1.na.fill(mean_age_val, ["age"])
df3 = df2.na.fill(mean_salary_val, ["salary"])
df3.show()
df3.write.format("csv").save("retailstore_transformed")

Then submit the job:

[Screenshot: submitting the job with spark-submit]
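The exact command in the screenshot is not preserved here; the minimal invocation, run from the directory containing etl.py, would be:

spark-submit etl.py

On EMR this runs on YARN in client mode by default; adding --deploy-mode cluster is a common choice for production jobs.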

After the run completes, the corresponding directory and files are created in HDFS:

kongpingfan@dev-m:~$ hadoop fs -ls 
Found 3 items
drwxr-xr-x   - kongpingfan hadoop          0 2022-06-04 12:26 .sparkStaging
drwxr-xr-x   - kongpingfan hadoop          0 2022-06-04 09:57 data
drwxr-xr-x   - kongpingfan hadoop          0 2022-06-04 12:24 retailstore_transformed

kongpingfan@dev-m:~$ hadoop fs -ls retailstore_transformed
Found 2 items
-rw-r--r--   2 kongpingfan hadoop          0 2022-06-04 12:24 retailstore_transformed/_SUCCESS
-rw-r--r--   2 kongpingfan hadoop        246 2022-06-04 12:24 retailstore_transformed/part-00000-80df57c9-9a0e-4bb4-8ae9-f922e8dd2c03-c000.csv

kongpingfan@dev-m:~$ hadoop fs -cat retailstore_transformed/part-00000-80df57c9-9a0e-4bb4-8ae9-f922e8dd2c03-c000.csv
18,20000,Male,Germany,N
19,22000,Female,France,N
20,24000,Female,England,N
21,31875,Male,England,N
22,50000,Male,France,Y
23,35000,Female,England,N
24,31875,Male,Germany,N
25,32000,Female,France,Y
22,35000,Male,Germany,N
27,37000,Female,France,N

Go into hive; we can now query the data produced by the ETL job:

create table retailcleaned(age int, salary int, gender string, country string, purchased string)
row format delimited fields terminated by ','
location 'retailstore_transformed';

select * from retailcleaned;

[Screenshot: the query results in Hive]