
I am trying to create a DataFrame from a Hive table, but I am not very familiar with the Spark API.

I need help optimizing the query in the getLastSession method so that Spark runs it as a single job instead of two:

    import java.io.File
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.{Dataset, Row}
    import org.apache.spark.sql.functions._

    val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
    val path = new Path(s"$pathTable${if (onlyPartition) s"/name_process=$processName" else ""}").toString
    val df = spark.read.parquet(path)

    def getLastSession: Dataset[Row] = {
      val lastTime = df.select(max(col("time_write"))).collect()(0)(0).toString
      val lastSession = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
      val dfByLastSession = df.filter(col("id_session") === lastSession)
      dfByLastSession.show()
      /*
      +----------+----------------+------------------+-------+
      |id_session|      time_write|               key|  value|
      +----------+----------------+------------------+-------+
      |alskdfksjd|1639950466414000|schema2.table2.csv|Failure|
      +----------+----------------+------------------+-------+
      */
      dfByLastSession
    }

PS. My Source Table (for example):

    name_process   id_session  time_write   key                 value
    OtherClass     jsdfsadfsf  43434883477  schema0.table0.csv  Success
    OtherClass     jksdfkjhka  23212123323  schema1.table1.csv  Success
    OtherClass     alskdfksjd  23343212234  schema2.table2.csv  Failure
    ExternalClass  sdfjkhsdfd  34455453434  schema3.table3.csv  Success
  • @blackbishop, no. I'm trying to remove the lastTime and lastSession variables, so that all operations on the DataFrame happen in a single step in the dfByLastSession variable. Commented Dec 22, 2021 at 12:26
  • My current method works correctly. I'm just trying to optimize the query for Spark. Commented Dec 22, 2021 at 12:29
  • You want to get all the rows corresponding to the id_session that has the most recent time_write, right? Commented Dec 22, 2021 at 12:46
  • @blackbishop, yes, that's right! Commented Dec 22, 2021 at 13:15

1 Answer


You can use row_number with Window like this:

    import org.apache.spark.sql.expressions.Window

    val dfByLastSession = df.withColumn(
      "rn", row_number().over(Window.orderBy(desc("time_write")))
    ).filter("rn = 1").drop("rn")

    dfByLastSession.show()

However, because the window does not partition by any field, Spark moves all rows into a single partition to rank them, which can degrade performance on large tables.
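If the table had a natural key to partition on, the same row_number pattern could run per key in parallel (partitioning by name_process is only an illustration here, since the question asks for a global maximum). A plain-Scala sketch of the per-key logic such a partitioned window would compute, using the sample rows from the question:

```scala
// Plain-Scala analogue of keeping rows where
//   row_number().over(Window.partitionBy("name_process").orderBy(desc("time_write"))) === 1
// i.e. the latest row within each name_process group.
object PerProcessLatest {
  case class TableRow(nameProcess: String, idSession: String, timeWrite: Long)

  val rows: Seq[TableRow] = Seq(
    TableRow("OtherClass",    "jsdfsadfsf", 43434883477L),
    TableRow("OtherClass",    "jksdfkjhka", 23212123323L),
    TableRow("OtherClass",    "alskdfksjd", 23343212234L),
    TableRow("ExternalClass", "sdfjkhsdfd", 34455453434L)
  )

  // groupBy + maxBy plays the role of the partitioned window:
  // one "rn = 1" winner per name_process group.
  def latestPerProcess: Map[String, TableRow] =
    rows.groupBy(_.nameProcess).map { case (k, group) => k -> group.maxBy(_.timeWrite) }

  def main(args: Array[String]): Unit =
    latestPerProcess.foreach { case (k, r) => println(s"$k -> ${r.idSession}") }
}
```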

Another thing you can change in your code is to use struct ordering to get the id_session associated with the most recent time_write in a single query:

    val lastSession = df.select(
      max(struct(col("time_write"), col("id_session")))("id_session")
    ).first.getString(0)

    val dfByLastSession = df.filter(col("id_session") === lastSession)
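The struct-ordering trick works because Spark compares structs field by field, left to right, just as Scala compares tuples. A minimal plain-Scala sketch of the idea (no Spark session needed; the sample ids and times are taken from the source table in the question):

```scala
// Plain-Scala sketch of the max(struct(...)) trick: tuples compare
// lexicographically, just like Spark structs, so the maximum
// (time_write, id_session) pair carries the id_session of the latest write.
object StructOrderingSketch {
  val rows: Seq[(Long, String)] = Seq(
    (43434883477L, "jsdfsadfsf"),
    (23212123323L, "jksdfkjhka"),
    (23343212234L, "alskdfksjd"),
    (34455453434L, "sdfjkhsdfd")
  )

  // Analogue of df.select(max(struct(col("time_write"), col("id_session")))("id_session"))
  def lastSession: String = rows.max._2

  def main(args: Array[String]): Unit =
    println(lastSession) // jsdfsadfsf
}
```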

1 Comment

It looks very good. Thank you for helping me. I will definitely read up on window functions in Spark.
