Skip to main content
updating the code
Source Link
user7547751
user7547751

Here is my code that i have tried

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") .select($"LineItem_organizationId", $"LineItem_lineItemId", when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition_1").as("DataPartition_1"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) .filter(!$"FFAction".contains("D")) val dfMainOutputFinal = dfMainOutput.select(concat_ws("|^|", columns.map(c => col(c)): _*).as("concatenated")) dfMainOutputFinal.write.partitionBy("DataPartition_1","StatementTypeCode") .format("csv") .option("codec", "gzip") .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

Here is my code that i have tried

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") .select($"LineItem_organizationId", $"LineItem_lineItemId", when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition_1").as("DataPartition_1"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) .filter(!$"FFAction".contains("D")) val dfMainOutputFinal = dfMainOutput.select(concat_ws("|^|", columns.map(c => col(c)): _*).as("concatenated")) dfMainOutputFinal.write.partitionBy("DataPartition_1","StatementTypeCode") .format("csv") .option("codec", "gzip") .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 
Removed unnecessary code and rewrote some sentences
Source Link
Shaido
  • 28.6k
  • 26
  • 76
  • 82

How to do custom partition in spark data frame indataframe with saveAsTextFile Option

I have created data in sparkSpark and then didperformed a join operation and, finally iI have to save the output to partitioned files  .

Now i can not use below option because of my delimiter issue . So iI am converting data frame into RDD and then saving as text file that allows me to use multi char-char delimiter  .

But now my concern My question is to how to apply data frame columnuse dataframe columns as custom partition in this option ..

Here is my full code case.

val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ import org.apache.spark.{ SparkConf, SparkContext } import java.sql.{Date, Timestamp} import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.spark.sql.functions.udf val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN") val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR") val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) import org.apache.spark.sql.expressions._ val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") .select($"LineItem_organizationId", $"LineItem_lineItemId",$"DataPartiotion", when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) .filter(!$"FFAction".contains("D")) 

Now iI can not use below option for custom partition because it does not multichar delimitersupport multi-char delimiter:

 dfMainOutput.write.partitionBy("DataPartiotion","StatementTypeCode") .format("csv") .option("delimiter", "^") .option("nullValue", "") .option("codec", "gzip") .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

To use multi char-char delimiter iI have converted this in rddRDD like below code ..:

But in above option how would iI do custom partition based on "DataPartiotion","StatementTypeCode"the columns "DataPartiotion" and "StatementTypeCode"?

Do iI have to convert back to again from RDD to data frame a dataframe? If so schema will change .

Please suggest me some option ...

How to do custom partition in spark data frame in saveAsTextFile Option

I have created data in spark and then did join operation and finally i have to save output to partitioned files  .

Now i can not use below option because of my delimiter issue . So i am converting data frame into RDD and then saving as text file that allows me to use multi char delimiter  .

But now my concern is to how to apply data frame column as custom partition in this option ..

Here is my full code .

val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ import org.apache.spark.{ SparkConf, SparkContext } import java.sql.{Date, Timestamp} import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.spark.sql.functions.udf val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN") val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR") val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) import org.apache.spark.sql.expressions._ val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") .select($"LineItem_organizationId", $"LineItem_lineItemId",$"DataPartiotion", when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) .filter(!$"FFAction".contains("D")) 

Now i can not use below option for custom partition because it does not multichar delimiter

 dfMainOutput.write.partitionBy("DataPartiotion","StatementTypeCode") .format("csv") .option("delimiter", "^") .option("nullValue", "") .option("codec", "gzip") .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

To use multi char delimiter i have converted this in rdd like below code ..

But in above option how would i do custom partition based on "DataPartiotion","StatementTypeCode"?

Do i have to convert back to again from RDD to data frame ? If so schema will change .

Please suggest me some option ...

How to do custom partition in spark dataframe with saveAsTextFile

I have created data in Spark and then performed a join operation, finally I have to save the output to partitioned files.

I am converting data frame into RDD and then saving as text file that allows me to use multi-char delimiter. My question is to how use dataframe columns as custom partition in this case.

I can not use below option for custom partition because it does not support multi-char delimiter:

dfMainOutput.write.partitionBy("DataPartiotion","StatementTypeCode") .format("csv") .option("delimiter", "^") .option("nullValue", "") .option("codec", "gzip") .save("s3://trfsdisu/SPARK/FinancialLineItem/output") 

To use multi-char delimiter I have converted this in RDD like below code:

But in above option how would I do custom partition based on the columns "DataPartiotion" and "StatementTypeCode"?

Do I have to convert back to again from RDD to a dataframe?

val sqlContext = new org.apache.spark.sql.SQLContext(sc)  import sqlContext.implicits._  import org.apache.spark.{ SparkConf, SparkContext }  import java.sql.{Date, Timestamp}  import org.apache.spark.sql.Row  import org.apache.spark.sql.types._  import org.apache.spark.sql.functions.udf val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN") val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR") val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) import org.apache.spark.sql.expressions._ val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") .select($"LineItem_organizationId", $"LineItem_lineItemId",$"DataPartiotion", when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) .filter(!$"FFAction".contains("D")) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc)  import sqlContext.implicits._  import org.apache.spark.{ SparkConf, SparkContext }  import java.sql.{Date, Timestamp}  import org.apache.spark.sql.Row  import org.apache.spark.sql.types._  import org.apache.spark.sql.functions.udf val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN") val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR") val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) import org.apache.spark.sql.expressions._ val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") .select($"LineItem_organizationId", $"LineItem_lineItemId",$"DataPartiotion", when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) .filter(!$"FFAction".contains("D")) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ import org.apache.spark.{ SparkConf, SparkContext } import java.sql.{Date, Timestamp} import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.spark.sql.functions.udf val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN") val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR") val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) import org.apache.spark.sql.expressions._ val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") .select($"LineItem_organizationId", $"LineItem_lineItemId",$"DataPartiotion", when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").alias("StatementtypeCode"), when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), when($"SegmentGroupDescription".isNotNull, $"SegmentGroupDescription").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), when($"FFAction_1".isNotNull, $"FFAction_1").otherwise((concat(col("FFAction"), lit("|!|"))).as("FFAction"))) .filter(!$"FFAction".contains("D")) 
Source Link
user7547751
user7547751
Loading