
My data frame 1:

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
4295876332|^|41|^|40|^|1|^|I|!|
4295876332|^|41|^|110|^|2|^|I|!|
4295876332|^|41|^|111|^|2|^|I|!|
4295876332|^|138|^|139|^|1|^|I|!|
4295876332|^|138|^|193|^|2|^|I|!|
4295877204|^|38|^|37|^|1|^|I|!|
4295877204|^|38|^|103|^|2|^|I|!|
4295877204|^|38|^|104|^|2|^|I|!|
4295877204|^|131|^|132|^|1|^|I|!|
4295877204|^|131|^|178|^|2|^|I|!|
4295877234|^|7|^|100|^|1|^|I|!|
4295877234|^|7|^|137|^|2|^|I|!|
4295877234|^|7|^|138|^|2|^|I|!|
4295877234|^|158|^|188|^|1|^|I|!|
4295877234|^|158|^|210|^|2|^|I|!|
4295877320|^|41|^|40|^|1|^|I|!|
4295877320|^|41|^|107|^|2|^|I|!|
4295877320|^|41|^|108|^|2|^|I|!|
4295877320|^|135|^|136|^|1|^|I|!|
4295877320|^|135|^|190|^|2|^|I|!|
4295877413|^|41|^|40|^|1|^|I|!|
4295877413|^|41|^|108|^|2|^|I|!|
4295877413|^|41|^|109|^|2|^|I|!|
4295877413|^|138|^|139|^|1|^|I|!|
4295877413|^|138|^|190|^|2|^|I|!|
4295877734|^|41|^|40|^|1|^|I|!|
4295877734|^|41|^|121|^|2|^|I|!|
4295877734|^|41|^|122|^|2|^|I|!|
4295877734|^|136|^|137|^|1|^|I|!|
4295877734|^|136|^|188|^|2|^|I|!|
4295878126|^|41|^|40|^|1|^|I|!|
4295878126|^|41|^|106|^|2|^|I|!|
4295878126|^|41|^|107|^|2|^|I|!|
4295878126|^|134|^|135|^|1|^|I|!|
4295878126|^|134|^|181|^|2|^|I|!|
4295880491|^|6|^|172|^|2|^|I|!|
4295880491|^|6|^|173|^|2|^|I|!|
4295880491|^|171|^|174|^|2|^|I|!|
4295876139|^|41|^|40|^|1|^|I|!|
4295876139|^|41|^|122|^|2|^|I|!|
4295876139|^|41|^|123|^|2|^|I|!|
4295876139|^|134|^|135|^|1|^|I|!|
4295876139|^|134|^|188|^|2|^|I|!|
4295876509|^|41|^|40|^|1|^|I|!|
4295876509|^|41|^|118|^|2|^|I|!|
4295876509|^|41|^|119|^|2|^|I|!|
4295876509|^|134|^|135|^|1|^|I|!|
4295876509|^|134|^|185|^|2|^|I|!|
4295876547|^|3|^|100|^|1|^|I|!|
4295876547|^|3|^|130|^|2|^|I|!|
4295876547|^|3|^|131|^|2|^|I|!|
4295876547|^|153|^|185|^|1|^|I|!|
4295876547|^|153|^|202|^|2|^|I|!|
4295876646|^|5|^|104|^|1|^|I|!|
4295876646|^|5|^|150|^|2|^|I|!|
4295876646|^|5|^|151|^|2|^|I|!|
4295876646|^|162|^|195|^|1|^|I|!|
4295876646|^|162|^|217|^|2|^|I|!|
4295876738|^|41|^|40|^|1|^|I|!|
4295876738|^|41|^|106|^|2|^|I|!|
4295876738|^|41|^|107|^|2|^|I|!|
4295876738|^|134|^|135|^|1|^|I|!|
4295876738|^|134|^|187|^|2|^|I|!|
4295877225|^|41|^|40|^|1|^|I|!|
4295877225|^|41|^|122|^|2|^|I|!|
4295877225|^|41|^|123|^|2|^|I|!|
4295877225|^|134|^|135|^|1|^|I|!|
4295877225|^|134|^|188|^|2|^|I|!|
4295877766|^|41|^|40|^|1|^|I|!|
4295877766|^|41|^|106|^|2|^|I|!|
4295877766|^|41|^|107|^|2|^|I|!|
4295877766|^|134|^|135|^|1|^|I|!|
4295877766|^|134|^|186|^|2|^|I|!|
4295877812|^|41|^|40|^|1|^|I|!|
4295877812|^|41|^|112|^|2|^|I|!|
4295877812|^|41|^|113|^|2|^|I|!|
4295877812|^|134|^|135|^|1|^|I|!|
4295877812|^|134|^|186|^|2|^|I|!|
4295877871|^|41|^|40|^|1|^|I|!|
4295877871|^|41|^|124|^|2|^|I|!|
4295877871|^|41|^|125|^|2|^|I|!|
4295877871|^|137|^|138|^|1|^|I|!|
4295877871|^|137|^|190|^|2|^|I|!|
4295877923|^|41|^|40|^|1|^|I|!|
4295877923|^|41|^|122|^|2|^|I|!|
4295877923|^|41|^|123|^|2|^|I|!|
4295877923|^|134|^|135|^|1|^|I|!|
4295877923|^|134|^|188|^|2|^|I|!|
4295877985|^|41|^|40|^|1|^|I|!|
4295877985|^|41|^|113|^|2|^|I|!|
4295877985|^|41|^|114|^|2|^|I|!|
4295877985|^|134|^|135|^|1|^|I|!|
4295877985|^|134|^|188|^|2|^|I|!|
4295878608|^|41|^|40|^|1|^|I|!|
4295878608|^|41|^|105|^|2|^|I|!|
4295878608|^|41|^|106|^|2|^|I|!|
4295878608|^|130|^|131|^|1|^|I|!|
4295878608|^|130|^|182|^|2|^|I|!|
4295878863|^|41|^|40|^|1|^|I|!|
4295878863|^|41|^|121|^|2|^|I|!|
4295878863|^|41|^|122|^|2|^|I|!|
4295878863|^|134|^|135|^|1|^|I|!|
4295878863|^|134|^|187|^|2|^|I|!|
4295880574|^|166|^|167|^|2|^|I|!|
4295880574|^|166|^|168|^|2|^|I|!|
4295880574|^|273|^|274|^|2|^|I|!|
4295876308|^|41|^|40|^|1|^|I|!|
4295876308|^|41|^|103|^|2|^|I|!|
4295876308|^|41|^|104|^|2|^|I|!|
4295876308|^|130|^|131|^|1|^|I|!|
4295876308|^|130|^|177|^|2|^|I|!|

My data frame 2:

DataPartition|^|PartitionYear|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
SelfSourcedPublic|^|2016|^|1515129638858|^|4295902451|^|109|^|110|^|1|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638859|^|4295902451|^|111|^|112|^|1|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638860|^|4295902451|^|109|^|113|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638861|^|4295902451|^|109|^|114|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638862|^|4295902451|^|111|^|115|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638863|^|4295902451|^|109|^|119|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638864|^|4295902451|^|109|^|120|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638865|^|4295902451|^|111|^|121|^|4|^|O|!|
SelfSourcedPublic|^|2017|^|1515129638866|^|4295902451|^|122|^|126|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129638867|^|4295902451|^|122|^|127|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129639565|^|4295859031|^|126|^|127|^|1|^|I|!|
SelfSourcedPublic|^|2017|^|1515129639566|^|4295859031|^|128|^|129|^|1|^|I|!|
SelfSourcedPublic|^|2017|^|1515129639688|^|4295859031|^|null|^|126|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639689|^|4295859031|^|null|^|127|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639690|^|4295859031|^|null|^|128|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639691|^|4295859031|^|null|^|129|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639713|^|4295906830|^|null|^|420|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639714|^|4295906830|^|null|^|421|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639715|^|4295906830|^|null|^|422|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639741|^|4295906830|^|null|^|420|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639742|^|4295906830|^|null|^|421|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639743|^|4295906830|^|null|^|422|^|null|^|D|!|
SelfSourcedPrivate|^|2014|^|1515129639770|^|4298009288|^|171|^|206|^|2|^|O|!|
SelfSourcedPrivate|^|2014|^|1515129639771|^|4298009288|^|143|^|203|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639809|^|4298009288|^|167|^|168|^|4|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129639810|^|4298009288|^|163|^|195|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639811|^|4298009288|^|163|^|196|^|1|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639812|^|4298009288|^|167|^|197|^|3|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639813|^|4298009288|^|167|^|198|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639814|^|4298009288|^|30|^|29|^|4|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129639815|^|4298009288|^|22|^|73|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639816|^|4298009288|^|22|^|75|^|1|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639817|^|4298009288|^|30|^|76|^|3|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639818|^|4298009288|^|30|^|78|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640008|^|4298009288|^|163|^|164|^|4|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640009|^|4298009288|^|161|^|191|^|3|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640010|^|4298009288|^|161|^|192|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640011|^|4298009288|^|161|^|193|^|1|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640012|^|4298009288|^|163|^|194|^|3|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640013|^|4298009288|^|22|^|24|^|4|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640014|^|4298009288|^|19|^|66|^|3|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640015|^|4298009288|^|19|^|68|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640016|^|4298009288|^|19|^|70|^|1|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640017|^|4298009288|^|22|^|71|^|3|^|O|!|
SelfSourcedPrivate|^|2010|^|1515129640132|^|4298009288|^|155|^|183|^|2|^|O|!|
SelfSourcedPrivate|^|2010|^|1515129640133|^|4298009288|^|10|^|53|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640204|^|4295904170|^|null|^|379|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640205|^|4295904170|^|null|^|380|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640206|^|4295904170|^|null|^|384|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640313|^|4295904170|^|null|^|379|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640314|^|4295904170|^|null|^|380|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640315|^|4295904170|^|null|^|384|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640528|^|4295904170|^|381|^|379|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640529|^|4295904170|^|381|^|380|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640530|^|4295904170|^|381|^|383|^|4|^|I|!|
SelfSourcedPublic|^|2017|^|1515129640531|^|4295904170|^|385|^|384|^|4|^|I|!|
SelfSourcedPublic|^|2017|^|1515129641126|^|4295904170|^|372|^|379|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129641127|^|4295904170|^|372|^|380|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641505|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641506|^|4295858941|^|24|^|25|^|5|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641507|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641508|^|4295858941|^|30|^|31|^|3|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641509|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641510|^|4295858941|^|30|^|32|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641511|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641512|^|4295858941|^|24|^|33|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641513|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641514|^|4295858941|^|24|^|34|^|20|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641515|^|4295858941|^|1|^|2|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641516|^|4295858941|^|1|^|3|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641517|^|4295858941|^|5|^|6|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641518|^|4295858941|^|5|^|7|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641519|^|4295858941|^|12|^|10|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641520|^|4295858941|^|12|^|11|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641521|^|4295858941|^|1|^|13|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641522|^|4295858941|^|12|^|14|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641523|^|4295858941|^|5|^|15|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641524|^|4295858941|^|5|^|16|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641525|^|4295858941|^|1|^|17|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641526|^|4295858941|^|1|^|18|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641527|^|4295858941|^|5|^|19|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641528|^|4295858941|^|5|^|20|^|2|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641529|^|4295858941|^|5|^|21|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641530|^|4295858941|^|1|^|22|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641531|^|4295858941|^|1|^|23|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129641532|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1515129641603|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1515129641604|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1515129641605|^|4295858941|^|null|^|37|^|null|^|D|!|
SelfSourcedPrivate|^|2016|^|1515129641752|^|4298009288|^|232|^|242|^|4|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641753|^|4298009288|^|248|^|249|^|1|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641754|^|4298009288|^|248|^|249|^|1|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641755|^|4298009288|^|230|^|240|^|4|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641756|^|4298009288|^|243|^|247|^|1|^|O|!|
SelfSourcedPrivate|^|2017|^|1515129641757|^|4298009288|^|248|^|252|^|2|^|O|!|
SelfSourcedPrivate|^|2017|^|1515129641758|^|4298009288|^|248|^|255|^|3|^|O|!|
ThirdPartyPrivate|^|2016|^|1515129641866|^|4296803503|^|1|^|2|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1515129642192|^|4295907168|^|367|^|377|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129642193|^|4295907168|^|365|^|375|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129642194|^|4295907168|^|365|^|376|^|4|^|O|!|
Japan|^|2016|^|1515129642733|^|4295876606|^|272|^|278|^|3|^|O|!|
Japan|^|2016|^|1515129642734|^|4295876606|^|272|^|278|^|3|^|O|!|
Japan|^|2016|^|1515129642735|^|4295876606|^|270|^|276|^|2|^|O|!|
Japan|^|2016|^|1515129642736|^|4295876606|^|270|^|277|^|3|^|O|!|
Japan|^|2016|^|1515129642737|^|4295876606|^|270|^|279|^|3|^|O|!|
SelfSourcedPublic|^|2016|^|1515129657602|^|4296803503|^|1|^|2|^|1|^|O|!|

My full working code:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_YearPartition = spark.udf.register("get_cus_YearPartition", (filePath: String) => filePath.split("\\.")(4))

val rdd = sc.textFile("s3://trfsmallfffile/Interim2Annual/MAIN")
val header = rdd.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear = df1resultFinal.withColumn("PartitionYear", get_cus_YearPartition(input_file_name))

//Loading Incremental
val rdd1 = sc.textFile("s3://trfsmallfffile/Interim2Annual/INCR")
val header1 = rdd1.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)

//------------------------------- filtering only the latest from incremental ------------------------------
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("OrganizationId","AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey1 = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")

val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
  .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
  .drop("tobefiltered", "TimeStamp")

//-----------------separating the incremental df for insert, deletion and overwrite----------------
//---------------insert rows are selected -------------------------------
//insert a row if I is detected and if O is found then first delete and then insert
val insertdf = latestForEachKey.filter($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

//------------------deleted rows with primary key "OrganizationId", "InterimPeriodId"------------------
// delete rows from parent if both D or O is found in incremental
val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

//join by two primary keys for deletion and delete from the parent dataframe
val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

val dfToSave = dfMainOutput.union(insertdf).withColumn("FFAction|!|", when($"FFAction|!|" === "O|!|" || $"FFAction|!|" === "I|!|", lit("I|!|")))

val dfMainOutputFinal = dfToSave.na.fill("").select($"DataPartition", $"PartitionYear", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)

dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/Interim2Annual/output")

val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition","PartitionYear").count
FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/Interim2Annual/Descr")

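Because each line is split only on `|^|`, the row terminator `|!|` stays attached to the last field. That is why the last column is named `FFAction|!|` and its values are compared against `I|!|`, `O|!|` and `D|!|` throughout the code above. A plain-Scala sketch of the same split (the names `splitRecord`, `headerFields` and `rowFields` are illustrative, not from the original code):

```scala
// Split one record on the literal delimiter |^|. String.split takes a regex,
// so each metacharacter of the delimiter is escaped, as in the code above.
def splitRecord(line: String): Array[String] = line.split("\\|\\^\\|")

val headerFields = splitRecord("OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|")
val rowFields = splitRecord("4295876139|^|134|^|135|^|1|^|I|!|")

// |!| is not a split point, so it remains part of the last field.
println(headerFields.last) // FFAction|!|
println(rowFields.last)    // I|!|
```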
In my output, the order of two columns is interchanged:

(`AnnualPeriodId|^|InterimPeriodId`) 

My output:

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
4295877812|^|40|^|41|^|1|^|I|!|
4295877234|^|188|^|158|^|1|^|I|!|
4295877320|^|136|^|135|^|1|^|I|!|
4295877225|^|135|^|134|^|1|^|I|!|
4295877766|^|40|^|41|^|1|^|I|!|
4295876332|^|110|^|41|^|2|^|I|!|
4295877812|^|113|^|41|^|2|^|I|!|
4295877320|^|190|^|135|^|2|^|I|!|
4295876308|^|40|^|41|^|1|^|I|!|
4295876646|^|195|^|162|^|1|^|I|!|
4295878608|^|106|^|41|^|2|^|I|!|
4295876738|^|107|^|41|^|2|^|I|!|
4295877812|^|186|^|134|^|2|^|I|!|
4295877734|^|121|^|41|^|2|^|I|!|
4295877413|^|108|^|41|^|2|^|I|!|
4295877766|^|107|^|41|^|2|^|I|!|
4295878608|^|131|^|130|^|1|^|I|!|
4295877985|^|40|^|41|^|1|^|I|!|
4295877923|^|122|^|41|^|2|^|I|!|
4295876308|^|177|^|130|^|2|^|I|!|
4295877413|^|109|^|41|^|2|^|I|!|
4295877225|^|40|^|41|^|1|^|I|!|
4295877413|^|139|^|138|^|1|^|I|!|
4295877766|^|106|^|41|^|2|^|I|!|
4295876308|^|104|^|41|^|2|^|I|!|
4295877204|^|132|^|131|^|1|^|I|!|
4295880574|^|167|^|166|^|2|^|I|!|
4295878126|^|106|^|41|^|2|^|I|!|
4295876509|^|119|^|41|^|2|^|I|!|
4295877734|^|188|^|136|^|2|^|I|!|
4295877923|^|188|^|134|^|2|^|I|!|
4295876139|^|135|^|134|^|1|^|I|!|
4295877413|^|190|^|138|^|2|^|I|!|
4295877225|^|122|^|41|^|2|^|I|!|
4295877812|^|135|^|134|^|1|^|I|!|
4295876646|^|151|^|5|^|2|^|I|!|
4295876139|^|188|^|134|^|2|^|I|!|
4295877225|^|188|^|134|^|2|^|I|!|
4295877234|^|210|^|158|^|2|^|I|!|
4295877923|^|123|^|41|^|2|^|I|!|
4295878863|^|135|^|134|^|1|^|I|!|
4295878863|^|121|^|41|^|2|^|I|!|
4295877234|^|100|^|7|^|1|^|I|!|
4295877812|^|112|^|41|^|2|^|I|!|
4295876332|^|193|^|138|^|2|^|I|!|
4295877225|^|123|^|41|^|2|^|I|!|
4295877320|^|107|^|41|^|2|^|I|!|
4295877734|^|137|^|136|^|1|^|I|!|
4295880574|^|274|^|273|^|2|^|I|!|
4295878608|^|105|^|41|^|2|^|I|!|
4295877320|^|40|^|41|^|1|^|I|!|
4295878608|^|40|^|41|^|1|^|I|!|
4295880491|^|173|^|6|^|2|^|I|!|
4295877985|^|114|^|41|^|2|^|I|!|
4295876646|^|217|^|162|^|2|^|I|!|
4295876738|^|187|^|134|^|2|^|I|!|
4295876509|^|40|^|41|^|1|^|I|!|
4295876139|^|123|^|41|^|2|^|I|!|
4295876509|^|118|^|41|^|2|^|I|!|
4295876646|^|104|^|5|^|1|^|I|!|
4295877234|^|137|^|7|^|2|^|I|!|
4295876547|^|185|^|153|^|1|^|I|!|
4295877734|^|122|^|41|^|2|^|I|!|
4295877766|^|186|^|134|^|2|^|I|!|
4295880574|^|168|^|166|^|2|^|I|!|
4295878126|^|107|^|41|^|2|^|I|!|
4295877234|^|138|^|7|^|2|^|I|!|
4295876738|^|135|^|134|^|1|^|I|!|
4295877766|^|135|^|134|^|1|^|I|!|
4295876646|^|150|^|5|^|2|^|I|!|
4295878126|^|135|^|134|^|1|^|I|!|
4295876139|^|122|^|41|^|2|^|I|!|
4295877204|^|103|^|38|^|2|^|I|!|
4295876332|^|111|^|41|^|2|^|I|!|
4295876332|^|139|^|138|^|1|^|I|!|
4295876308|^|103|^|41|^|2|^|I|!|
4295877734|^|40|^|41|^|1|^|I|!|
4295877871|^|190|^|137|^|2|^|I|!|
4295877923|^|135|^|134|^|1|^|I|!|
4295876547|^|130|^|3|^|2|^|I|!|
4295878863|^|122|^|41|^|2|^|I|!|
4295877204|^|104|^|38|^|2|^|I|!|
4295877985|^|135|^|134|^|1|^|I|!|
4295877871|^|138|^|137|^|1|^|I|!|
4295876332|^|40|^|41|^|1|^|I|!|
4295877871|^|124|^|41|^|2|^|I|!|
4295876139|^|40|^|41|^|1|^|I|!|
4295877204|^|178|^|131|^|2|^|I|!|
4295877413|^|40|^|41|^|1|^|I|!|
4295876509|^|185|^|134|^|2|^|I|!|
4295876308|^|131|^|130|^|1|^|I|!|
4295877871|^|125|^|41|^|2|^|I|!|
4295876738|^|106|^|41|^|2|^|I|!|
4295877923|^|40|^|41|^|1|^|I|!|
4295877985|^|188|^|134|^|2|^|I|!|
4295878126|^|40|^|41|^|1|^|I|!|
4295878863|^|40|^|41|^|1|^|I|!|
4295877204|^|37|^|38|^|1|^|I|!|
4295878608|^|182|^|130|^|2|^|I|!|
4295877320|^|108|^|41|^|2|^|I|!|
4295876547|^|100|^|3|^|1|^|I|!|
4295876547|^|131|^|3|^|2|^|I|!|
4295876547|^|202|^|153|^|2|^|I|!|
4295877871|^|40|^|41|^|1|^|I|!|
4295878863|^|187|^|134|^|2|^|I|!|
4295880491|^|172|^|6|^|2|^|I|!|
4295876738|^|40|^|41|^|1|^|I|!|
4295877985|^|113|^|41|^|2|^|I|!|
4295876509|^|135|^|134|^|1|^|I|!|
4295880491|^|174|^|171|^|2|^|I|!|
4295878126|^|181|^|134|^|2|^|I|!|

For example, in data frame 1 the following record is in this order:

4295876139|^|134|^|135|^|1|^|I|!| 

But in the output I get it in this order:

4295876139|^|135|^|134|^|1|^|I|!| 
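That symptom is exactly what appears when the header line and the data line are assembled from two different column orders. A plain-Scala sketch using the record above (`headerOrder` and `dataOrder` are illustrative names, and the swapped `dataOrder` is an assumption about what the data side looks like):

```scala
// One record from data frame 1, keyed by column name.
val record = Map(
  "OrganizationId"  -> "4295876139",
  "AnnualPeriodId"  -> "134",
  "InterimPeriodId" -> "135",
  "InterimNumber"   -> "1",
  "FFAction|!|"     -> "I|!|"
)

// Column order used to build the header line.
val headerOrder = Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction|!|")
// Same columns with the two period IDs swapped.
val dataOrder = Seq("OrganizationId", "InterimPeriodId", "AnnualPeriodId", "InterimNumber", "FFAction|!|")

val headerLine = headerOrder.mkString("|^|")
// Look up each column's value by name, in dataOrder, and join them.
val dataLine = dataOrder.map(record).mkString("|^|")

println(headerLine) // OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
println(dataLine)   // 4295876139|^|135|^|134|^|1|^|I|!|
```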

This does not happen for rows that have the I flag.

This is because of the following lines in my code:

val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc) 

and

val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete")) 

and

val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete") 
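One way to check which of these three lines changes the column order is to print `columns` after each step. A minimal sketch on toy data, assuming a local Spark 2.x session; the values and the `keys` DataFrame are made up, and `InterimNumber` stands in for `TimeStamp` in the window ordering:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("column-order-check").getOrCreate()
import spark.implicits._

// Toy data with the same column names as data frame 1.
val mainDf = Seq(("4295876139", "134", "135", "1", "I|!|"))
  .toDF("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction|!|")

// A window function only appends a new column; existing columns keep their order.
val w = Window.partitionBy("OrganizationId", "InterimPeriodId").orderBy($"InterimNumber".desc)
val windowed = mainDf.withColumn("tobefiltered", first("FFAction|!|").over(w))

// A join on a Seq of column names puts those join keys first.
val keys = Seq(("4295876139", "999")).toDF("OrganizationId", "InterimPeriodId")
  .withColumn("Delete", lit("delete"))
val joined = mainDf.join(keys, Seq("OrganizationId", "InterimPeriodId"), "left")

println(windowed.columns.mkString(", "))
println(joined.columns.mkString(", "))
```

Only the join output differs: the `Seq` keys move to the front, the remaining left-hand columns follow in their original order, and the right-hand columns come last.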

In the case of Insert (I), my column order is:

"OrganizationId","AnnualPeriodId","InterimPeriodId" 

In the case of O or D, my column order is:

"OrganizationId","InterimPeriodId" 

The output above is what I get, with the columns from data frame 1 exchanged.

I hope I have explained this clearly.

  • Can you please explain, with an example input and output, how the columns are being interchanged? Currently you don't have any D or O in your input data. Commented Jan 5, 2018 at 2:48
  • @RameshMaharjan It's not with the INCR data. Whatever data is present in data frame 1 comes out interchanged, even without any D or O. Commented Jan 5, 2018 at 4:16
  • @RameshMaharjan I have added all the records and the output too; please have a look. Commented Jan 5, 2018 at 5:34
  • So for inserts, i.e. I data, the columns come out in the correct order, but in the case of O and D they are interchanged. Commented Jan 5, 2018 at 8:15
  • Please see my answer below :) Commented Jan 5, 2018 at 10:40

1 Answer

The interchange happened when you joined the main df1resultFinalWithYear with deletedf. You joined with Seq("OrganizationId", "InterimPeriodId"), and a join on a sequence of column names puts those join columns first in the result, so InterimPeriodId came before AnnualPeriodId. But in insertdf and headerColumn the order is still the original one, and union matches columns by position, not by name. So the interchange happened in the following line:

val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete") 
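The swap reaches the output because `union` pairs columns by position, not by name, and the result keeps the column names of the left-hand DataFrame. A minimal sketch on toy data, assuming a local Spark session (`left` and `right` are illustrative stand-ins for dfMainOutput and insertdf):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("union-by-position").getOrCreate()
import spark.implicits._

// Column order after the join: the key InterimPeriodId comes before AnnualPeriodId.
val left = Seq(("4295876139", "135", "134")).toDF("OrganizationId", "InterimPeriodId", "AnnualPeriodId")
// Column order of insertdf: AnnualPeriodId comes before InterimPeriodId.
val right = Seq(("4295880491", "6", "172")).toDF("OrganizationId", "AnnualPeriodId", "InterimPeriodId")

// union pairs columns by position and keeps the left-hand names, so the
// right-hand AnnualPeriodId value ends up in the column named InterimPeriodId.
val unioned = left.union(right)
val inserted = unioned.filter($"OrganizationId" === "4295880491").first()

println(unioned.columns.mkString(", "))            // OrganizationId, InterimPeriodId, AnnualPeriodId
println(inserted.getAs[String]("InterimPeriodId")) // 6
```

In the question's code this also explains why the I rows look right: their two period values land under each other's names in the union, and concat_ws then writes the columns in dfMainOutput order, which puts them back in the header's order, while the rows from data frame 1 come out swapped.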

You can correct that by selecting the columns in their original order after the join:

val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*) 

And your problem should be solved.
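A minimal sketch of that fix on toy data, assuming a local Spark session (`mainDf`, `insertDf` and `deleteDf` are illustrative stand-ins for df1resultFinalWithYear, insertdf and deletedf). On Spark 2.3 and later, `unionByName`, which matches columns by name rather than by position, is an alternative way to make the later union safe; it is shown alongside for comparison.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().master("local[1]").appName("restore-column-order").getOrCreate()
import spark.implicits._

val cols = Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
val mainDf = Seq(("4295876139", "134", "135")).toDF(cols: _*)
val insertDf = Seq(("4295880491", "6", "172")).toDF(cols: _*)
val deleteDf = Seq(("0", "0")).toDF("OrganizationId", "InterimPeriodId").withColumn("Delete", lit("delete"))

// Same steps as in the answer: left join, keep rows without a delete marker,
// drop the marker, then select the columns back into their original order.
val kept = mainDf.join(deleteDf, Seq("OrganizationId", "InterimPeriodId"), "left")
  .filter($"Delete".isNull)
  .drop("Delete")
  .select(mainDf.schema.fieldNames.map(col): _*)

val byPosition = kept.union(insertDf)   // safe now: both sides share one column order
val byName = kept.unionByName(insertDf) // Spark 2.3+: matches columns by name

println(kept.columns.mkString(", "))
```

The `select` keeps the rest of the original pipeline unchanged, while `unionByName` would also protect any later union from the same kind of reordering.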


1 Comment

Any suggestions about this question? stackoverflow.com/questions/46703623/…