
I am using Spark 2.4 with Scala.

My salesperson DataFrame has 54 salesperson columns in total; the example below shows only 4 of them.

Schema of SalesPerson table.

root
 |-- col: struct (nullable = false)
 |    |-- salesperson_4: string (nullable = true)
 |    |-- salesperson_10: string (nullable = true)
 |    |-- salesperson_11: string (nullable = true)
 |    |-- salesperson_21: string (nullable = true)

Data of Salesperson Table.

+--------------+--------------+--------------+--------------+
|salesperson_4 |salesperson_10|salesperson_11|salesperson_21|
+--------------+--------------+--------------+--------------+
|Customer_933  |Customer_1760 |Customer_454  |Customer_127  |
|Customer_1297 |Customer_2411 |Customer_158  |Customer_2703 |
|Customer_861  |Customer_1550 |Customer_812  |Customer_2976 |
+--------------+--------------+--------------+--------------+

My salesType DataFrame looks like this:

Schema of salesType

root
 |-- Type: string (nullable = true)
 |-- Customer: string (nullable = true)

Data of salesType

+------+-------------+
|Type  |Customer     |
+------+-------------+
|Online|Customer_933 |
|inshop|Customer_933 |
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online|Customer_812 |
|Online|Customer_812 |
|inshop|Customer_127 |
+------+-------------+

I am trying to check which customers from the SalesPerson table are present in the SalesType table, with two additional columns: one showing which salesperson each customer belongs to, and one counting the customer's occurrences in the SalesType table. Basically: every customer from the SalesPerson table, together with its occurrence count in the SalesType table.

Expected Output:

+----------------------------+-------------+---------+
|CustomerBelongstoSalesperson|Customer     |occurance|
+----------------------------+-------------+---------+
|salesperson_4               |Customer_933 |2        |
|salesperson_10              |Customer_2411|2        |
|salesperson_4               |Customer_1297|1        |
|salesperson_10              |Customer_1550|1        |
|salesperson_21              |Customer_2976|1        |
|salesperson_11              |Customer_812 |2        |
|salesperson_21              |Customer_127 |1        |
|salesperson_4               |Customer_861 |0        |
|salesperson_10              |Customer_1760|0        |
|salesperson_11              |Customer_454 |0        |
|salesperson_11              |Customer_158 |0        |
|salesperson_21              |Customer_2703|0        |
+----------------------------+-------------+---------+
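For anyone who wants to see the requested logic spelled out without Spark, here is a plain-Scala collections sketch (not the actual job — just the sample data above, an "unpivot" of the salesperson columns, and a left-join-style occurrence count):

```scala
// Sample data from the question: salesperson column -> its customers
val salesperson: Map[String, Seq[String]] = Map(
  "salesperson_4"  -> Seq("Customer_933", "Customer_1297", "Customer_861"),
  "salesperson_10" -> Seq("Customer_1760", "Customer_2411", "Customer_1550"),
  "salesperson_11" -> Seq("Customer_454", "Customer_158", "Customer_812"),
  "salesperson_21" -> Seq("Customer_127", "Customer_2703", "Customer_2976")
)

// SalesType rows as (Type, Customer) pairs
val salesType: Seq[(String, String)] = Seq(
  ("Online", "Customer_933"), ("inshop", "Customer_933"),
  ("inshop", "Customer_1297"), ("Online", "Customer_2411"),
  ("Online", "Customer_2411"), ("Online", "Customer_1550"),
  ("Online", "Customer_2976"), ("Online", "Customer_812"),
  ("Online", "Customer_812"), ("inshop", "Customer_127")
)

// "Unpivot": one (salesperson, customer) row per cell, like stack() produces
val unpivoted: Seq[(String, String)] =
  for ((sp, customers) <- salesperson.toSeq; c <- customers) yield (sp, c)

// Occurrences per customer in salesType; absent customers get 0 (left join)
val occurrences: Map[String, Int] =
  salesType.groupBy(_._2).map { case (c, rows) => c -> rows.size }

// Final shape: (salesperson, customer, occurrence count)
val result: Seq[(String, String, Int)] =
  unpivoted.map { case (sp, c) => (sp, c, occurrences.getOrElse(c, 0)) }
```

Every customer keeps its row even with zero matches, which is exactly what the left join in the Spark version guarantees.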

Code:

val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)

processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)

Thanks a lot. Please share your suggestions.


  • In .agg(count("Place")), the Place column is not present anywhere in the input provided. Please clarify so that I can update the answer after validating it. Commented Aug 3, 2020 at 5:47
  • @smart_coder: Sorry, typo. Corrected now. Commented Aug 3, 2020 at 8:58
  • Which column name did you replace Place with? Commented Aug 3, 2020 at 9:00
  • @smart_coder: Type Commented Aug 3, 2020 at 9:12
  • Please check my answer below, retry, and let me know if it works. Commented Aug 3, 2020 at 9:13

1 Answer


This works in Spark 2.4.0+:

val sourceDF = Seq(
  ("Customer_933", "Customer_1760", "Customer_454", "Customer_127"),
  ("Customer_1297", "Customer_2411", "Customer_158", "Customer_2703"),
  ("Customer_861", "Customer_1550", "Customer_812", "Customer_2976")
).toDF("salesperson_4", "salesperson_10", "salesperson_11", "salesperson_21")

sourceDF.show()
/*
+-------------+--------------+--------------+--------------+
|salesperson_4|salesperson_10|salesperson_11|salesperson_21|
+-------------+--------------+--------------+--------------+
| Customer_933| Customer_1760|  Customer_454|  Customer_127|
|Customer_1297| Customer_2411|  Customer_158| Customer_2703|
| Customer_861| Customer_1550|  Customer_812| Customer_2976|
+-------------+--------------+--------------+--------------+
*/

val salesDF = Seq(
  ("Online", "Customer_933"),
  ("inshop", "Customer_933"),
  ("inshop", "Customer_1297"),
  ("Online", "Customer_2411"),
  ("Online", "Customer_2411"),
  ("Online", "Customer_1550"),
  ("Online", "Customer_2976"),
  ("Online", "Customer_812"),
  ("Online", "Customer_812"),
  ("inshop", "Customer_127")
).toDF("Type", "Customer")

salesDF.show()
/*
+------+-------------+
|  Type|     Customer|
+------+-------------+
|Online| Customer_933|
|inshop| Customer_933|
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online| Customer_812|
|Online| Customer_812|
|inshop| Customer_127|
+------+-------------+
*/

val stringCol = sourceDF.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = sourceDF.selectExpr(s"stack(${sourceDF.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)
/*
+--------------+-------------+
|Salesperson   |Customer     |
+--------------+-------------+
|salesperson_4 |Customer_933 |
|salesperson_10|Customer_1760|
|salesperson_11|Customer_454 |
|salesperson_21|Customer_127 |
|salesperson_4 |Customer_1297|
|salesperson_10|Customer_2411|
|salesperson_11|Customer_158 |
|salesperson_21|Customer_2703|
|salesperson_4 |Customer_861 |
|salesperson_10|Customer_1550|
|salesperson_11|Customer_812 |
|salesperson_21|Customer_2976|
+--------------+-------------+
*/

processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)
/*
+-------------+---------+--------------+
|Customer     |Occurance|Salesperson   |
+-------------+---------+--------------+
|Customer_2411|2        |salesperson_10|
|Customer_158 |0        |salesperson_11|
|Customer_812 |2        |salesperson_11|
|Customer_1760|0        |salesperson_10|
|Customer_2703|0        |salesperson_21|
|Customer_861 |0        |salesperson_4 |
|Customer_127 |1        |salesperson_21|
|Customer_2976|1        |salesperson_21|
|Customer_1297|1        |salesperson_4 |
|Customer_454 |0        |salesperson_11|
|Customer_933 |2        |salesperson_4 |
|Customer_1550|1        |salesperson_10|
+-------------+---------+--------------+
*/

3 Comments

Instead of a DataFrame, can I use a registered view for sourceDF? Please suggest.
@DataQuest5, give it a try and post here for help!
Sorry, this is not working properly. Please allow me to edit the question and add an error picture. Kindly help.
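On the registered-view question above: an untested sketch, assuming a live SparkSession named `spark` and the `sourceDF` from the answer. Registering the DataFrame as a temp view lets you express the same unpivot with `stack` in SQL (the view name `salesperson_view` is made up for illustration):

```scala
// Sketch only: requires a running SparkSession (`spark`) and sourceDF as above
sourceDF.createOrReplaceTempView("salesperson_view")

val processedDF = spark.sql("""
  SELECT stack(4,
    'salesperson_4',  salesperson_4,
    'salesperson_10', salesperson_10,
    'salesperson_11', salesperson_11,
    'salesperson_21', salesperson_21
  ) AS (Salesperson, Customer)
  FROM salesperson_view
""")
```

The resulting `processedDF` should be identical to the `selectExpr` version, so the join and aggregation that follow stay unchanged.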
