Scala code can be run directly in spark-shell to create the table structures, but for more efficient development we use Zeppelin. Create a notebook of the Spark interpreter type and run the following code in it:
val gameDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game.csv")
gameDF.printSchema
Spark has successfully loaded the data into a DataFrame and inferred the table's schema.
Load the remaining tables into DataFrames in the same way:
val game_goalie_statsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_goalie_stats.csv")
val game_playsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_plays.csv")
val game_plays_playersDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_plays_players.csv")
val game_shiftsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_shifts.csv")
val game_skater_statsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_skater_stats.csv")
val game_teams_statsDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/game_teams_stats.csv")
val player_infoDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/player_info.csv")
val team_infoDF = spark.read.format("csv").option("inferSchema","true").option("header","true").load("/user/spark/warehouse/nhl-data/team_info.csv")
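The eight load statements above all follow one pattern, so they can also be generated from a list of table names. A minimal sketch (the base path matches the one used above; the commented-out spark.read call shows how each DataFrame would be built inside the notebook):

```scala
// Sketch: derive every CSV path from its table name instead of
// repeating the spark.read boilerplate nine times.
val basePath = "/user/spark/warehouse/nhl-data"
val tables = Seq("game", "game_goalie_stats", "game_plays",
  "game_plays_players", "game_shifts", "game_skater_stats",
  "game_teams_stats", "player_info", "team_info")

val paths = tables.map(t => s"$basePath/$t.csv")
paths.foreach(println)

// In the notebook each DataFrame would then be loaded as:
//   val dfs = tables.map { t =>
//     t -> spark.read.format("csv")
//       .option("inferSchema", "true").option("header", "true")
//       .load(s"$basePath/$t.csv")
//   }.toMap
```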
You can spot-check some of the schemas and data (e.g. with printSchema and show(5)):
Use cache to persist the Datasets in memory for reuse, improving the performance of subsequent operations. Note that cache is lazy: the data is only materialized when the first action (such as count) runs:
gameDF.cache
game_goalie_statsDF.cache
game_playsDF.cache
game_plays_playersDF.cache
game_shiftsDF.cache
game_skater_statsDF.cache
game_teams_statsDF.cache
player_infoDF.cache
team_infoDF.cache
Create temporary views:
gameDF.createOrReplaceTempView("tempgame")
game_goalie_statsDF.createOrReplaceTempView("tempgame_goalie_stats")
game_playsDF.createOrReplaceTempView("tempgame_plays")
game_plays_playersDF.createOrReplaceTempView("tempgame_plays_players")
game_shiftsDF.createOrReplaceTempView("tempgame_shifts")
game_skater_statsDF.createOrReplaceTempView("tempgame_skater_stats")
game_teams_statsDF.createOrReplaceTempView("tempgame_teams_stats")
player_infoDF.createOrReplaceTempView("tempplayer_info")
team_infoDF.createOrReplaceTempView("tempteam_info")
List the current temporary views (e.g. with spark.catalog.listTables):
Create new tables from the temporary views (this can be thought of as the ETL step):
spark.sql("drop database if exists nhl cascade")
spark.sql("create database nhl")
spark.sql("Create table nhl.game as select * from tempgame")
spark.sql("Create table nhl.game_goalie_stats as select * from tempgame_goalie_stats")
spark.sql("Create table nhl.game_plays as select * from tempgame_plays")
spark.sql("Create table nhl.game_plays_players as select * from tempgame_plays_players")
spark.sql("Create table nhl.game_shifts as select * from tempgame_shifts")
spark.sql("Create table nhl.game_skater_stats as select * from tempgame_skater_stats")
spark.sql("Create table nhl.game_teams_stats as select * from tempgame_teams_stats")
spark.sql("Create table nhl.player_info as select * from tempplayer_info")
spark.sql("Create table nhl.team_info as select * from tempteam_info")
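Since the nine CREATE TABLE statements also share one pattern (each target table nhl.X is filled from the view tempX), this step can be driven from the same table list. A minimal sketch (only the string generation is shown; the spark.sql call that would actually run each statement is left as a comment):

```scala
// Sketch: generate the repetitive CTAS statements from the table list.
val tables = Seq("game", "game_goalie_stats", "game_plays",
  "game_plays_players", "game_shifts", "game_skater_stats",
  "game_teams_stats", "player_info", "team_info")

val ctas = tables.map(t => s"create table nhl.$t as select * from temp$t")
ctas.foreach(println)

// In the notebook:  ctas.foreach(spark.sql)
```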
The whole ETL process took 1 minute 24 seconds:
After it completes, check the HDFS directory; Spark has created a new directory:
nhl.db
The data inside is no longer in CSV format but Parquet:
In the AWS ecosystem, all table storage can instead be placed on S3 to build a data lake.