I have the following table:

    DEST_COUNTRY_NAME  ORIGIN_COUNTRY_NAME  count
    United States      Romania              15
    United States      Croatia              1
    United States      Ireland              344
    Egypt              United States        15

The table is represented as a Dataset.

    scala> dataDS
    res187: org.apache.spark.sql.Dataset[FlightData] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

I am able to sort the entries as a batch process.

    scala> dataDS.sort(col("count")).show(100);

I now want to see whether I can do the same with streaming. To do this, I suppose I have to read the file as a stream.

    scala> val staticSchema = dataDS.schema;
    staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,IntegerType,true))

    scala> val dataStream = spark.
         | readStream.
         | schema(staticSchema).
         | option("header","true").
         | csv("data/flight-data/csv/2015-summary.csv");
    dataStream: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

    scala> dataStream.isStreaming;
    res245: Boolean = true

But I cannot work out how to proceed from here and actually process the data as a stream.

I have applied the sort transformation:

    scala> dataStream.sort(col("count"));
    res246: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

I suppose now I should use Dataset's writeStream method. I ran the following two commands but both returned errors.

    scala> dataStream.sort(col("count")).writeStream.
         | format("memory").
         | queryName("sorted_data").
         | outputMode("complete").
         | start();
    org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

and this one

    scala> dataStream.sort(col("count")).writeStream.
         | format("memory").
         | queryName("sorted_data").
         | outputMode("append").
         | start();
    org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

From the errors, it seems I should be aggregating (grouping) the data, but I thought I would not need to, since I can run any batch operation as a stream.

How can I sort data that arrives as a stream?

1 Answer

Unfortunately, what the error messages tell you is accurate.
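To illustrate the one case the second error message does allow (sorting an aggregated streaming Dataset in Complete output mode), here is a minimal sketch meant to be pasted into spark-shell. The temporary directory, the sample rows (copied from the table in the question), and the names `sortedTotals`, `total`, and `sorted_totals` are assumptions made for illustration. It sorts per-destination totals, which is not the row-level sort the question asks for.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// In spark-shell this returns the existing session; elsewhere it starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("sorted-stream-sketch").getOrCreate()

// Hypothetical input: a temporary directory holding the four rows from the question.
val inputDir = Files.createTempDirectory("flight-data")
val csv =
  """DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
    |United States,Romania,15
    |United States,Croatia,1
    |United States,Ireland,344
    |Egypt,United States,15
    |""".stripMargin
Files.write(inputDir.resolve("part-0.csv"), csv.getBytes("UTF-8"))

val flightSchema = StructType(Seq(
  StructField("DEST_COUNTRY_NAME", StringType),
  StructField("ORIGIN_COUNTRY_NAME", StringType),
  StructField("count", IntegerType)))

val flights = spark.readStream
  .schema(flightSchema)
  .option("header", "true")
  .csv(inputDir.toString)

// Sorting is accepted here because it follows a streaming aggregation
// and the query is written in Complete output mode.
val sortedTotals = flights
  .groupBy(col("DEST_COUNTRY_NAME"))
  .agg(sum(col("count")).as("total"))
  .sort(col("total").desc)

val query = sortedTotals.writeStream
  .format("memory")
  .queryName("sorted_totals")
  .outputMode("complete")
  .start()

// Block until the files currently in the directory have been processed,
// then read the in-memory result table like any other table.
query.processAllAvailable()
spark.table("sorted_totals").show()
```

With the sample rows above, the result table holds a total of 360 for United States and 15 for Egypt. Call `query.stop()` when you are done with the query.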

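The point you make:

"but I thought I don't need to do it as I can run any batch operation as a stream."

is not without merit, but it misses a fundamental point: Structured Streaming is not tightly bound to micro-batching. A global sort needs to see the complete input, and an unbounded stream never provides one, so the engine cannot apply it the way it would to a batch.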

One could easily come up with some unscalable hack:

    import org.apache.spark.sql.functions._

    dataStream
      .withColumn("time", window(current_timestamp(), "5 minute"))  // Some time window
      .withWatermark("time", "0 seconds")                           // Immediate watermark
      .groupBy("time")
      // Collect each window's rows into one array and sort it (count comes first in the struct)
      .agg(sort_array(collect_list(struct($"count", $"DEST_COUNTRY_NAME", $"ORIGIN_COUNTRY_NAME"))).as("data"))
      .withColumn("data", explode($"data"))
      .select($"data.*")
      .select(dataStream.columns.map(col): _*)                      // Restore the original column order
      .writeStream
      .outputMode("append")
      ...
      .start()