
I am learning Structured Streaming with Databricks and I'm struggling with the DataStreamWriter console mode.

My program:

  • Simulates the streaming arrival of files to the folder "monitoring_dir" (one new file is transferred from "source_dir" every 10 seconds).
  • Uses a DataStreamReader to populate the Unbounded DataFrame "inputUDF" with the content of each new file.
  • Uses a DataStreamWriter to output the new rows of "inputUDF" to a valid sink.

While the program works when I choose the File sink (the batches are appended to text-format files in "result_dir"), nothing is displayed when I choose the Console sink.

Moreover, when I run the equivalent version of the program on my local machine (with Spark installed on it), it works fine with both the File and Console sinks.
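For reference, this is roughly what the console-sink part of the local version looks like (a minimal sketch: the local path and the 60-second timeout are illustrative, and the files are copied into the folder from another shell):

import os
from pyspark.sql import SparkSession

# Local-mode session; run the script with spark-submit so the console output is visible in the terminal
spark = (SparkSession.builder
         .master("local[*]")
         .appName("consoleSinkTest")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

monitoring_dir = "/tmp/my_monitoring/"   # illustrative local folder
os.makedirs(monitoring_dir, exist_ok=True)

# Unbounded DataFrame over the monitored folder
inputUDF = spark.readStream.format("text").load(monitoring_dir)

# Console sink: locally, each micro-batch is printed to the driver's terminal
mySQ = (inputUDF.writeStream
        .format("console")
        .trigger(processingTime="10 seconds")
        .outputMode("append")
        .start())

mySQ.awaitTermination(60)   # copy text files into monitoring_dir meanwhile to see the batches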

My question is:

  • How can I make this program output to the Console sink and display the results when using Databricks?

Thank you very much in advance!

Best regards, Nacho


My Program: myTest.py

import pyspark
import pyspark.sql.functions
import time

#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------
def get_source_dir_file_names(source_dir):
    # 1. We create the output variable
    res = []

    # 2. We get the FileInfo representation of the files of source_dir
    fileInfo_objects = dbutils.fs.ls(source_dir)

    # 3. We traverse the fileInfo objects, to get the name of each file
    for item in fileInfo_objects:
        # 3.1. We get a string representation of the fileInfo
        file_name = str(item)

        # 3.2. We look for the pattern name=' to remove all useless info from the start
        lb_index = file_name.index("name='")
        file_name = file_name[(lb_index + 6):]

        # 3.3. We look for the pattern ', to remove all useless info from the end
        ub_index = file_name.index("',")
        file_name = file_name[:ub_index]

        # 3.4. We append the name to the list
        res.append(file_name)

    # 4. We sort the list in alphabetical order
    res.sort()

    # 5. We return res
    return res

#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)

    # 2. We get the starting time of the process
    time.sleep(time_step_interval * 0.1)
    start = time.time()

    # 3. We set a counter for the number of files being transferred
    count = 0

    # 4. We simulate the dynamic arrival of these files from source_dir to monitoring_dir
    #    (i.e., the files are moved one by one for each time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)

        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1

        # 4.3. We wait the desired transfer interval until the next time slot.
        time.sleep((start + (count * time_step_interval)) - time.time())

    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)

#------------------------------------
# FUNCTION my_main
#------------------------------------
def my_main():
    # 0. We set the mode
    console_sink = True

    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"

    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)

    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)

    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text")\
                               .load(monitoring_dir)

    myDSW = None

    # 4. Operation A1: We create the DataStreamWriter...
    # 4.1. Either to save to result_dir in append mode
    if console_sink == False:
        myDSW = inputUDF.writeStream.format("text")\
                        .option("path", result_dir)\
                        .option("checkpointLocation", checkpoint_dir)\
                        .trigger(processingTime="10 seconds")\
                        .outputMode("append")
    # 4.2. Or to display by console in append mode
    else:
        myDSW = inputUDF.writeStream.format("console")\
                        .trigger(processingTime="10 seconds")\
                        .outputMode("append")

    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()

    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)

    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()

#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
if __name__ == '__main__':
    my_main()

My Dataset: f1.txt

First sentence.

Second sentence.


My Dataset: f2.txt

Third sentence.

Fourth sentence.


My Dataset: f3.txt

Fifth sentence.

Sixth sentence.

2 Answers


"How can I make this program to output to Console sink and display the results when using Databricks?"

The easiest way is to use the display function which Databricks provides. You can use it as shown below:

# Cell 1
rateDf = (spark.readStream
          .format("rate")
          .option("rowsPerSecond", 1)
          .option("numPartitions", 1)
          .load())

# Cell 2
display(rateDf, streamName="rate_stream")
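If you later want to stop the stream that display starts (for example at the end of the notebook), the running queries are reachable through spark.streams; a small sketch:

# Stop any streaming queries still active in this notebook session
for q in spark.streams.active:
    print(q.name)   # the query's name, if one was set
    q.stop()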

The Console sink does not work in Databricks as you would expect it to in your IDE or when submitting to your cluster. Instead, you can use the memory format and query the data with a %sql query:

inputUDF.writeStream \
    .format("memory") \
    .trigger(processingTime="10 seconds") \
    .queryName("inputUDF_console") \
    .outputMode("append") \
    .start()

In another Databricks cell you can look into the data by querying the table named by queryName:

%sql
select * from inputUDF_console

1 Comment

This has helped me figure out how to convert a (Databricks) streaming dataframe into a static dataframe (for a specific purpose). Here is how I converted it to a static dataframe:

static_df = spark.sql("select * from inputUDF_console")
static_df.count()

Just add this line after the section where you create the DataFrame:

display(dataframe) 
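In the program from the question, a minimal sketch of what this means (assuming the inputUDF name and monitoring_dir path from the original code, and dropping the console writeStream):

# inputUDF is the streaming DataFrame created from the monitored folder
inputUDF = spark.readStream.format("text").load(monitoring_dir)

# display() starts the stream and renders each micro-batch in the notebook output
display(inputUDF)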

