
I have to do a calculation based on a JSON file located in an Azure Blob Storage folder. I'm working with Apache Spark on Azure HDInsight.

Each folder name ends with a number related to the tracking order. If a folder with a higher number exists, I have to read the JSON from that folder and discard the folders with lower numbers. For example, if I have the folders 20200501-1 and 20200501-2, I must read 20200501-2.

The solution I've found in Apache Spark is to read the path and add it as a column to the data frame, as you can see below:

val visits = session.read.schema(schema).json(pathData).withColumn("path", input_file_name()) 

With this path I make some transformations. But these transformations involve joins and group-bys, so when I run the job against a large dataset on a cluster, the Spark job takes a lot of time. Is there a different transformation I could use, or a way to improve my approach?

My transformation works with a data frame (after adding the column) like this:

    val visits = Seq(
      ("ITEM4449", 33, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM4450", 16, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM1111", 88, "https://[email protected]/20200514-2/somename.json"),
      ("ITEM4453", 64, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM1111", 12, "https://[email protected]/20200514-1/somename.json"))
      .toDF("itemId", "visits", "path")

Then I apply this transformation:

    def discardByTrackingCode(rawDataFrame: DataFrame): DataFrame = {
      // Extract the tracking version digit from the file path (position 38).
      val visitWithColumn = rawDataFrame
        .withColumn("tracking_version", expr("substring(path, 38, 1)"))
      // Build a composite key: <itemId>_<tracking_version>.
      val itemVersionDf = visitWithColumn
        .withColumn("item_version", concat(col("ItemId"), lit("_"), col("tracking_version")))
      // Per item, keep only the highest composite version...
      val versionToTakeDf = itemVersionDf
        .groupBy(col("ItemId").as("item_id_delete"))
        .agg(max("item_version").as("item_version"))
      // ...and join back to recover the surviving rows.
      val itemReport = itemVersionDf.join(versionToTakeDf, Seq("item_version"))
      val finalDf = itemReport.select("ItemId", "Visits", "item_version")
      finalDf
    }

And I obtain the following data frame, which is correct:

    +--------+------+------------+
    |ItemId  |Visits|item_version|
    +--------+------+------------+
    |ITEM4449|33    |ITEM4449_1  |
    |ITEM4450|16    |ITEM4450_1  |
    |ITEM1111|88    |ITEM1111_2  |
    |ITEM4453|64    |ITEM4453_1  |
    +--------+------+------------+
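
One fragile spot worth noting: substring(path, 38, 1) assumes every path has exactly the same length up to the version digit, and it stops working once the tracking number reaches two digits. A position-independent sketch, assuming the folder segment always matches the yyyyMMdd-N pattern shown above, would extract the version with regexp_extract:

    import org.apache.spark.sql.functions.{col, regexp_extract}

    // Drop-in replacement for the first step of discardByTrackingCode:
    // capture the digits after the "-" in the /yyyyMMdd-N/ folder segment,
    // wherever that segment occurs in the path.
    val visitWithColumn = rawDataFrame
      .withColumn("tracking_version", regexp_extract(col("path"), """/\d{8}-(\d+)/""", 1))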

Is there a more efficient way to make this function work? Besides that, is it possible (or preferable) to look for the folder using the Hadoop FileSystem class?
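
Regarding the FileSystem question: listing the folders up front and reading only the highest-numbered one per day is possible. A minimal sketch, assuming the day folders sit directly under one parent path and follow the yyyyMMdd-N naming above (the basePath value here is a placeholder):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical parent folder; replace with the real container path.
    val basePath = new Path("wasbs://container@account.blob.core.windows.net/visits")
    val fs = FileSystem.get(basePath.toUri, session.sparkContext.hadoopConfiguration)

    // Keep, for each yyyyMMdd prefix, only the folder with the highest -N suffix,
    // comparing the suffix numerically so that -10 sorts after -2.
    val latestFolders = fs.listStatus(basePath)
      .filter(_.isDirectory)
      .map(_.getPath)
      .filter(_.getName.matches("""\d{8}-\d+"""))
      .groupBy(_.getName.takeWhile(_ != '-'))
      .values
      .map(_.maxBy(_.getName.split("-")(1).toInt))
      .map(_.toString)
      .toSeq

    val visits = session.read.schema(schema).json(latestFolders: _*)

Note that this discards whole folders before reading, so it only matches the per-item logic above if every item is re-emitted in the higher-numbered folder; in the sample data, ITEM4449 exists only in 20200514-1 and would be lost.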

  • Extra effort vs. extra processing time is a trade-off, and time is money. I would do this via Linux and put the files into a directory via cp or symbolic links; it's pre-processing. I'm not sure I would even bother, but the principle is correct. – Commented Jun 11, 2020 at 6:54

1 Answer

You can try to use a Window expression:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    // Rank each item's rows by file version, highest first.
    val window = Window.partitionBy("itemidnumber").orderBy(desc("fileversion"))

    val visits = Seq(
      ("ITEM4449", 33, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM4450", 16, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM1111", 88, "https://[email protected]/20200514-2/somename.json"),
      ("ITEM4453", 64, "https://[email protected]/20200514-1/somename.json"),
      ("ITEM1111", 12, "https://[email protected]/20200514-1/somename.json"))
      .toDF("itemId", "visits", "path")
      .withColumn("itemidnumber", expr("substring(itemId, 5, 4)"))
      .withColumn("fileversion", expr("substring(path, 38, 1)"))
      .withColumn("tracking_version", expr("concat(itemidnumber, substring(path, 38, 1))"))
      .withColumn("row_number", row_number.over(window))
      .filter($"row_number" === 1)

    display(visits)

Output:

[Screenshot of the Databricks notebook output: one surviving row per item — ITEM4449/33, ITEM4450/16, ITEM1111/88, ITEM4453/64 — matching the expected result in the question.]
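
Why this helps: the window plus row_number replaces the groupBy/join pair of the original function, so Spark can pick the winner per item in a single pass instead of aggregating and then joining back. To end up with just the original columns, the helper columns can be dropped after the filter (a small sketch):

    // Keep only the columns of the original data frame.
    val result = visits.drop("itemidnumber", "fileversion", "tracking_version", "row_number")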

