DataFrame操作 - I

本节我们将使用yahoo finance 的数据,将Amazon历史五年的股价数据下载成csv ,然后使用Spark DataFrame进行分析。


Amazon历史的股价数据如下,其中包含开盘价(Open), 当日最高价(High), 最低价(Low), 收盘价(close), 调整后的收盘价(Adj Close), 成交量(Volume)信息:

image-20220603145145735

将csv数据上传到S3

aws s3 mb s3://<bucket-name>
wget https://pingfan.s3.amazonaws.com/files/AMZN.csv
aws s3 cp AMZN.csv s3://<bucket-name>/files/

image-20240307093856292

读取文件到DataFrame:

df = spark.read.csv("s3://<bucket-name>/files/AMZN.csv", header = True, inferSchema = True)

查看表结构及数据

创建完成后,spark DataFrame自动解析出来csv的表结构:

image-20240307094203666

describe命令会统计每一列的最大值、最小值、平均值等指标:

image-20220603145843474

DataFrame的格式化

上面在describe命令执行完成后,一些结果会保留到小数点后十几位,不便于阅读,我们可以将其保留成2位小数:

from pyspark.sql.functions import format_number

description = df.describe()
description.select(description['summary'],
                   format_number(description["Open"].cast("float"),2).alias("Open"),
                   format_number(description["Close"].cast("float"),2).alias("Close"),
                   description["Volume"].cast("int").alias("Close")
                   ).show()

image-20220604113813214

DataFrame的数学表达式操作

除法操作——为DataFrame新增一列,计算Open/Volume

new_df = df.withColumn("OV Ratio", df["open"]/df["Volume"])
new_df.show()

image-20220604114026089

排序操作——按开盘价从大到小排列:

df.orderBy(df['Open'].desc()).show()

image-20220604114209301

按开盘价从小到大排列:

df.orderBy(df['Open']).show()

image-20220604114302656


mean方法用于计算平均值,max/min计算最大/最小值:

from pyspark.sql.functions import mean,max,min

df.select(mean("Open")).show()
df.select(max("Open")).show()

image-20220604114659787

其他操作

Head操作——只查看N行数据:

df.orderBy(df['Open']).head(2)

image-20220604114417166