一.IDEA装驱动:
1.下载一个MySQL的JDBC驱动:mysql-connector-java-5.1.44.tar.gz
2.在idea Open Moudle Settings 在 Moudle中 选Dependencies + JDC驱动的解压位置 选(mysql-connector-java-5.1.44-bin)这个就ok二.程序:
import java.util.Propertiesimport com.sun.org.apache.xalan.internal.xsltc.compiler.util.IntTypeimport org.apache.spark.sql.types._import org.apache.spark.sql.{Row, SparkSession}object JDBC_To_DF { val spark= SparkSession.builder().getOrCreate() import spark.implicits._ def main(args: Array[String]): Unit = { val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/***") //*****这是数据库名 .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "****")//*****是表名 .option("user", "*****").option("password", "*****").load() jdbcDF.show() val studentRDD=spark.sparkContext.parallelize(Array("3 小翠 G 27","4 小狗蛋 B 50")) .map(x=>x.split(" ")) val ROWRDD=studentRDD.map(x=>Row(x(0).toInt,x(1).trim,x(2).trim,x(3).toInt)) ROWRDD.foreach(print) //设置模式信息 val schema=StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age", IntegerType, true))) val studentDF=spark.createDataFrame(ROWRDD,schema) val parameter=new Properties() parameter.put("user","****") parameter.put("password","****") parameter.put("driver","com.mysql.jdbc.Driver") studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/******","/*/*/*/*/*",parameter) //******"是数据库名,/*/*/*/*/*表名 jdbcDF.show() }}
结果:
+---+----+------+---+
| id|name|gender|age|+---+----+------+---+| 1| 小周| B| 25|| 2| 小苏| G| 27|| 3| 小翠| G| 27|| 4| 小狗蛋| B| 50|| 3| 小翠| G| 27|| 4| 小狗蛋| B| 50|+---+----+------+---+