I have to do a calculation based on JSON files located in an Azure Blob Storage folder. I'm working with Apache Spark on Azure HDInsight.
Each folder has a number in its name that is related to the tracking order. If a folder with a higher number exists, I have to read the JSON from that folder and discard the folders with lower numbers. For example, if I have folders named 20200501-1 and 20200501-2, I must read 20200501-2.
The solution I've found in Apache Spark is to read the path and add it as a column to the data frame, as you can see below:
    val visits = session.read.schema(schema).json(pathData).withColumn("path", input_file_name())

With this path I make some transformations. But these transformations involve joins and group-bys, so when I run the job with a large dataset on a cluster, the Spark job takes a lot of time. Is there a possibility to make a different transformation, or to improve my approach?
My transformation works with a data frame (after adding the column) like this:
    val visits = Seq(
      ("ITEM4449", 33, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM4450", 16, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM1111", 88, "https://[email protected]/20200514-2/somename.json"),
      ("ITEM4453", 64, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM1111", 12, "https://[email protected]/20200514-1/somename.json")
    ).toDF("itemId", "visits", "path")

I make this transformation:
    def discardByTrackingCode(rawDataFrame: DataFrame): DataFrame = {
      // Extract the tracking version digit from the folder name embedded in the path
      val visitWithColumn = rawDataFrame
        .withColumn("tracking_version", expr("substring(path, 38, 1)"))
      // Build a combined key of item id and tracking version
      val itemVersionDf = visitWithColumn
        .withColumn("item_version", concat(col("ItemId"), lit("_"), col("tracking_version")))
      // For each item, keep only the highest version
      val versionToTakeDf = itemVersionDf
        .groupBy(col("ItemId").as("item_id_delete"))
        .agg(max("item_version").as("item_version"))
      val itemReport = itemVersionDf.join(versionToTakeDf, Seq("item_version"))
      val finalDf = itemReport.select("ItemId", "Visits", "item_version")
      finalDf
    }

And I obtain the following data frame, which is correct:
    +--------+------+------------+
    |ItemId  |Visits|item_version|
    +--------+------+------------+
    |ITEM4449|33    |ITEM4449_1  |
    |ITEM4450|16    |ITEM4450_1  |
    |ITEM1111|88    |ITEM1111_2  |
    |ITEM4453|64    |ITEM4453_1  |
    +--------+------+------------+

Is there a more efficient way to make this function work? Besides that, is it possible (or preferable) to look for the folder using the Hadoop FileSystem class?
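For reference, this is the kind of alternative I had in mind: the same logic written with a window function instead of the groupBy + join, and with regexp_extract instead of a fixed substring offset. The name discardByTrackingCodeWindow and the regex (which assumes the folder name always ends in "-N" right before the file name) are just placeholders for this sketch:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Sketch: per itemId, keep only the rows coming from the highest folder version.
    // regexp_extract pulls the digits after the last "-" in the folder name instead of
    // relying on a fixed character offset (assumes folders are named like 20200514-2).
    def discardByTrackingCodeWindow(rawDataFrame: DataFrame): DataFrame = {
      val withVersion = rawDataFrame
        .withColumn("tracking_version",
          regexp_extract(col("path"), """-(\d+)/[^/]*$""", 1).cast("int"))

      val byItem = Window.partitionBy(col("itemId"))

      withVersion
        .withColumn("max_version", max(col("tracking_version")).over(byItem))
        .filter(col("tracking_version") === col("max_version"))
        .withColumn("item_version", concat(col("itemId"), lit("_"), col("tracking_version")))
        .select("itemId", "visits", "item_version")
    }

As far as I understand, this avoids the self-join, but the window still shuffles by itemId, so I'm not sure how much it would gain on a large dataset.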
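And about the Hadoop FileSystem question, this is roughly what I imagined: listing the version folders up front, keeping only the highest "-N" per date, and passing just those paths to the reader, so the lower versions are never loaded at all. Here basePath is a hypothetical variable pointing to the container folder that holds the 20200514-1, 20200514-2, ... subfolders, reachable through the cluster's Hadoop configuration:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    // Sketch: list the version folders under basePath (hypothetical), keep the folder
    // with the highest "-N" suffix per date prefix, and return only those paths.
    def latestFolders(session: SparkSession, basePath: String): Seq[String] = {
      val fs = FileSystem.get(new java.net.URI(basePath), session.sparkContext.hadoopConfiguration)
      fs.listStatus(new Path(basePath))
        .filter(_.isDirectory)
        .map(_.getPath.getName)                 // e.g. "20200514-2"
        .filter(_.matches("""\d{8}-\d+"""))
        .groupBy(_.takeWhile(_ != '-'))         // group by the date prefix
        .values
        .map(versions => versions.maxBy(name => name.dropWhile(_ != '-').drop(1).toInt))
        .map(name => s"$basePath/$name")
        .toSeq
    }

    // val visits = session.read.schema(schema).json(latestFolders(session, basePath): _*)

Would reading only the latest folders like this be preferable to reading everything and filtering afterwards?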
