I have the following table:
DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME count United States Romania 15 United States Croatia 1 United States Ireland 344 Egypt United States 15 The table is represented as a Dataset.
scala> dataDS res187: org.apache.spark.sql.Dataset[FlightData] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field] I am able to sort the entries as a batch process.
scala> dataDS.sort(col("count")).show(100); I now want to try if I can do the same using streaming. To do this, I suppose I will have to read the file as a stream.
scala> val staticSchema = dataDS.schema; staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,IntegerType,true)) scala> val dataStream = spark. | readStream. | schema(staticSchema). | option("header","true"). | csv("data/flight-data/csv/2015-summary.csv"); dataStream: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field] scala> dataStream.isStreaming; res245: Boolean = true But I am not able to progress further w.r.t. how to read the data as a stream.
I have executed the sort transformation` process
scala> dataStream.sort(col("count")); res246: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field] I suppose now I should use Dataset's writeStream method. I ran the following two commands but both returned errors.
scala> dataStream.sort(col("count")).writeStream. | format("memory"). | queryName("sorted_data"). | outputMode("complete"). | start(); org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;; and this one
scala> dataStream.sort(col("count")).writeStream. | format("memory"). | queryName("sorted_data"). | outputMode("append"). | start(); org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;; From the errors, it seems I should be aggregating (group) data but I thought I don't need to do it as I can run any batch operation as a stream.
How can I understand how to sort data which arrives as a stream?