It is relatively simple if you use the pivot function. First, let's create a data set like the one in your question:
```scala
import org.apache.spark.sql.functions.{concat, first, lit, when}

val df = Seq(
  ("1", "9/20/16", "abc0", "xyz0", "o", "1"),
  ("1", "9/20/16", "abc1", "xyz1", "o", "2"),
  ("1", "9/20/16", "abc2", "xyz2", "i", "3"),
  ("1", "9/20/16", "abc3", "xyz3", "i", "4")
).toDF("SessionId", "date", "orig", "dest", "legind", "nbr")
```
Then define and attach the helper columns:
```scala
// This will be the column name
val key = when($"legind" === "o", concat(lit("orig"), $"nbr"))
  .when($"legind" === "i", concat(lit("dest"), $"nbr"))

// This will be the value
val value = when($"legind" === "o", $"orig") // If "o" take orig
  .when($"legind" === "i", $"dest")          // If "i" take dest

val withKV = df.withColumn("key", key).withColumn("value", value)
```
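Note that a when chain without an otherwise clause defaults to null, so any row whose legind matches neither condition gets null for both key and value. That is why the aggregation further down uses first with ignoreNulls set to true.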
This will result in a DataFrame like this:
```
+---------+-------+----+----+------+---+-----+-----+
|SessionId|   date|orig|dest|legind|nbr|  key|value|
+---------+-------+----+----+------+---+-----+-----+
|        1|9/20/16|abc0|xyz0|     o|  1|orig1| abc0|
|        1|9/20/16|abc1|xyz1|     o|  2|orig2| abc1|
|        1|9/20/16|abc2|xyz2|     i|  3|dest3| xyz2|
|        1|9/20/16|abc3|xyz3|     i|  4|dest4| xyz3|
+---------+-------+----+----+------+---+-----+-----+
```
Next, let's define the list of possible levels:
```scala
val levels = Seq("orig", "dest").flatMap(x => (1 to 4).map(y => s"$x$y"))
```
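Passing the levels explicitly is optional, but it is more efficient: without them, Spark has to run an extra job first just to compute the distinct values of the key column (see the sketch after the final result below).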
and finally pivot:
```scala
val result = withKV
  .groupBy($"sessionId", $"date")
  .pivot("key", levels)
  .agg(first($"value", ignoreNulls = true))

result.show
```
And the result is:
```
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+
|sessionId|   date|orig1|orig2|orig3|orig4|dest1|dest2|dest3|dest4|
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+
|        1|9/20/16| abc0| abc1| null| null| null| null| xyz2| xyz3|
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+
```
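If you don't know the number of legs up front, you can skip the explicit level list and let Spark infer it. A minimal sketch; note that the inferred columns come out in sorted order (here dest3, dest4, orig1, orig2) rather than the order above:

```scala
// Without an explicit level list, Spark runs an extra job to
// collect the distinct values of "key" before pivoting.
val inferred = withKV
  .groupBy($"sessionId", $"date")
  .pivot("key")
  .agg(first($"value", ignoreNulls = true))

inferred.show
```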