本节我们将使用yahoo finance 的数据,将Amazon历史五年的股价数据下载成csv ,然后使用Spark DataFrame进行分析。
Amazon历史的股价数据如下,其中包含开盘价(Open), 当日最高价(High), 最低价(Low), 收盘价(close), 调整后的收盘价(Adj Close), 成交量(Volume)
信息:
aws s3 mb s3://<bucket-name>
wget https://pingfan.s3.amazonaws.com/files/AMZN.csv
aws s3 cp AMZN.csv s3://<bucket-name>/files/
读取文件到DataFrame:
df = spark.read.csv("s3://<bucket-name>/files/AMZN.csv", header = True, inferSchema = True)
创建完成后,spark DataFrame自动解析出来csv的表结构:
describe
命令会统计每一列的最大值、最小值、平均值等指标:
上面在describe命令执行完成后,一些结果会保留到小数点后十几位,不便于阅读,我们可以将其保留成2位小数:
from pyspark.sql.functions import format_number
description = df.describe()
description.select(description['summary'],
format_number(description["Open"].cast("float"),2).alias("Open"),
format_number(description["Close"].cast("float"),2).alias("Close"),
description["Volume"].cast("int").alias("Close")
).show()
除法操作——为DataFrame新增一列,计算Open/Volume
:
new_df = df.withColumn("OV Ratio", df["open"]/df["Volume"])
new_df.show()
排序操作——按开盘价从大到小排列:
df.orderBy(df['Open'].desc()).show()
按开盘价从小到大排列:
df.orderBy(df['Open']).show()
mean方法用于计算平均值,max/min计算最大/最小值:
from pyspark.sql.functions import mean,max,min
df.select(mean("Open")).show()
df.select(max("Open")).show()
Head操作——只查看N行数据:
df.orderBy(df['Open']).head(2)