Running PySpark on an EMR Cluster

In this section we walk through how to submit Spark jobs on a Hadoop cluster.

There are two ways to submit a Spark job on a Hadoop cluster:

  1. Interactively, through the pyspark shell
  2. By writing the code into a .py file and submitting it with spark-submit

Submitting jobs from the pyspark command line

Log in to the Primary Node of the EMR cluster created in Chapter 1, download retailstore.csv, and upload it to HDFS:

$ wget https://pingfan.s3.amazonaws.com/files/retailstore.csv

kongpingfan@dev1-m:~$ hadoop fs -ls
Found 1 items
drwxr-xr-x   - kongpingfan hadoop          0 2022-06-02 12:59 .sparkStaging

kongpingfan@dev1-m:~$ hadoop fs -mkdir /user/kongpingfan/
kongpingfan@dev1-m:~$ hadoop fs -mkdir /user/kongpingfan/data  # create the data directory
kongpingfan@dev1-m:~$ hadoop fs -ls
Found 2 items
drwxr-xr-x   - kongpingfan hadoop          0 2022-06-02 12:59 .sparkStaging
drwxr-xr-x   - kongpingfan hadoop          0 2022-06-02 13:09 data

kongpingfan@dev1-m:~$ hadoop fs -put retailstore.csv /user/kongpingfan/data
kongpingfan@dev1-m:~$ hadoop fs -ls /user/kongpingfan/data
Found 1 items
-rw-r--r--   2 kongpingfan hadoop        306 2022-06-02 13:11 /user/kongpingfan/data/retailstore.csv
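
Optionally, you can verify the upload by printing the file straight from HDFS:

$ hadoop fs -cat /user/kongpingfan/data/retailstore.csv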

Once the upload completes, start the pyspark shell:
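
$ pyspark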

The shell automatically creates a SparkSession and exposes it as the spark variable, so you can start writing code right away:

# Relative paths resolve against the HDFS home directory; the absolute form
# spark.read.csv("/user/<username>/data/retailstore.csv", header=True) works too.
customerDF = spark.read.csv("data/retailstore.csv", header=True)
customerDF.show()

[Screenshot: customerDF.show() output]

To query the DataFrame with SQL, first register it as a temporary view:

customerDF.createOrReplaceTempView("customer")
new_results = spark.sql("select * from customer where age > 22")  # show() returns None, so keep the DataFrame separate
new_results.show()

[Screenshot: query results, rows with age > 22]
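
Note that spark.read.csv loads every column as a string by default; the age > 22 filter above works because Spark SQL casts implicitly. If you want real numeric types, the CSV reader's standard inferSchema option will infer them from the data:

customerDF = spark.read.csv("data/retailstore.csv", header=True, inferSchema=True)
customerDF.printSchema()  # age should now come back as a numeric type, not string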

Submitting jobs with spark-submit

The approach above runs one line at a time. Alternatively, we can put all of the code into a Python file and submit it with the spark-submit command.

Save the following as demo.py:

from pyspark.sql import SparkSession

# spark-submit does not create a session for us, so build one explicitly
spark = SparkSession.builder.appName('SparkDFDemo').getOrCreate()

# read the CSV we uploaded to HDFS (path is relative to the HDFS home directory)
customerDF = spark.read.csv("data/retailstore.csv", header=True)
customerDF.show()

# register a temp view and query it with SQL
customerDF.createOrReplaceTempView("customer")
new_results = spark.sql("select * from customer where age > 22")
new_results.show()

spark.stop()

Run the script with spark-submit:
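
On EMR, Spark is preconfigured to run against YARN, so the plain command below is typically all that's needed (add --master yarn to be explicit):

$ spark-submit demo.py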

[Screenshot: spark-submit running demo.py]