
I need a JDBC sink for my Spark Structured Streaming DataFrame. As far as I know, the DataFrame API still lacks a writeStream-to-JDBC implementation (in both PySpark and Scala, as of Spark 2.2.0). The only suggestion I found was to write my own ForeachWriter Scala class, based on this article.

So, I've modified the simple word count example from here by adding a custom ForeachWriter class and tried to writeStream to PostgreSQL. The stream of words is generated manually from the console (using netcat: nc -lk -p 9999) and read by Spark from a socket.

Unfortunately, I'm getting "Task not serializable" ERROR.

    APACHE_SPARK_VERSION=2.1.0
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)

My Scala code:

    //Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
    //Spark session available as 'spark'.

    import java.sql._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("StructuredNetworkWordCountToJDBC")
      .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
      .getOrCreate()

    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val words = lines.as[String].flatMap(_.split(" "))

    val wordCounts = words.groupBy("value").count()

    class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
      val driver = "org.postgresql.Driver"
      var connection: java.sql.Connection = _
      var statement: java.sql.Statement = _

      def open(partitionId: Long, version: Long): Boolean = {
        Class.forName(driver)
        connection = java.sql.DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement
        true
      }

      def process(value: org.apache.spark.sql.Row): Unit = {
        statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
          "VALUES ('" + value(0) + "'," + value(1) + ");")
      }

      def close(errorOrNull: Throwable): Unit = {
        connection.close
      }
    }

    val url = "jdbc:postgresql://<mypostgreserver>:<port>/<mydb>"
    val user = "<user name>"
    val pwd = "<pass>"

    val writer = new JDBCSink(url, user, pwd)

    import org.apache.spark.sql.streaming.ProcessingTime

    val query = wordCounts
      .writeStream
      .foreach(writer)
      .outputMode("complete")
      .trigger(ProcessingTime("25 seconds"))
      .start()

    query.awaitTermination()

Error message:

    ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
        at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
    Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution
    Serialization stack:
        - object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED])
        - field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery)
        - object (class $line21.$read$$iw$$iw, $line21.$read$$iw$$iw@24747e0f)
        - field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw)
        - object (class $line21.$read$$iw, $line21.$read$$iw@1814ed19)
        - field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw)
        - object (class $line21.$read, $line21.$read@13e62f5d)
        - field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read)
        - object (class $line25.$read$$iw, $line25.$read$$iw@14240e5c)
        - field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw)
        - object (class $line25.$read$$iw$$iw, $line25.$read$$iw$$iw@11e4c6f5)
        - field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw)
        - object (class $line25.$read$$iw$$iw$JDBCSink, $line25.$read$$iw$$iw$JDBCSink@6c096c84)
        - field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter)
        - object (class org.apache.spark.sql.execution.streaming.ForeachSink, org.apache.spark.sql.execution.streaming.ForeachSink@6feccb75)
        - field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink)
        - object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        ... 25 more

How to make it work?

SOLUTION

(Thanks to all, and special thanks to @zsxwing for a straightforward solution):

  1. Save the JDBCSink class to a separate file (a minimal sketch of such a file is shown after this list).
  2. In spark-shell, load the class, e.g. scala> :load <path_to_a_JDBCSink.scala_file>
  3. Finally, scala> :paste the code without the JDBCSink class definition.
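
For reference, a minimal sketch of what the JDBCSink.scala file from step 1 can contain: just the writer class from the code above, and nothing else.

    // JDBCSink.scala -- only the ForeachWriter implementation, no other code
    import java.sql._
    import org.apache.spark.sql.{ForeachWriter, Row}

    class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
      val driver = "org.postgresql.Driver"
      var connection: Connection = _
      var statement: Statement = _

      def open(partitionId: Long, version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement
        true
      }

      def process(value: Row): Unit = {
        statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
          "VALUES ('" + value(0) + "'," + value(1) + ");")
      }

      def close(errorOrNull: Throwable): Unit = {
        connection.close()
      }
    }
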
1 Comment
  • Can you please tell how I can insert data to HDFS or Hive after grouping? Commented Jan 9, 2019 at 14:28

3 Answers


Just define JDBCSink in a separate file, rather than defining it as an inner class which may capture the outer reference.
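
A minimal illustration (not the original code) of why the inner class causes this: in the REPL, a class defined together with other statements becomes an inner class of the REPL's wrapper object and keeps a hidden $outer field pointing at it, so serializing a JDBCSink instance drags the whole wrapper, including the non-serializable streaming query, along with it. A stand-alone sketch of the same effect:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    class NotSerializableThing              // stand-in for e.g. a StreamingQuery

    class Outer extends Serializable {
      val query = new NotSerializableThing  // this field cannot be serialized

      // Inner keeps a hidden $outer field referencing its enclosing Outer,
      // so serializing an Inner also serializes Outer and its fields.
      class Inner extends Serializable
    }

    object CaptureDemo extends App {
      val outer = new Outer
      val inner = new outer.Inner
      // Fails with java.io.NotSerializableException: NotSerializableThing,
      // because writing `inner` also writes its $outer reference.
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(inner)
    }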


3 Comments

@Lukiz, can you please tell how I can insert data to HDFS or Hive after grouping?
@BigD I haven't tried to save it to HDFS or Hive this way. But apart from checking the Spark docs, which you have probably done, I'd suggest you look at Kafka with an appropriate connector: link. The first sentence sounds promising: "The HDFS connector allows you to export data from Kafka topics to HDFS files in a variety of formats and integrates with Hive to make data immediately available for querying with HiveQL." So if you can, add Kafka to your processing architecture. Hope that helps. Regards.
Actually my requirement is to fetch data from Kafka using Spark Streaming, join the data with one of the Hive tables, and write it back to Hive... I am not getting data written to HDFS after aggregating... Can you share your thoughts? @Lukiz

In case somebody encounters this in an interactive workbook, this solution also works:

Instead of saving the JDBCSink class to a separate file, you can also just declare it as a separate package (a "package cell") within the same workbook and import that package in the cell where you use it. This is well described here: https://docs.databricks.com/user-guide/notebooks/package-cells.html
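
A rough sketch of what that can look like (the package name com.example.sinks is an arbitrary choice; the class body is the one from the question):

    // Cell 1 -- a "package cell": the cell contains nothing but the package
    package com.example.sinks

    import java.sql._
    import org.apache.spark.sql.{ForeachWriter, Row}

    class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
      var connection: Connection = _
      var statement: Statement = _

      def open(partitionId: Long, version: Long): Boolean = {
        Class.forName("org.postgresql.Driver")
        connection = DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement
        true
      }

      def process(value: Row): Unit =
        statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
          "VALUES ('" + value(0) + "'," + value(1) + ");")

      def close(errorOrNull: Throwable): Unit = connection.close()
    }

    // Cell 2 -- import the packaged class where you build the query:
    //   import com.example.sinks.JDBCSink
    //   val writer = new JDBCSink(url, user, pwd)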



Looks like the offender here is the use of import spark.implicits._ inside the JDBCSink class:

  • JDBCSink must be serializable
  • By adding this import, you make your JDBCSink reference the non-serializable SparkSession, which is then serialized along with it (technically, SparkSession extends Serializable, but it's not meant to be deserialized on the worker nodes)

The good news: you're not using this import, so if you just remove it, this should work.

