
I am getting the following error when I run my Spark job:

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;;

I am not sure if the issue is caused by the lack of a watermark, which I don't know how to apply in this context. The following is the aggregation operation applied:

def aggregateByValue(): DataFrame = {
  df.withColumn("Value", expr("(BookingClass, Value)"))
    .groupBy("AirlineCode", "Origin", "Destination", "PoS", "TravelDate",
             "StartSaleDate", "EndSaleDate", "avsFlag")
    .agg(collect_list("Value").as("ValueSeq"))
    .drop("Value")
}

Usage:

val theGroupedDF = theDF
  .multiplyYieldByHundred
  .explodeDates
  .aggregateByValue

val query = theGroupedDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

2 Answers


Changing the outputMode from "append" to "complete" solved the issue:

val query = theGroupedDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()



Adding a watermark would also solve the problem, while keeping the append output mode:

val theGroupedDF = theDF
  .multiplyYieldByHundred
  .explodeDates
  .aggregateByValue
  // the two lines below add the watermark
  .withColumn("timestamp", current_timestamp())
  .withWatermark("timestamp", "10 minutes")
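With the watermarked DataFrame, the append-mode query from the question can then run unchanged. A minimal sketch putting the two pieces together, assuming the same console sink and that theDF and its custom methods from the question are in scope:

```scala
// Sketch only: assumes an active SparkSession and a streaming
// DataFrame `theDF` exposing the methods used in the question.
import org.apache.spark.sql.functions.current_timestamp

val watermarkedDF = theDF
  .multiplyYieldByHundred
  .explodeDates
  .aggregateByValue
  .withColumn("timestamp", current_timestamp())
  .withWatermark("timestamp", "10 minutes")

// Append mode is now accepted: the watermark tells Spark when a
// group's aggregate is final, so finalized rows can be emitted
// once and never updated again.
val query = watermarkedDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
```

Note that `withWatermark` must be applied on the event-time column used by the aggregation to bound state; using `current_timestamp()` as a stand-in works, but a real event-time column from the data is usually preferable.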

