
I am reading a stream of data from a Kafka topic using Structured Streaming with Update mode, and then doing some transformations.

Then I have created a JDBC sink to push the data into MySQL with Append mode. The problem is how to tell my sink which column is the primary key and have it update rows based on that key, so that my table does not end up with duplicate rows.

    val df: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<List-here>")
      .option("subscribe", "emp-topic")
      .load()

    import spark.implicits._

    // value in Kafka is bytes, so cast it to String
    val empList: Dataset[Employee] = df
      .selectExpr("CAST(value AS STRING)")
      .map(row => Employee(row.getString(0)))

    // window aggregations on 1 min windows
    val aggregatedDf = ......

    // How to tell here that id is my primary key and do the update
    // based on the id column?
    aggregatedDf
      .writeStream
      .trigger(Trigger.ProcessingTime(60.seconds))
      .outputMode(OutputMode.Update)
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF
          .select("id", "name", "salary", "dept")
          .write.format("jdbc")
          .option("url", "jdbc:mysql://localhost/empDb")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("dbtable", "empDf")
          .option("user", "root")
          .option("password", "root")
          .mode(SaveMode.Append)
          .save()
      }

1 Answer


One way is to use ON DUPLICATE KEY UPDATE with foreachPartition; that may serve this purpose.

Below is a pseudo-code snippet:

    import java.sql.{Connection, DriverManager}

    import org.apache.spark.sql.DataFrame

    /**
     * Insert into the database using foreachPartition.
     * @param dataframe                   DataFrame to persist
     * @param sqlDatabaseConnectionString JDBC connection string
     * @param sqlTableName                target table name
     */
    def insertToTable(dataframe: DataFrame,
                      sqlDatabaseConnectionString: String,
                      sqlTableName: String): Unit = {

      // numPartitionsYouWant is a placeholder: the number of simultaneous
      // DB connections you are planning to allow
      val repartitioned = dataframe.repartition(numPartitionsYouWant)
      val tableHeader: String = dataframe.columns.mkString(",")

      repartitioned.foreachPartition { partition =>
        // Note: one connection per partition (a better approach is to use a connection pool)
        val sqlExecutorConnection: Connection =
          DriverManager.getConnection(sqlDatabaseConnectionString)

        // Batch size of 1000 is used since some databases cannot handle batches
        // larger than 1000 rows, e.g. Azure SQL
        partition.grouped(1000).foreach { group =>
          val insertString = new scala.collection.mutable.StringBuilder()
          group.foreach { record =>
            insertString.append("('" + record.mkString("','") + "'),")
          }

          val sql =
            s"""|INSERT INTO $sqlTableName ($tableHeader) VALUES
                |${insertString.toString.stripSuffix(",")}
                |ON DUPLICATE KEY UPDATE
                |yourprimarykeycolumn = VALUES(yourprimarykeycolumn)
                |""".stripMargin

          sqlExecutorConnection.createStatement().executeUpdate(sql)
        }
        sqlExecutorConnection.close() // close the connection for this partition
      }
    }
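For completeness, here is a minimal sketch (not part of the original answer) of how such a helper could be wired into the foreachBatch sink from the question; the connection string with inline credentials is only an assumption for illustration:

    aggregatedDf
      .writeStream
      .trigger(Trigger.ProcessingTime(60.seconds))
      .outputMode(OutputMode.Update)
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // hypothetical connection string; adjust host, database, user and password to your setup
        insertToTable(
          batchDF.select("id", "name", "salary", "dept"),
          "jdbc:mysql://localhost/empDb?user=root&password=root",
          "empDf")
      }
      .start()
      .awaitTermination()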

You can use a PreparedStatement instead of a plain JDBC Statement.
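As a rough sketch of that idea (assuming the empDf table from the question with columns id, name, salary, dept and a primary key on id; the column types and the dataframe / sqlDatabaseConnectionString names are assumptions carried over from the helper above):

    import java.sql.{Connection, DriverManager, PreparedStatement}

    // Sketch only: column types below are guesses, adjust to your schema.
    val upsertSql =
      """|INSERT INTO empDf (id, name, salary, dept) VALUES (?, ?, ?, ?)
         |ON DUPLICATE KEY UPDATE name = VALUES(name), salary = VALUES(salary), dept = VALUES(dept)
         |""".stripMargin

    dataframe.foreachPartition { partition =>
      val conn: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
      val stmt: PreparedStatement = conn.prepareStatement(upsertSql)
      partition.foreach { row =>
        stmt.setString(1, row.getAs[String]("id"))
        stmt.setString(2, row.getAs[String]("name"))
        stmt.setDouble(3, row.getAs[Double]("salary"))
        stmt.setString(4, row.getAs[String]("dept"))
        stmt.addBatch()
      }
      stmt.executeBatch() // values are bound as parameters, so they are escaped safely
      stmt.close()
      conn.close()
    }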

Further reading: SPARK SQL - update MySql table using DataFrames and JDBC


3 Comments

Thanks for the answer. But the question you referred to is about 3 years old, so I am wondering whether there is any newer feature for this in the current version of Spark, 2.4.0.
The above approach will work with the current version of Spark as well, since it operates at the RDD level.
As of now I am marking the answer as accepted, since I have not found any better alternative in Spark.
