
I work with the latest Structured Streaming in Apache Spark 2.2 and got the following exception:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")       // <-- convert time column back from Timestamp to Long

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time#10 as bigint) AS time#15L, id#6]
+- Deduplicate [id#6], true
   +- Project [cast(time#5 as timestamp) AS time#10, id#6]
      +- Project [_1#2 AS time#5, _2#3 AS id#6]
         +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:247)
  ... 57 elided

2 Answers


From the Structured Streaming Programming Guide, on other queries (i.e. queries without aggregations, mapGroupsWithState or flatMapGroupsWithState):

Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.

To answer the question:

What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

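Probably an OOM (OutOfMemoryError). Without an aggregation, the Result Table is every row received so far, and Complete mode would have to keep all of those rows and re-emit the whole table on every trigger, so memory use would grow without bound.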
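To make the out-of-memory argument concrete, here is a toy model in plain Scala (no Spark involved; the names ResultTableModel, Row, sampleBatches, completeWithoutAggregation and completeWithCountById are hypothetical). It only tracks how many rows a Complete-mode Result Table would hold after each trigger: without an aggregation that is every row received so far, while a groupBy("id").count()-style aggregation keeps one row per distinct id. It is a sketch of the sizing argument under these assumptions, not of how Spark actually stores state.

```scala
// Toy model in plain Scala (no Spark). All names here are hypothetical.
object ResultTableModel {
  final case class Row(time: Long, id: Int)

  // Three triggers, each delivering 4 rows that use only 2 distinct ids (0 and 1).
  val sampleBatches: Seq[Seq[Row]] =
    (0 until 3).map(t => (0 until 4).map(i => Row(time = t * 4L + i, id = i % 2)))

  // No aggregation: the Result Table is every row received so far,
  // so Complete mode would have to keep (and re-emit) all of them.
  def completeWithoutAggregation(batches: Seq[Seq[Row]]): List[Int] =
    batches.scanLeft(Vector.empty[Row])(_ ++ _).tail.map(_.size).toList

  // groupBy("id").count()-style aggregation: one row per distinct id,
  // so the Result Table size is bounded by the number of keys.
  def completeWithCountById(batches: Seq[Seq[Row]]): List[Int] =
    batches
      .scanLeft(Map.empty[Int, Long]) { (counts, batch) =>
        batch.foldLeft(counts)((acc, r) => acc.updated(r.id, acc.getOrElse(r.id, 0L) + 1L))
      }
      .tail
      .map(_.size)
      .toList

  def main(args: Array[String]): Unit = {
    println(completeWithoutAggregation(sampleBatches)) // List(4, 8, 12)
    println(completeWithCountById(sampleBatches))      // List(2, 2, 2)
  }
}
```

The unaggregated table grows linearly with the input, while the aggregated one stays at the number of distinct keys no matter how many triggers run.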

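The puzzling part is why dropDuplicates("id") is not counted as an aggregation. In the logical plan printed with the exception it appears as a separate Deduplicate operator rather than an Aggregate, which is presumably why the check finds no streaming aggregation.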
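The stack trace points at UnsupportedOperationChecker.checkForStreaming, and the plan printed with the exception contains a Deduplicate node but no Aggregate node. The sketch below is a simplified, hypothetical model of that rule: the Plan classes and checkComplete are invented for illustration and are not Spark's Catalyst API, and it assumes the check only looks for Aggregate nodes, which is why a plan whose only stateful operator is Deduplicate is rejected for Complete mode.

```scala
// Toy logical-plan model (hypothetical; a simplification, not Spark's Catalyst classes).
object CompleteModeCheck {
  sealed trait Plan { def children: Seq[Plan] }
  final case class Source(name: String) extends Plan { val children: Seq[Plan] = Nil }
  final case class Project(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  final case class Deduplicate(keys: Seq[String], child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  final case class Aggregate(keys: Seq[String], child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

  // Collects Aggregate nodes only; Deduplicate is a different node type and is not counted.
  def aggregates(plan: Plan): Seq[Aggregate] = plan match {
    case a: Aggregate => a +: a.children.flatMap(aggregates)
    case other        => other.children.flatMap(aggregates)
  }

  // Assumed rule: Complete output mode needs at least one streaming aggregation.
  def checkComplete(plan: Plan): Either[String, Unit] =
    if (aggregates(plan).isEmpty)
      Left("Complete output mode not supported when there are no streaming aggregations")
    else Right(())

  def main(args: Array[String]): Unit = {
    // Same shape as the plan in the question: Project <- Deduplicate <- Project <- Project <- source.
    val withDedup = Project(Deduplicate(Seq("id"), Project(Project(Source("MemoryStream")))))
    val withAgg   = Aggregate(Seq("id"), Project(Source("MemoryStream")))
    println(checkComplete(withDedup).isLeft) // true
    println(checkComplete(withAgg))          // Right(())
  }
}
```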

I think the problem is the output mode. Instead of OutputMode.Complete, use OutputMode.Append, as shown below:

scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Append).
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir").
     |   start

3 Comments

Thank you, this helped me out. It should be mentioned in the docs.
But OutputMode.Append does something different from OutputMode.Complete.
How is this answer getting upvotes? Append and Complete are totally different modes.
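The last two comments are right that the modes differ. A plain-Scala toy model of a dropDuplicates("id") stream (hypothetical names DedupOutputModes, Row, State and simulate; no Spark involved) shows how: in Append mode each trigger emits only rows whose id has not been seen before, whereas a Complete mode, if Spark allowed it here, would re-emit every distinct row seen so far on every trigger. The model assumes the first row for each id is the one kept.

```scala
// Toy model in plain Scala (no Spark). All names here are hypothetical.
object DedupOutputModes {
  final case class Row(time: Long, id: Int)

  // seen: ids already emitted (the dedup state); table: every distinct row so far;
  // fresh: rows whose id was first seen in the latest trigger.
  final case class State(seen: Set[Int], table: Vector[Row], fresh: Vector[Row])

  private def step(state: State, batch: Seq[Row]): State = {
    val (seen, fresh) = batch.foldLeft((state.seen, Vector.empty[Row])) {
      case ((ids, acc), r) => if (ids.contains(r.id)) (ids, acc) else (ids + r.id, acc :+ r)
    }
    State(seen, state.table ++ fresh, fresh)
  }

  // Per trigger: what Append emits (new rows only) and what a Complete mode
  // would emit (the whole Result Table, sent again every time).
  def simulate(batches: Seq[Seq[Row]]): (List[List[Row]], List[List[Row]]) = {
    val states = batches.scanLeft(State(Set.empty, Vector.empty, Vector.empty))(step).tail.toList
    (states.map(_.fresh.toList), states.map(_.table.toList))
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq(Row(1L, 1), Row(2L, 2), Row(3L, 1)), // id 1 repeats within the trigger
      Seq(Row(4L, 2), Row(5L, 3))               // id 2 was already seen in the first trigger
    )
    val (append, complete) = simulate(batches)
    println(append)   // List(List(Row(1,1), Row(2,2)), List(Row(5,3)))
    println(complete) // List(List(Row(1,1), Row(2,2)), List(Row(1,1), Row(2,2), Row(5,3)))
  }
}
```

In this model the seen set still grows with every new id, even though Append emits only a few rows per trigger.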
