I am processing a Spark DataFrame (DF) and need to add a column to it on the fly, from inside a call to mapPartitions:

    // Don't worry about what 'widget' is or represents
    val rdd = df.mapPartitions { rows =>
        addColIfNecessary(rows, widget)
    }

Then:

    def addColIfNecessary(rows: Iterator[Row], widget: Widget): Iterator[Row] = {
        rows.foreach { row =>
            if (widget.determineWhetherRowNeedsNewCol(row)) {
                // TODO: Add a new "fizz" column (of StringType) to the row
                val newVal: String = widget.getValueOfNewCol(row)
                row.addColumn("fizz", StringType, newVal)
            }
        }
        rows
    }

This is obviously just pseudo-code, but conveys what I'm trying to do. Any ideas as to how I can actually implement it?
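For context, here is a sketch of how something like this could be expressed inside mapPartitions, given that Spark Rows are immutable and have no addColumn method. Widget and its two methods are the hypothetical API from the pseudo-code above, not a real library:

    import org.apache.spark.sql.Row

    // Sketch only: since Rows are immutable, "adding" a column means building a
    // new Row with the extra value appended. Rows that don't need a value get
    // null, so every row keeps the same arity and matches one common schema.
    def addColIfNecessary(rows: Iterator[Row], widget: Widget): Iterator[Row] =
        rows.map { row =>
            val fizz: String =
                if (widget.determineWhetherRowNeedsNewCol(row)) widget.getValueOfNewCol(row)
                else null
            Row.fromSeq(row.toSeq :+ fizz)
        }

The resulting RDD[Row] would then need a schema with the extra field before it can become a DataFrame again, e.g. spark.createDataFrame(rdd, df.schema.add("fizz", StringType)).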

  • possible duplicate of stackoverflow.com/questions/33876155/… Commented Oct 25, 2016 at 14:54
  • Thanks @Shankar but in that question the column is added to a newly created DF, not an existing one from inside the map partitions function. So I would argue that this is either a different (standalone) question, or that at the very least I need to understand what I'm doing wrong in my approach that would then allow that question to be the solution for my problem as well. Thanks again! Commented Oct 25, 2016 at 15:06
  • Given that a DF is a columnar format, it would be more advisable to conditionally add a value to a nullable column than to add a column to some Rows. Also, is there a specific need to do this within mapPartitions? Commented Oct 25, 2016 at 15:18
  • Thanks @maasg (+1). If you could post even a pseudo-code example, that would go a long way for me (I'm brand new to Spark and Scala). Also, I don't think I need to do this from inside mapPartitions; that's just the code I've stitched together from consulting the Google Gods and the Spark docs for the last few hours. Commented Oct 25, 2016 at 15:20

1 Answer

DataFrames are column-oriented structures, meaning that adding a column to only some rows is not a good idea. Instead, you can leverage DataFrames' support for nullable values: rather than adding an extra column to selected rows, add an optional value to every Row based on some criteria.

An example: Let's take a DF of users and pages:

    val users = Seq("Alice", "Bob", "Charly", "Dean", "Eve", "Flor", "Greta")
    val pages = (1 to 9).map(i => s"page_$i")
    val userPages = for {
        u <- users
        p <- pages
    } yield (u, p)
    val userPagesDF = sparkContext.parallelize(userPages).toDF("user", "page")

    // A user-defined function that takes the last digit from the page name and
    // uses it to calculate a "rank". It only ranks pages with a number higher than 7.
    val rankUDF = udf((p: String) => if (p.takeRight(1).toInt > 7) "top" else null)

    // New DF with the extra column "rank", which contains values for only some rows
    val ranked = userPagesDF.withColumn("rank", rankUDF($"page"))
    ranked.show

    +-----+-------+----+
    | user|   page|rank|
    +-----+-------+----+
    |Alice| page_1|null|
    |Alice| page_2|null|
    |Alice| page_3|null|
    |Alice| page_4|null|
    |Alice| page_5|null|
    |Alice| page_6|null|
    |Alice| page_7|null|
    |Alice| page_8| top|
    |Alice| page_9| top|
    |  Bob| page_1|null|
    |  Bob| page_2|null|
    |  Bob| page_3|null|
    |  Bob| page_4|null|
    |  Bob| page_5|null|
    |  Bob| page_6|null|
    |  Bob| page_7|null|
    |  Bob| page_8| top|
    |  Bob| page_9| top|
    +-----+-------+----+

    ranked.printSchema
    root
     |-- user: string (nullable = true)
     |-- page: string (nullable = true)
     |-- rank: string (nullable = true)
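As a side note, the same nullable column can be produced without a UDF at all, using Spark's built-in when and regexp_extract column expressions. A sketch, assuming the same userPagesDF as above:

    import org.apache.spark.sql.functions.{when, regexp_extract, col}

    // Extract the trailing digits of the page name and compare them as an int.
    // `when` without an `otherwise` clause yields null for non-matching rows,
    // so this produces the same nullable "rank" column as the UDF version.
    val ranked2 = userPagesDF.withColumn(
        "rank",
        when(regexp_extract(col("page"), "(\\d+)$", 1).cast("int") > 7, "top")
    )

Built-in expressions like these are generally preferable to UDFs because Catalyst can optimize them, whereas a UDF is a black box to the query planner.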