In this section we introduce how to use Spark for ETL tasks. With its efficient large-scale data processing, its simple and easy-to-use API, and its solid support for accessing a wide range of data stores, Spark is a good choice for building ETL pipelines.
In the previous section, while working with the retailstore.csv data, we saw records that do not meet the standard, a situation that comes up constantly in machine learning and big-data work: fields with the value `unknown`, which need to be filtered out. Open the Jupyter notebook from before, create a SparkSession, and load the data:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("FirstApp") \
.getOrCreate()
df = spark.read.csv("s3://emr-workshop-kpf/files/retailstore.csv",header=True, inferSchema = True)
df.show()
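To confirm that `inferSchema` picked up the expected column types, it can help to print the schema (an optional check, not part of the original steps):
df.printSchema()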
Use `filter` to drop the records whose country is `unknown`:
df1 = df.filter(df['Country'] != 'unknown')
df1.show()
The new DataFrame no longer contains those records.
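As a quick sanity check (an extra step, not shown in the original), the count of remaining `unknown` rows should be zero:
df1.filter(df1['Country'] == 'unknown').count()  # expected: 0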
Compute the means of `Age` and `Salary`:
from pyspark.sql.functions import mean
mean_age = df1.select(mean(df1['age'])).collect()
mean_age_val = mean_age[0][0]  # collect() returns a list of Row objects; take the first field of the first row
mean_salary = df1.select(mean(df1['salary'])).collect()
mean_salary_val = mean_salary[0][0]
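The two `collect()` calls each trigger a separate Spark job. An equivalent single-pass variant (a sketch, not from the original) computes both means in one aggregation:
from pyspark.sql.functions import mean
row = df1.agg(mean('age').alias('mean_age'), mean('salary').alias('mean_salary')).collect()[0]
mean_age_val, mean_salary_val = row['mean_age'], row['mean_salary']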
Replace the nulls with the corresponding means:
df2 = df1.na.fill(mean_age_val, ["age"])
df3 = df2.na.fill(mean_salary_val, ['salary'])
df3.show()
In the new DataFrame, the nulls have been replaced with the means.
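To verify that no nulls remain (an extra check, not in the original):
from pyspark.sql.functions import col
df3.filter(col('age').isNull() | col('salary').isNull()).count()  # expected: 0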
Save the new DataFrame in CSV format:
df3.write.format('csv').save('retailstore_transformed')
Because the notebook's PySpark session runs on the cluster through Livy, the output files land in HDFS on the cluster. You can inspect them with `hadoop fs -ls` (the same listing appears in the spark-submit walkthrough below).
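Note that Spark writes a directory of part files with no header row (visible in the `hadoop fs -cat` output later). If a single CSV with a header is preferred, a hedged variant (the output path here is a made-up example, not part of the original steps):
# coalesce(1) funnels all rows through one task, so only use it for small outputs
df3.coalesce(1).write.option('header', True).csv('retailstore_transformed_single')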
With the steps above, we have completed a full ETL pass: extracting the raw CSV, transforming it by filtering invalid records and imputing missing values, and loading the result back to storage.
So far we have been developing in the Jupyter environment. Once development is done, the ETL code needs to run in production, which means submitting it with `spark-submit`.
Log in to the EMR cluster and, in the current directory, save the following as `etl.py`:
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean

spark = SparkSession \
    .builder \
    .appName("FirstApp") \
    .getOrCreate()

# Extract: load the raw CSV
df = spark.read.csv("data/retailstore.csv", header=True, inferSchema=True)

# Transform: drop records with an unknown country
df1 = df.filter(df['Country'] != 'unknown')

# Compute the column means used for imputation
mean_age = df1.select(mean(df1['age'])).collect()
mean_age_val = mean_age[0][0]
mean_salary = df1.select(mean(df1['salary'])).collect()
mean_salary_val = mean_salary[0][0]

# Fill nulls with the corresponding means
df2 = df1.na.fill(mean_age_val, ["age"])
df3 = df2.na.fill(mean_salary_val, ['salary'])
df3.show()

# Load: write the cleaned data back to HDFS as CSV
df3.write.format("csv").save("retailstore_transformed")
Then submit the job:
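A minimal invocation, assuming the script is in the current directory and the default deploy mode (the exact command is not shown in the source, so treat this as an assumption):
spark-submit etl.py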
After the run completes, the corresponding directory and files appear in HDFS:
kongpingfan@dev-m:~$ hadoop fs -ls
Found 3 items
drwxr-xr-x - kongpingfan hadoop 0 2022-06-04 12:26 .sparkStaging
drwxr-xr-x - kongpingfan hadoop 0 2022-06-04 09:57 data
drwxr-xr-x - kongpingfan hadoop 0 2022-06-04 12:24 retailstore_transformed
kongpingfan@dev-m:~$ hadoop fs -ls retailstore_transformed
Found 2 items
-rw-r--r-- 2 kongpingfan hadoop 0 2022-06-04 12:24 retailstore_transformed/_SUCCESS
-rw-r--r-- 2 kongpingfan hadoop 246 2022-06-04 12:24 retailstore_transformed/part-00000-80df57c9-9a0e-4bb4-8ae9-f922e8dd2c03-c000.csv
kongpingfan@dev-m:~$ hadoop fs -cat retailstore_transformed/part-00000-80df57c9-9a0e-4bb4-8ae9-f922e8dd2c03-c000.csv
18,20000,Male,Germany,N
19,22000,Female,France,N
20,24000,Female,England,N
21,31875,Male,England,N
22,50000,Male,France,Y
23,35000,Female,England,N
24,31875,Male,Germany,N
25,32000,Female,France,Y
22,35000,Male,Germany,N
27,37000,Female,France,N
Open Hive; we can now query the ETL-processed data:
create table retailcleaned(age int, salary int, gender string, country string, purchased string)
row format delimited fields terminated by ','
location '/user/kongpingfan/retailstore_transformed';
select * from retailcleaned;
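The same table can also be queried back from PySpark through Spark SQL. A minimal sketch, assuming the cluster's Hive metastore is wired up to Spark (the EMR default) and using a made-up app name and query:
from pyspark.sql import SparkSession

# enableHiveSupport() lets Spark SQL resolve tables registered in the Hive metastore
spark = SparkSession \
    .builder \
    .appName("QueryCleaned") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("SELECT country, avg(salary) AS avg_salary FROM retailcleaned GROUP BY country").show()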