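Split the records into a positive dataframe (`action > 0`) and a negative dataframe (`action <= 0`). Then group each one by "userid" and collect its distinct SKUs. `collect_set` drops duplicates and does not guarantee element order: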
```python
from pyspark.sql import functions as F

# Positive records: action > 0
df_pos = df \
    .filter(F.col("action") > 0) \
    .groupBy("userid") \
    .agg(F.collect_set("sku").alias("sku_pos_list")) \
    .withColumnRenamed("userid", "userid_pos")
```

[Out]:

```
+----------+------------+
|userid_pos|sku_pos_list|
+----------+------------+
|       123|      [2345]|
|       231|      [4322]|
+----------+------------+
```

```python
# Negative records: action <= 0
df_neg = df \
    .filter(F.col("action") <= 0) \
    .groupBy("userid") \
    .agg(F.collect_set("sku").alias("sku_neg_list")) \
    .withColumnRenamed("userid", "userid_neg")
```

[Out]:

```
+----------+------------------+
|userid_neg|      sku_neg_list|
+----------+------------------+
|       123|[2345, 5422, 7622]|
|       231|[8342, 4322, 5342]|
+----------+------------------+
```
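The following plain-Python sketch (not Spark) applies the same split to the sample data, which makes the grouping semantics concrete. A dict of sets stands in for `groupBy` + `collect_set`. The helper name `split_by_action` is hypothetical and chosen only for illustration:

```python
from collections import defaultdict

# Sample rows (userid, sku, action), same as the dataset used in this answer
rows = [
    (123, 2345, 2), (123, 2345, 0), (123, 5422, 0), (123, 7622, 0),
    (231, 4322, 2), (231, 4322, 0), (231, 8342, 0), (231, 5342, 0),
]

def split_by_action(rows):
    """Group SKUs per user into positive (action > 0) and negative (action <= 0) sets.

    Hypothetical helper mirroring filter + groupBy("userid") + collect_set("sku"):
    a Python set, like collect_set, drops duplicates and has no guaranteed order.
    """
    pos, neg = defaultdict(set), defaultdict(set)
    for userid, sku, action in rows:
        (pos if action > 0 else neg)[userid].add(sku)
    return dict(pos), dict(neg)

pos, neg = split_by_action(rows)
print(pos)               # {123: {2345}, 231: {4322}}
print(sorted(neg[123]))  # [2345, 5422, 7622]
```

SKU 2345 lands in both sets for user 123 because it has one positive and one negative record. The filter in the next step handles that case.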
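Full-outer-join the positive and negative dataframes on the user ID. Then explode both SKU lists to get every (positive, negative) pair per user, and drop any pair where the same SKU appears on both sides: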
```python
# A full outer join keeps users that have only positive or only negative records
df_joined = df_pos.join(df_neg, F.col("userid_pos") == F.col("userid_neg"), how="full")

# Clean up the nulls left by the outer join: take whichever userid is present,
# and replace a missing SKU list with the placeholder [-1]
df_joined = df_joined \
    .withColumn("userid", F.when(F.col("userid_pos").isNotNull(), F.col("userid_pos")).otherwise(F.col("userid_neg"))) \
    .drop("userid_pos", "userid_neg") \
    .withColumn("sku_pos_list", F.when(F.col("sku_pos_list").isNull(), F.array([F.lit(-1)])).otherwise(F.col("sku_pos_list"))) \
    .withColumn("sku_neg_list", F.when(F.col("sku_neg_list").isNull(), F.array([F.lit(-1)])).otherwise(F.col("sku_neg_list")))
```

[Out]:

```
+------------+------------------+------+
|sku_pos_list|sku_neg_list      |userid|
+------------+------------------+------+
|[2345]      |[2345, 5422, 7622]|123   |
|[4322]      |[8342, 4322, 5342]|231   |
+------------+------------------+------+
```

```python
# Exploding both lists yields every (positive, negative) SKU pair per user;
# pairs where the same SKU is on both sides are then filtered out
df_joined = df_joined \
    .withColumn("sku_pos", F.explode("sku_pos_list")) \
    .withColumn("sku_neg", F.explode("sku_neg_list")) \
    .drop("sku_pos_list", "sku_neg_list") \
    .filter(F.col("sku_pos") != F.col("sku_neg"))
```

[Out]:

```
+------+-------+-------+
|userid|sku_pos|sku_neg|
+------+-------+-------+
|   123|   2345|   5422|
|   123|   2345|   7622|
|   231|   4322|   8342|
|   231|   4322|   5342|
+------+-------+-------+
```
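The join-and-explode step can also be sketched in plain Python. This illustrates the logic only and is not Spark code. It assumes the grouped SKUs arrive as dicts of sets, like `pos` and `neg` below, which are copied from the `df_pos` / `df_neg` outputs. The helper `pos_neg_pairs` is hypothetical. Results are sorted for determinism, so row order can differ from Spark's:

```python
PLACEHOLDER = -1  # same sentinel the Spark code uses for a missing list

def pos_neg_pairs(pos, neg):
    """Return (userid, sku_pos, sku_neg) triples.

    Iterating over the union of user IDs acts like the full outer join; a user
    missing from one side gets {PLACEHOLDER}, like the when/otherwise clean-up.
    The nested loops are the per-user cross product produced by the two explodes,
    and pairs with the same SKU on both sides are skipped.
    """
    triples = []
    for userid in sorted(pos.keys() | neg.keys()):
        for sku_pos in sorted(pos.get(userid, {PLACEHOLDER})):
            for sku_neg in sorted(neg.get(userid, {PLACEHOLDER})):
                if sku_pos != sku_neg:
                    triples.append((userid, sku_pos, sku_neg))
    return triples

# Grouped SKUs, as in the df_pos / df_neg outputs
pos = {123: {2345}, 231: {4322}}
neg = {123: {2345, 5422, 7622}, 231: {4322, 5342, 8342}}

for triple in pos_neg_pairs(pos, neg):
    print(triple)
# (123, 2345, 5422)
# (123, 2345, 7622)
# (231, 4322, 5342)
# (231, 4322, 8342)
```

A user with only positive records, such as `pos_neg_pairs({1: {10}}, {})`, yields `(1, 10, -1)`. This shows how the `[-1]` placeholder keeps such users in the output instead of letting `explode` drop them on a null list.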
Dataset used:
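```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (123, 2345, 2),
    (123, 2345, 0),
    (123, 5422, 0),
    (123, 7622, 0),
    (231, 4322, 2),
    (231, 4322, 0),
    (231, 8342, 0),
    (231, 5342, 0),
], ["userid", "sku", "action"])
```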