I am learning Structured Streaming with Databricks and I'm struggling with the DataStreamWriter console mode.
My program:
- Simulates the streaming arrival of files to the folder "monitoring_dir" (one new file is transferred from "source_dir" each 10 seconds).
- Uses a DataStreamReader to populate the Unbounded DataFrame "inputUDF" with the content of each new file.
- Uses a DataStreamWriter to output the new rows of "inputUDF" to a valid sink.
Whereas the program works when choosing to use a File sink (the batches are appended to text-format files in "result_dir"), I cannot see anything displayed when choosing Console sink.
Moreover, when I run the equivalent version of the program in my local machine (with Spark installed on it) it works fine both for File and Console sinks.
My question is:
- How can I make this program to output to Console sink and display the results when using Databricks?
Thank you very much in advance!
Best regards, Nacho
My Program: myTest.py
import pyspark import pyspark.sql.functions import time #------------------------------------ # FUNCTION get_source_dir_file_names #------------------------------------ def get_source_dir_file_names(source_dir): # 1. We create the output variable res = [] # 2. We get the FileInfo representation of the files of source_dir fileInfo_objects = dbutils.fs.ls(source_dir) # 3. We traverse the fileInfo objects, to get the name of each file for item in fileInfo_objects: # 3.1. We get a string representation of the fileInfo file_name = str(item) # 3.2. We look for the pattern name= to remove all useless info from the start lb_index = file_name.index("name='") file_name = file_name[(lb_index + 6):] # 3.3. We look for the pattern ') to remove all useless info from the end ub_index = file_name.index("',") file_name = file_name[:ub_index] # 3.4. We append the name to the list res.append(file_name) # 4. We sort the list in alphabetic order res.sort() # 5. We return res return res #------------------------------------ # FUNCTION streaming_simulation #------------------------------------ def streaming_simulation(source_dir, monitoring_dir, time_step_interval): # 1. We get the names of the files on source_dir files = get_source_dir_file_names(source_dir) # 2. We get the starting time of the process time.sleep(time_step_interval * 0.1) start = time.time() # 3. We set a counter in the amount of files being transferred count = 0 # 4. We simulate the dynamic arriving of such these files from source_dir to dataset_dir # (i.e, the files are moved one by one for each time period, simulating their generation). for file in files: # 4.1. We copy the file from source_dir to dataset_dir# dbutils.fs.cp(source_dir + file, monitoring_dir + file) # 4.2. We increase the counter, as we have transferred a new file count = count + 1 # 4.3. We wait the desired transfer_interval until next time slot. time.sleep((start + (count * time_step_interval)) - time.time()) # 5. We wait a last time_step_interval time.sleep(time_step_interval) #------------------------------------ # FUNCTION my_main #------------------------------------ def my_main(): # 0. We set the mode console_sink = True # 1. We set the paths to the folders source_dir = "/FileStore/tables/my_dataset/" monitoring_dir = "/FileStore/tables/my_monitoring/" checkpoint_dir = "/FileStore/tables/my_checkpoint/" result_dir = "/FileStore/tables/my_result/" dbutils.fs.rm(monitoring_dir, True) dbutils.fs.rm(result_dir, True) dbutils.fs.rm(checkpoint_dir, True) dbutils.fs.mkdirs(monitoring_dir) dbutils.fs.mkdirs(result_dir) dbutils.fs.mkdirs(checkpoint_dir) # 2. We configure the Spark Session spark = pyspark.sql.SparkSession.builder.getOrCreate() spark.sparkContext.setLogLevel('WARN') # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir inputUDF = spark.readStream.format("text")\ .load(monitoring_dir) myDSW = None # 4. Operation A1: We create the DataStreamWritter... # 4.1. To either save to result_dir in append mode if console_sink == False: myDSW = inputUDF.writeStream.format("text")\ .option("path", result_dir) \ .option("checkpointLocation", checkpoint_dir)\ .trigger(processingTime="10 seconds")\ .outputMode("append") # 4.2. Or to display by console in append mode else: myDSW = inputUDF.writeStream.format("console")\ .trigger(processingTime="10 seconds")\ .outputMode("append") # 5. We get the StreamingQuery object derived from starting the DataStreamWriter mySQ = myDSW.start() # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir streaming_simulation(source_dir, monitoring_dir, 10) # 7. We stop the StreamingQuery to finish the application mySQ.stop() #------------------------------- # MAIN ENTRY POINT #-------------------------------strong text if __name__ == '__main__': my_main() My Dataset: f1.txt
First sentence.
Second sentence.
My Dataset: f2.txt
Third sentence.
Fourth sentence.
My Dataset: f3.txt
Fifth sentence.
Sixth sentence.