In this section we cover how to submit Spark jobs on a Hadoop cluster.
There are two ways to submit a Spark job on a Hadoop cluster: run code interactively in the pyspark shell, or submit a .py file with the spark-submit command.

Log in to the Primary Node of the EMR cluster created in Chapter 1 and upload retailstore.csv to HDFS:
$ wget https://pingfan.s3.amazonaws.com/files/retailstore.csv
kongpingfan@dev1-m:~$ hadoop fs -ls
Found 1 items
drwxr-xr-x - kongpingfan hadoop 0 2022-06-02 12:59 .sparkStaging
kongpingfan@dev1-m:~$ hadoop fs -mkdir -p /user/kongpingfan/data # create the directory; -p also creates any missing parents
kongpingfan@dev1-m:~$ hadoop fs -ls
Found 2 items
drwxr-xr-x - kongpingfan hadoop 0 2022-06-02 12:59 .sparkStaging
drwxr-xr-x - kongpingfan hadoop 0 2022-06-02 13:09 data
kongpingfan@dev1-m:~$ hadoop fs -put retailstore.csv /user/kongpingfan/data
kongpingfan@dev1-m:~$ hadoop fs -ls /user/kongpingfan/data
Found 1 items
-rw-r--r-- 2 kongpingfan hadoop 306 2022-06-02 13:11 /user/kongpingfan/data/retailstore.csv
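Optionally, sanity-check the upload by streaming the file back; hadoop fs -cat prints the file to stdout, and piping through head keeps the output short:

$ hadoop fs -cat /user/kongpingfan/data/retailstore.csv | head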
Once the upload finishes, run the pyspark command. It starts an interactive shell with a SparkSession already created and bound to the name spark, so you can type the following code directly:
customerDF = spark.read.csv("data/retailstore.csv", header=True) # relative path; or use the absolute HDFS path: customerDF = spark.read.csv("/user/${username}/data/retailstore.csv", header=True)
customerDF.show()
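One caveat: with header=True alone, spark.read.csv reads every column as a string. The CSV reader's standard inferSchema option samples the data and infers column types instead; a minimal sketch in the same pyspark session (typedDF is just an illustrative name):

customerDF.printSchema()  # every column is string without type inference

typedDF = spark.read.csv("data/retailstore.csv", header=True, inferSchema=True)
typedDF.printSchema()  # numeric columns such as age now get proper types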
To query the data with SQL, first register the DataFrame as a temporary view. Note that show() prints the rows and returns None, so there is no point assigning its result to a variable:

customerDF.createOrReplaceTempView("customer")
spark.sql("select * from customer where age > 22").show()
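For reference, the same filter can be written against the DataFrame API instead of SQL; a minimal equivalent using the standard col helper from pyspark.sql.functions:

from pyspark.sql.functions import col

customerDF.filter(col("age") > 22).show()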
The approach above runs one statement at a time. Alternatively, put all of the code into a Python file and submit it with the spark-submit command. Save the following as demo.py:
from pyspark.sql import SparkSession

# pyspark creates the session for us; a standalone script must build its own
spark = SparkSession.builder.appName('SparkDFDemo').getOrCreate()

customerDF = spark.read.csv("data/retailstore.csv", header=True)
customerDF.show()

# register the DataFrame as a temp view so it can be queried with SQL
customerDF.createOrReplaceTempView("customer")
spark.sql("select * from customer where age > 22").show()

spark.stop()  # shut down the session and release cluster resources
Run it with spark-submit:
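A minimal invocation, assuming demo.py sits in the current working directory on the Primary Node (on EMR, spark-submit targets YARN by default):

$ spark-submit demo.py

The output of the two show() calls is printed to the console by the driver, just as in the interactive pyspark session.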