本节将介绍Spark读取MySQL中的数据,并将计算结果写回MySQL
Spark中读取MySQL数据可以有单分区和多分区,一般读取小数据量的表采用简单的单分区模式就可以,对于比较大的单分区抽取需要消耗时间较长的表来说,采用多分区模式读取性能会更好。
搜索mysql connector
,进入 https://www.mysql.com/products/connector/ 下载JDBC的MySQL Connector:
选择Platform Independent
,然后下载ZIP包:
下载地址为 https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-j-8.3.0.zip。在Spark shell所在机器执行:
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-j-8.3.0.zip
unzip mysql-connector-j-8.3.0.zip
解压后在目录中会有一个mysql-connector-j-8.3.0.jar
文件。
创建一个MySQL数据库,方式可以选择以下几种:
创建完成后,登录到上面,创建数据库及数据表:
CREATE DATABASE books;
USE books;
CREATE TABLE authors (id INT, name VARCHAR(20), email VARCHAR(20));
INSERT INTO authors (id,name,email) VALUES(1,"Vivek","xuz@abc.com");
INSERT INTO authors (id,name,email) VALUES(2,"Priya","p@gmail.com");
INSERT INTO authors (id,name,email) VALUES(3,"Tom","tom@yahoo.com");
运行以下命令,使用--jars
加载上面下载的包:
./bin/spark-shell --jars /home/ec2-user/spark-3.5.0-bin-hadoop3/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar
运行以下命令,让spark连接到MySQL,并执行一些查询命令:
val dataframe_mysql = spark.read.format("jdbc").option("url", "jdbc:mysql://database-1.cluster-cpayj88cqtvo.us-west-2.rds.amazonaws.com:3306/books").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "authors").option("user", "admin").option("password", "your-password").option("useSSL", "false").load()
dataframe_mysql.show()
dataframe_mysql.createOrReplaceTempView("authors")
val count_sql = spark.sql("select count(*) as cnt from authors")
count_sql.show()
count_sql.write.format("jdbc").option("url", "jdbc:mysql://database-1.cluster-cpayj88cqtvo.us-west-2.rds.amazonaws.com:3306/books").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "author_count").option("user", "admin").option("password", "your-password").mode("overwrite").option("useSSL", "false").save()
在MySQL中看到,Spark为它自动创建了一张表,并将结果写到里面:
此种方式对于抽取数据量较大的表有很好的性能提升,但仅限于有连续数值型主键(比如自增id)的数据表, 使用numPartitions
参数指定partition数量:
val dataframe_mysql = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.0.101:3306/employees").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "employees").option("user", "mysqluser").option("password", "mysqluser").option("useSSL", "false").option("numPartitions", "10").option("lowerBound", "10").option("upperBound", "10000").option("partitionColumn", "emp_no").load()
多分区并行读取时,必须指定以下四个参数:
// 分区数量,可以理解为读取并行度、线程数
.option("numPartitions", partitionNum)
// 分区字段,必须为数字、日期、时间戳字段
.option("partitionColumn", "id")
// lowerBound 和 upperBound 仅用于计算每个分区的取数步长,不用于数据过滤
// 分区字段的最小值
.option("lowerBound", minId)
// 分区字段的最大值
.option("upperBound", maxId)