
I am using Spark SQL 2.4.1 with Java 8. I need to calculate percentiles such as the 25th, 75th and 90th for some given data.

Given source dataset:

    val df = Seq(
      (10, 20, 30, 40, 50),
      (100, 200, 300, 400, 500),
      (111, 222, 333, 444, 555),
      (1123, 2123, 3123, 4123, 5123),
      (1321, 2321, 3321, 4321, 5321)
    ).toDF("col_1", "col_2", "col_3", "col_4", "col_5")

    +-----+-----+-----+-----+-----+
    |col_1|col_2|col_3|col_4|col_5|
    +-----+-----+-----+-----+-----+
    |10   |20   |30   |40   |50   |
    |100  |200  |300  |400  |500  |
    |111  |222  |333  |444  |555  |
    |1123 |2123 |3123 |4123 |5123 |
    |1321 |2321 |3321 |4321 |5321 |
    +-----+-----+-----+-----+-----+

    val columnsToCalculate = Seq("col_2", "col_3", "col_4")

Expected output:

    +------+-----+------+
    |column|count|mean  |
    +------+-----+------+
    |col_2 |5    |some-z|
    |col_3 |5    |some-y|
    |col_4 |5    |some-x|
    +------+-----+------+

2 Answers


Good question. I solved this, though I may be lacking in skills here. I suspect there is a fold solution, but I present a data-wrangling approach instead; fold in Scala cannot be executed in parallel, so this approach should be faster.

Also, I do this in Scala, but this answer on How to pivot Spark DataFrame? may help you convert it.

I am interested in better solutions. The dynamic column list presents some issues, but I continued in that vein and arrived at this solution:

    import org.apache.spark.sql.functions._
    // Add any other imports.

    // Gen data.
    val df = Seq(
      (10, 20, 30, 40, 50),
      (100, 200, 300, 400, 500),
      (111, 222, 333, 444, 555),
      (1123, 2123, 3123, 4123, 5123),
      (1321, 2321, 3321, 4321, 5321)
    ).toDF("col_1", "col_2", "col_3", "col_4", "col_5")

    // List of columns to apply the aggregates against.
    val columnsToCalculate = Seq("col_2", "col_3", "col_4")

    // Apply your aggregate and indicate what the metric is - individually.
    // Could not get multiple calcs with the .map approach here. Expand accordingly.
    val df1 = df.select(columnsToCalculate.map(c => mean(col(c)).alias(c)): _*).withColumn("stat", lit("mean"))
    val df2 = df.select(columnsToCalculate.map(c => min(col(c)).alias(c)): _*).withColumn("stat", lit("min"))
    val df3 = df1.union(df2)

    // Data wrangling: make an array for exploding.
    val df4 = df3.withColumn("CombinedArray", array(columnsToCalculate.map { colName =>
      regexp_replace(regexp_replace(df1(colName), "(^)", s"$colName: "), "(-)", s", $colName: ")
    }: _*))
    val df5 = df4.select($"stat", explode($"CombinedArray"))
    val df6 = df5.withColumn("split", split(col("col"), ":"))
      .select($"stat", col("split")(0).as("col_name"), col("split")(1).as("metric_value"))

    // Final data wrangling.
    val res = df6.groupBy($"col_name")
      .pivot($"stat")
      .agg(first($"metric_value"))
      .orderBy($"col_name")

    res.show(false)

returns:

    +--------+------+----+
    |col_name|mean  |min |
    +--------+------+----+
    |col_2   |977.2 |20.0|
    |col_3   |1421.4|30.0|
    |col_4   |1865.6|40.0|
    +--------+------+----+

BTW: I could not work out where to fit your count aspect; one way to bolt it on is sketched below.
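If the count is needed, a hedged sketch (untested, following the same pattern as df1/df2 above) is to build a third frame with count and union it in, replacing the original df3 line:

    // Sketch only: a count row built the same way as the mean/min rows above.
    // union widens the numeric types, so the later string wrangling still applies.
    val dfCount = df.select(columnsToCalculate.map(c => count(col(c)).alias(c)): _*)
      .withColumn("stat", lit("count"))
    val df3 = df1.union(df2).union(dfCount)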

Note: as the other answer states, maybe you just want describe?
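For comparison, a minimal describe call on the sample data might look like this (a sketch; describe reports count, mean, stddev, min and max per column, but no percentiles):

    // Built-in summary statistics per column, returned as strings.
    df.describe("col_2", "col_3", "col_4").show(false)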


4 Comments

Thanks a lot. I have another use case like this; any advice please? stackoverflow.com/questions/63137437/…
Hi, I am thinking about how better to do this.
Can you advise me on the same? stackoverflow.com/questions/63450135/…
Can you tell me what is wrong with this broadcast-variable access? stackoverflow.com/questions/64003697/…

There is a summary() API on Dataset which computes basic stats in the format below:

    ds.summary("count", "min", "25%", "75%", "max").show()

    // output:
    // summary  age    height
    // count    10.0   10.0
    // min      18.0   163.0
    // 25%      24.0   176.0
    // 75%      32.0   180.0
    // max      92.0   192.0
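Applied to the question's sample data, a hedged sketch of the same call would be (assuming df and columnsToCalculate as defined in the question):

    import org.apache.spark.sql.functions.col

    // Restrict to the columns of interest, then request the desired stats;
    // summary() returns string-typed columns plus a "summary" label column.
    df.select(columnsToCalculate.map(col): _*)
      .summary("count", "mean", "25%", "75%", "90%")
      .show(false)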

Similarly, you can enrich the DataFrame API to get the stats in the format you require, as below.

Define RichDataFrame and the implicits to use:

    import org.apache.spark.sql.functions.expr
    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{NumericType, StringType, StructField, StructType}
    import scala.language.implicitConversions

    class RichDataFrame(ds: DataFrame) {
      def statSummary(statistics: String*): DataFrame = {
        val defaultStatistics = Seq("max", "min", "mean", "std", "skewness", "kurtosis")
        val statFunctions = if (statistics.nonEmpty) statistics else defaultStatistics

        val selectedCols = ds.schema
          .filter(a => a.dataType.isInstanceOf[NumericType] || a.dataType.isInstanceOf[StringType])
          .map(_.name)

        val percentiles = statFunctions.filter(a => a.endsWith("%")).map { p =>
          try {
            p.stripSuffix("%").toDouble / 100.0
          } catch {
            case e: NumberFormatException =>
              throw new IllegalArgumentException(s"Unable to parse $p as a percentile", e)
          }
        }
        require(percentiles.forall(p => p >= 0 && p <= 1), "Percentiles must be in the range [0, 1]")

        val aggExprs = selectedCols.flatMap(c => {
          var percentileIndex = 0
          statFunctions.map { stats =>
            if (stats.endsWith("%")) {
              val index = percentileIndex
              percentileIndex += 1
              expr(s"cast(percentile_approx($c, array(${percentiles.mkString(", ")}))[$index] as string)")
            } else {
              expr(s"cast($stats($c) as string)")
            }
          }
        })

        val aggResult = ds.select(aggExprs: _*).head()

        val r = aggResult.toSeq.grouped(statFunctions.length).toArray
          .zip(selectedCols)
          .map { case (seq, column) => column +: seq }
          .map(Row.fromSeq)

        val output = StructField("columns", StringType) +: statFunctions.map(c => StructField(c, StringType))

        val spark = ds.sparkSession
        spark.createDataFrame(spark.sparkContext.parallelize(r), StructType(output))
      }
    }

    object RichDataFrame {
      trait Enrichment {
        implicit def enrichMetadata(ds: DataFrame): RichDataFrame =
          new RichDataFrame(ds)
      }
      object implicits extends Enrichment
    }

Test it with the provided data as below:

    val df = Seq(
      (10, 20, 30, 40, 50),
      (100, 200, 300, 400, 500),
      (111, 222, 333, 444, 555),
      (1123, 2123, 3123, 4123, 5123),
      (1321, 2321, 3321, 4321, 5321)
    ).toDF("col_1", "col_2", "col_3", "col_4", "col_5")

    val columnsToCalculate = Seq("col_2", "col_3", "col_4")

    import com.som.spark.shared.RichDataFrame.implicits._

    df.selectExpr(columnsToCalculate: _*)
      .statSummary("mean", "count", "25%", "75%", "90%")
      .show(false)

    /**
     * +-------+------+-----+---+----+----+
     * |columns|mean  |count|25%|75% |90% |
     * +-------+------+-----+---+----+----+
     * |col_2  |977.2 |5    |200|2123|2321|
     * |col_3  |1421.4|5    |300|3123|3321|
     * |col_4  |1865.6|5    |400|4123|4321|
     * +-------+------+-----+---+----+----+
     */

5 Comments

I did not interpret it that way; then there is also describe. You may well be correct.
@Someshwar Kale, why is percentile_approx used here and not percentile? I am looking for the exact percentile value instead.
@BdEngineer, if you want to use percentile then just replace percentile_approx with percentile in the statSummary method. The reason for using percentile_approx is that it is faster than percentile. More info: databricks.com/blog/2016/05/19/…
@Someshwar Kale thanks a lot, but when I tried df.stats.percentile it says it does not recognize the "percentile" function, yet spark.sql("select percentile(x, 0.0) as percentile_0") works, which confuses me. So 1) what is the difference, and 2) what is the reason for declaring a RichDataFrame in the above solution?
RichDataFrame has implicits that enrich the current DataFrame API. When you write df.statSummary("25%"), it calls the method defined in RichDataFrame. In that method, "25%" uses percentile_approx (see the line expr(s"cast(percentile_approx($c, array(${percentiles.mkString(", ")}))[$index] as string)")). If you want to use percentile instead of percentile_approx, just change that expression to expr(s"cast(percentile($c, array(${percentiles.mkString(", ")}))[$index] as string)").
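To illustrate the point in the comments: in Spark 2.4, percentile and percentile_approx are SQL functions rather than DataFrame functions, so they are reached through expr/selectExpr and not through something like df.stats.percentile. A minimal hedged sketch, reusing col_2 from the question's sample data:

    import org.apache.spark.sql.functions.expr

    // Exact percentile (Hive UDAF) vs. the approximate version, both via SQL expressions;
    // a select containing only aggregate expressions performs a global aggregation.
    df.select(
      expr("percentile(col_2, array(0.25, 0.75, 0.90)) as col_2_percentiles"),
      expr("percentile_approx(col_2, array(0.25, 0.75, 0.90)) as col_2_approx")
    ).show(false)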
