I'm working with the latest Structured Streaming in Apache Spark 2.2 and got the following exception:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?
scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")  // <-- convert time column back from Timestamp to Int

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time#10 as bigint) AS time#15L, id#6]
+- Deduplicate [id#6], true
   +- Project [cast(time#5 as timestamp) AS time#10, id#6]
      +- Project [_1#2 AS time#5, _2#3 AS id#6]
         +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:247)
  ... 57 elided
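For contrast, here is a minimal sketch of a variant that should pass the same check, because it contains a streaming aggregation. It is not part of the session above. It assumes a spark-shell session where `spark` is already defined, and the names `events`, `counts` and `q2` are made up for illustration.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._ // already imported in spark-shell; repeated to keep the sketch self-contained

// Assumption: `spark` is the SparkSession provided by spark-shell.
implicit val sqlContext: SQLContext = spark.sqlContext

val events = MemoryStream[(Int, Int)]

// groupBy + count is a streaming aggregation, so the query has a result
// table that Complete mode can write out in full on every trigger.
val counts = events.toDS.toDF("time", "id").
  groupBy("id").
  count()

val q2 = counts.
  writeStream.
  format("memory").
  queryName("counts"). // hypothetical table name, not from the original query
  outputMode(OutputMode.Complete).
  start

// Feed a few (time, id) rows and wait until they have been processed.
events.addData((1, 1), (2, 1), (3, 2))
q2.processAllAvailable()

spark.table("counts").show()
q2.stop()
```

The only difference from the failing query is the `groupBy("id").count()` step in place of `dropDuplicates("id")`, which is what makes the question interesting: both keep state per `id`, yet only the aggregation is accepted in Complete output mode.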