Spark读写MySQL

本节将介绍Spark读取MySQL中的数据,并将计算结果写回MySQL

Spark中读取MySQL数据可以有单分区和多分区,一般读取小数据量的表采用简单的单分区模式就可以,对于比较大的单分区抽取需要消耗时间较长的表来说,采用多分区模式读取性能会更好。

下载JAR包

搜索mysql connector,进入 https://www.mysql.com/products/connector/ 下载JDBC的MySQL Connector:

image-20240303185133686

选择Platform Independent,然后下载ZIP包:

image-20240303185209199

下载地址为 https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-j-8.3.0.zip。在Spark shell所在机器执行:

wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-j-8.3.0.zip
unzip mysql-connector-j-8.3.0.zip

解压后在目录中会有一个mysql-connector-j-8.3.0.jar文件。

MySQL创建数据表

创建一个MySQL数据库,方式可以选择以下几种:

  • 在AWS中创建一个RDS集群
  • 自建MySQL

创建完成后,登录到上面,创建数据库及数据表:

CREATE DATABASE books;
USE books;
CREATE TABLE authors (id INT, name VARCHAR(20), email VARCHAR(20));
INSERT INTO authors (id,name,email) VALUES(1,"Vivek","xuz@abc.com");
INSERT INTO authors (id,name,email) VALUES(2,"Priya","p@gmail.com");
INSERT INTO authors (id,name,email) VALUES(3,"Tom","tom@yahoo.com");

image-20240303190141186

Spark连接MySQL数据库并进行读

运行以下命令,使用--jars加载上面下载的包:

./bin/spark-shell --jars /home/ec2-user/spark-3.5.0-bin-hadoop3/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar

image-20240303190300450

运行以下命令,让spark连接到MySQL,并执行一些查询命令:

val dataframe_mysql = spark.read.format("jdbc").option("url", "jdbc:mysql://database-1.cluster-cpayj88cqtvo.us-west-2.rds.amazonaws.com:3306/books").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "authors").option("user", "admin").option("password", "your-password").option("useSSL", "false").load()

dataframe_mysql.show()


dataframe_mysql.createOrReplaceTempView("authors")

val count_sql = spark.sql("select count(*) as cnt from authors")

count_sql.show()

image-20240303191030250

写数据

count_sql.write.format("jdbc").option("url", "jdbc:mysql://database-1.cluster-cpayj88cqtvo.us-west-2.rds.amazonaws.com:3306/books").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "author_count").option("user", "admin").option("password", "your-password").mode("overwrite").option("useSSL", "false").save()

在MySQL中看到,Spark为它自动创建了一张表,并将结果写到里面:

image-20240303191242374

多分区并行读取

此种方式对于抽取数据量较大的表有很好的性能提升,但仅限于有连续数值型主键(比如自增id)的数据表, 使用numPartitions参数指定partition数量:

val dataframe_mysql = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.0.101:3306/employees").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "employees").option("user", "mysqluser").option("password", "mysqluser").option("useSSL", "false").option("numPartitions", "10").option("lowerBound", "10").option("upperBound", "10000").option("partitionColumn", "emp_no").load()

多分区并行读取时,必须指定以下四个参数:

  // 分区数量,可以理解为读取并行度、线程数
  .option("numPartitions", partitionNum)
  // 分区字段,必须为数字、日期、时间戳字段
  .option("partitionColumn", "id")
    
  // lowerBound 和 upperBound 仅用于计算每个分区的取数步长,不用于数据过滤
  // 分区字段的最小值
  .option("lowerBound", minId)
  // 分区字段的最大值
  .option("upperBound", maxId)