Spark Project - ETL

You can execute Scala code in spark-shell and create table structures there.

For more efficient development, however, we use Zeppelin.

In Zeppelin, create a notebook of type Spark.

Run the following code in the notebook:

val gameDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game.csv")
gameDF.printSchema

Spark has successfully loaded the data into a DataFrame and inferred the table's schema.
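Note that `inferSchema` makes Spark scan the file an extra time to guess column types. For large files or repeated loads, supplying an explicit schema avoids that pass. A minimal sketch, assuming a few column names from `game.csv` (the names and types here are illustrative and should be matched to the actual CSV header):

```scala
import org.apache.spark.sql.types._

// Defining the schema up front skips the extra inference pass.
// Column names/types below are assumptions -- check the real header.
val gameSchema = StructType(Seq(
  StructField("game_id", LongType, nullable = false),
  StructField("season", IntegerType, nullable = true),
  StructField("date_time", TimestampType, nullable = true),
  StructField("away_goals", IntegerType, nullable = true),
  StructField("home_goals", IntegerType, nullable = true)
))

val gameDF = spark.read
  .format("csv")
  .option("header", "true")
  .schema(gameSchema)
  .load("/user/spark/warehouse/nhl-data/game.csv")
```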

Load the remaining tables into DataFrames the same way:

val game_goalie_statsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_goalie_stats.csv")

val game_playsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_plays.csv")

val game_plays_playersDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_plays_players.csv")

val game_shiftsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_shifts.csv")

val game_skater_statsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_skater_stats.csv")

val game_teams_statsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_teams_stats.csv")

val player_infoDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/player_info.csv")

val team_infoDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/team_info.csv")
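The loads above are identical apart from the file name, so they can also be expressed as a small loop. A sketch under the same paths as above (`basePath` and the `dfs` map are names introduced here for illustration):

```scala
import org.apache.spark.sql.DataFrame

// All nine CSVs live under the same directory and share the same
// read options, so build the DataFrames in one pass.
val basePath = "/user/spark/warehouse/nhl-data"
val tableNames = Seq(
  "game", "game_goalie_stats", "game_plays", "game_plays_players",
  "game_shifts", "game_skater_stats", "game_teams_stats",
  "player_info", "team_info")

val dfs: Map[String, DataFrame] = tableNames.map { name =>
  name -> spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(s"$basePath/$name.csv")
}.toMap

// e.g. dfs("team_info").printSchema
```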

You can spot-check some of the schemas and data from these DataFrames.

Use cache to persist the Datasets in memory for reuse, which improves the performance of subsequent operations:

gameDF.cache
game_goalie_statsDF.cache
game_playsDF.cache
game_plays_playersDF.cache
game_shiftsDF.cache
game_skater_statsDF.cache
game_teams_statsDF.cache
player_infoDF.cache
team_infoDF.cache
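Keep in mind that `cache` is lazy: nothing is materialized until an action runs on the DataFrame. A sketch of forcing the cache, choosing a storage level, and releasing memory (which DataFrames to persist this way is a judgment call, not part of the original walkthrough):

```scala
import org.apache.spark.storage.StorageLevel

// cache() only marks the DataFrame; an action materializes it.
gameDF.cache()
gameDF.count()  // forces the read and populates the cache

// persist() allows an explicit storage level, e.g. spilling to disk
// when the data does not fit in memory:
game_playsDF.persist(StorageLevel.MEMORY_AND_DISK)

// Release the memory once a DataFrame is no longer needed:
gameDF.unpersist()
```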


Create temporary views:

gameDF.createOrReplaceTempView("tempgame")
game_goalie_statsDF.createOrReplaceTempView("tempgame_goalie_stats")
game_playsDF.createOrReplaceTempView("tempgame_plays")
game_plays_playersDF.createOrReplaceTempView("tempgame_plays_players")
game_shiftsDF.createOrReplaceTempView("tempgame_shifts")
game_skater_statsDF.createOrReplaceTempView("tempgame_skater_stats")
game_teams_statsDF.createOrReplaceTempView("tempgame_teams_stats")
player_infoDF.createOrReplaceTempView("tempplayer_info")
team_infoDF.createOrReplaceTempView("tempteam_info")
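Once registered, these views can be queried with Spark SQL just like tables. An illustrative query (the `season` column is assumed from the dataset; adjust to the real schema):

```scala
// Aggregate over a temp view exactly as over a Hive table.
val gamesPerSeason = spark.sql("""
  SELECT season, COUNT(*) AS games
  FROM tempgame
  GROUP BY season
  ORDER BY games DESC
""")
gamesPerSeason.show(5)
```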


You can list the temporary views that are currently registered.
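Two equivalent ways to do this, via the catalog API or plain SQL:

```scala
// Programmatic: list tables/views in the current database.
spark.catalog.listTables().show(truncate = false)

// Or with SQL, as you would in a Zeppelin %sql paragraph:
spark.sql("SHOW TABLES").show(truncate = false)
```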

Create new tables from the temporary views (this can be understood as the ETL step):

spark.sql("drop database if exists nhl cascade")
spark.sql("create database nhl")
spark.sql("create table nhl.game as select * from tempgame")
spark.sql("create table nhl.game_goalie_stats as select * from tempgame_goalie_stats")
spark.sql("create table nhl.game_plays as select * from tempgame_plays")
spark.sql("create table nhl.game_plays_players as select * from tempgame_plays_players")
spark.sql("create table nhl.game_shifts as select * from tempgame_shifts")
spark.sql("create table nhl.game_skater_stats as select * from tempgame_skater_stats")
spark.sql("create table nhl.game_teams_stats as select * from tempgame_teams_stats")
spark.sql("create table nhl.player_info as select * from tempplayer_info")
spark.sql("create table nhl.team_info as select * from tempteam_info")
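The same tables can also be written through the DataFrame API, which makes the output format and write mode explicit instead of relying on CTAS defaults. A sketch for one table (the `overwrite` mode is an assumption; `partitionBy` is optional and the partition column would need to be chosen from the real schema):

```scala
// Equivalent of "create table nhl.game as select * from tempgame",
// with format and mode spelled out. Add .partitionBy("someColumn")
// before saveAsTable to partition large tables.
gameDF.write
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("nhl.game")
```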

The entire ETL process took 1 minute and 24 seconds.

After it completes, a look at the HDFS directory shows that Spark has created a new directory.

The data inside nhl.db is no longer in CSV format but Parquet, Spark's default table format.

In the AWS ecosystem, all table storage could instead be placed on S3, implementing a data lake.
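A minimal sketch of that idea: write the Parquet data to an S3 path via the `s3a` connector. The bucket name is a placeholder, and this assumes the cluster has the hadoop-aws / aws-sdk jars and S3 credentials configured, none of which is covered by the walkthrough above:

```scala
// Write one table's data to S3 as Parquet (bucket name is hypothetical).
gameDF.write
  .mode("overwrite")
  .parquet("s3a://my-nhl-datalake/nhl.db/game")

// Read it back from the lake:
val gameFromS3 = spark.read.parquet("s3a://my-nhl-datalake/nhl.db/game")
```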