
I am implementing code to dynamically add multiple columns to a DataFrame, filled with null values in each row.

I found the following Scala snippet, which uses the map function of the DataFrame object.

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row}

def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = {
  val encoder = RowEncoder.apply(getSchema(df, words))
  df.map(mappingRows(df.schema)(words))(encoder)
}

private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Any] = words.map(_ => null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

private def getSchema(df: DataFrame, words: List[String]): StructType = {
  var schema: StructType = df.schema
  words.foreach(word => schema = schema.add(word, "string", false))
  schema
}

I have implemented the following two functions in Java:

private StructType getSchema(Dataset<Row> df, List<String> cols) {
    StructType schema = df.schema();
    cols.forEach(col -> schema.add(col, "int", true));
    return schema;
}

private void addColumnsViaMap(Dataset<Row> df, List<String> cols) {
    Encoder<Row> encoder1 = RowEncoder.apply(dataConsolidationEngine.getSchema(df, cols));
    df.map(new MapFunction<Set<String>, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row call(Set<String> cols) throws Exception {
            // TODO Auto-generated method stub
        }
    }, encoder1);
}

The addColumnsViaMap method has a compilation error, "cannot resolve anonymous map function method", due to a parameter mismatch.

I also don't understand the Scala code of mappingRows, especially this part: StructType => List[String] => Row => Row = (schema) => (words) => (row). What does this mean?

And how can I implement the above Scala code in Java?


2 Answers


Well, this declaration is a bit complex (and IMO a bit unreadable too), so let's step back.

In Scala, String, List, and so on are types everyone knows of. You can make a variable of type String.

What you can also do is assign a function to a variable (this is the functional orientation of Scala), so functions also have types. For example, if you have a function that takes a List and outputs a String, it is of type List => String.

And what does that look like in code?

// A list of strings
val names = List("alice", "bob")

// A function that takes a list and returns a string
def listToString(list: List[String]): String = list.mkString(",")

// We can assign the function to a variable
val myListToString: List[String] => String = listToString

But we have a shorter notation for declaring functions: we may declare them "inline", without a def statement, so the above code can be equivalently written as:

val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")

So, generically speaking:

  • A => B is the type of a function that takes an A and returns a B
  • (arg: A) => { new B() } is an actual function that takes an instance of A as input (the instance being bound to the variable name arg) and whose body returns an instance of B

Now let's do something crazy, let's... start over. Say that F is a function that takes a List and returns a String. What would a function that takes an Int and returns an F look like?

Well, it would be:

  • Int => F.
  • That is to say: Int => (List => String)
  • Which can be written Int => List => String (the arrow is right-associative, so the parentheses are implicit)

And how do you declare it?

// Borrowing from above
val names = List("alice", "bob")
val myListToString: List[String] => String = (list) => list.mkString(",")

// Now we're doing it
val intToListToString: Int => (List[String] => String) =
  (integerValue) => myListToString

// Now we're doing it in one go
val intToListToString2: Int => List[String] => String =
  (integerValue) => (list) => list.mkString(",")

Here, intToListToString is a function that takes an int and returns "a function that takes a List and returns a String".

And you can nest again, and again.

Until you get StructType => List[String] => Row => Row, which is a type that means "a function that takes a StructType as input and returns (a function that takes a List[String] as input and returns (a function that takes a Row as input and returns a Row))".

And you could implement it as:

(schema) =>    // a function that takes a schema, and returns
  (words) =>   // a function that takes a list of words, and returns
    (row) =>   // a function that takes a row, and returns
      Row.fromSeq(...)  // another row

Now what would that look like in Java ?

If you want to convert it strictly as it is, you may think about it this way: the natural equivalent of Scala's A => B is java.util.function.Function<A, B>. On top of that, if you want to use a function to do a Spark map operation on a DataFrame (a Dataset<Row> in Java), you have to use a MapFunction<>.
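For instance, the earlier List[String] => String example could be curried in Java like this (a self-contained sketch, independent of Spark; the names are made up for illustration):

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class CurryingDemo {
    public static void main(String[] args) {
        // List[String] => String in Scala becomes Function<List<String>, String>
        Function<List<String>, String> listToString = list -> String.join(",", list);

        // Int => List[String] => String becomes a Function returning a Function
        Function<Integer, Function<List<String>, String>> intToListToString =
                integerValue -> listToString;

        // Scala's f(a)(b) becomes f.apply(a).apply(b)
        String result = intToListToString.apply(42).apply(Arrays.asList("alice", "bob"));
        System.out.println(result); // prints: alice,bob
    }
}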

So we are looking to implement a Function<StructType, Function<List<String>, MapFunction<Row, Row>>>, or something of the sort.

Using java lambda notation, you can do it this way :

schema -> words -> row -> Row.merge(row, Row.fromSeq(Array.newInstance(String.class, words.size())))

Which is a function that takes a schema,

  • that returns a function that takes a list of words,

    • that returns a function that takes a Row,

      • that returns a Row augmented with columns containing null.
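Spelled out with explicit types, a fuller sketch might look like the following (untested; Row.merge and Row.fromSeq live on Scala companion objects and are awkward to call from Java, so this version copies the row values by hand instead):

import java.util.List;
import java.util.function.Function;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructType;

// schema -> words -> row -> rowWithNulls. The schema parameter is
// unused here, mirroring the Scala original.
Function<StructType, Function<List<String>, MapFunction<Row, Row>>> mappingRows =
        schema -> words -> row -> {
            // copy the existing values; the trailing entries stay null,
            // one per added column
            Object[] values = new Object[row.size() + words.size()];
            for (int i = 0; i < row.size(); i++) {
                values[i] = row.get(i);
            }
            return RowFactory.create(values);
        };

You would then obtain the actual MapFunction<Row, Row> with mappingRows.apply(df.schema()).apply(words).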

Maybe my Java syntax is correct, maybe not; I do not know.

What I do know is that this is a vastly over-complex way of achieving your requirement.

What is that requirement? You have a dataframe and a list of words, and you want to create new columns with those names, containing null.

So what I would have done in Scala is this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def addColumnsViaMap(dataframe: DataFrame, words: List[String]) =
  words.foldLeft(dataframe)((df, word) => df.withColumn(word, lit(null: String)))

val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
val words = List("columnC", "columnD")

addColumnsViaMap(dataframe, words).show

+-------+-------+-------+-------+
|columnA|columnB|columnC|columnD|
+-------+-------+-------+-------+
|      a|      b|   null|   null|
|      c|      d|   null|   null|
+-------+-------+-------+-------+

Which you can probably write in Java as follows:

Dataset<Row> addColumnsViaMap(Dataset<Row> dataframe, List<String> words) {
    for (String word : words) {
        dataframe = dataframe.withColumn(word, lit((String) null));
    }
    return dataframe;
}
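Hypothetical usage (the dataset name and column names are made up for illustration):

// Assuming an existing Dataset<Row> named df
Dataset<Row> out = addColumnsViaMap(df, Arrays.asList("columnC", "columnD"));
out.show();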

Once again, I do not have a Java-based Spark environment at hand, but my point is: if you get the principle, rewriting it is simple.


4 Comments

Thanks, that helps; now I understand the Scala code clearly. But if I have to implement 3 MapFunction methods in one single statement, it is a lot more confusing and difficult to implement. Could you please help me break the above Scala code into 3 separate map functions, so that I can try to implement them in Java as 3 separate map functions?
@vkumar22 you do not have to write it as 3 separate functions, though you probably can. First: I've rewritten it all in one line of Scala that does not involve any nesting of functions. Second: having functions that return functions 3 levels deep is not natural in Java (possible? yes; natural? not so much). Scala, being functionally oriented, is more natural for this. But it could just as easily be written as a single function that takes 3 arguments in both languages, instead of a curried version (3 functions taking one argument each), as sketched below. If you understand it, you can convert it.
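For illustration, an uncurried Java version (a hypothetical sketch with made-up names, reusing the row-copying logic from the answer above) is just one ordinary method with three parameters:

// Uncurried: one method taking all three arguments at once, instead of
// three nested one-argument functions. The schema is again unused.
static Row mapRow(StructType schema, List<String> words, Row row) {
    Object[] values = new Object[row.size() + words.size()];
    for (int i = 0; i < row.size(); i++) {
        values[i] = row.get(i);
    }
    return RowFactory.create(values); // trailing entries stay null
}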
Thanks!! Yes, I wanted to follow that approach of adding the new columns initially, but I found the link lansalo.com/2018/05/13/… where they explain that this solution is not so performance/memory efficient, so I wanted to follow the other, more efficient approach.
But what are they measuring? They are measuring the time spent declaring the dataframe. Surely the time spent creating a dataframe is ridiculously dwarfed by the computation it involves. Who cares if declaring the computation takes 400 milliseconds instead of 70? Surely you are not running a Spark cluster for a computation whose order of magnitude is in the millisecond ballpark? Have you actually measured?
private val mappingRows: StructType => List[String] => Row => Row =
  (schema) => (words) => (row) => {
    val addedCols: List[Any] = words.map(_ => null)
    Row.merge(row, Row.fromSeq(addedCols))
  }

Simply put, that could be read as:

mappingRows is a 'function' that takes 3 parameters (of types StructType, List and Row; say schema, words and row) and returns a Row. But instead of calling it like this:

mappingRows(schema, words, row)

you will go

mappingRows(schema)(words)(row) 

This means that calling just

mappingRows(schema)(words) 

will return a function that takes a Row and returns a Row: a mapping function that you can pass to the typical .map() function.

Basically, given a schema and a list of column names, the closure takes a row as input and simply adds one null column to that row for each given column name.
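In Java terms, with the curried Function shape from the other answer, that partial application could look like this (a hedged sketch; mappingRows, df, words and encoder are assumed to already exist):

// Partially applying schema and words yields the Row-to-Row mapping
// function that Dataset.map() expects alongside an encoder
MapFunction<Row, Row> rowMapper = mappingRows.apply(df.schema()).apply(words);
Dataset<Row> result = df.map(rowMapper, encoder);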

Does that answer your question?

3 Comments

Thanks for your quick response. If you look at the following code: ``` def addColumnsViaMap(df: DataFrame, words: List[String]): DataFrame = { val encoder = RowEncoder.apply(getSchema(df, words)); df.map(mappingRows(df.schema)(words))(encoder) } ``` mappingRows is called with only two parameters, schema and words, but row is not passed to it. How is row accessed inside it?
@vkumar22 yes, it is called with only two params, schema and words: that returns a function that takes a Row and returns a Row, i.e. a mapping function that you can pass to the map() function. Got it? I edited my answer accordingly.
Thanks for providing more details, I got it. Let me see if I can try that in Java.
