You can compute the conditional probability P(type | rating) with two groupBy aggregations: count the rows per (rating, type) combination, count the rows per rating, then divide the first count by the second. For example, rating B occurs three times, twice with sedan, so P(sedan | B) = 2/3 ≈ 0.67.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# In the pyspark shell `spark` already exists; in a standalone script,
# create the session explicitly.
spark = SparkSession.builder.getOrCreate()

ratings_cols = ["company", "model", "rating", "type"]
ratings_values = [
    ("ford", "mustang", "A", "coupe"),
    ("chevy", "camaro", "B", "coupe"),
    ("ford", "fiesta", "C", "sedan"),
    ("ford", "focus", "A", "sedan"),
    ("ford", "taurus", "B", "sedan"),
    ("toyota", "camry", "B", "sedan"),
]
ratings_df = spark.createDataFrame(data=ratings_values, schema=ratings_cols)
ratings_df.show()
# +-------+-------+------+-----+
# |company|  model|rating| type|
# +-------+-------+------+-----+
# |   ford|mustang|     A|coupe|
# |  chevy| camaro|     B|coupe|
# |   ford| fiesta|     C|sedan|
# |   ford|  focus|     A|sedan|
# |   ford| taurus|     B|sedan|
# | toyota|  camry|     B|sedan|
# +-------+-------+------+-----+

# Count rows per (rating, type) pair, join in the per-rating totals,
# and divide to get P(type | rating).
probability_df = (
    ratings_df.groupBy("rating", "type")
    .agg(F.count(F.lit(1)).alias("rating_type_count"))
    .join(
        ratings_df.groupBy("rating").agg(F.count(F.lit(1)).alias("rating_count")),
        on="rating",
    )
    .withColumn(
        "conditional_probability",
        F.round(F.col("rating_type_count") / F.col("rating_count"), 2),
    )
    .select("rating", "type", "conditional_probability")
    .sort("type", "rating")
)
probability_df.show()
# +------+-----+-----------------------+
# |rating| type|conditional_probability|
# +------+-----+-----------------------+
# |     A|coupe|                    0.5|
# |     B|coupe|                   0.33|
# |     A|sedan|                    0.5|
# |     B|sedan|                   0.67|
# |     C|sedan|                    1.0|
# +------+-----+-----------------------+
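If you want to avoid the join, the same result can be computed with a window function: aggregate once per (rating, type), then sum those counts over a window partitioned by rating to get each rating's total. Here is a minimal sketch under the same ratings_df as above; the names rating_window and probability_via_window_df are just illustrative.

from pyspark.sql import Window
from pyspark.sql import functions as F

# Each rating's total is the sum of its (rating, type) counts within the
# window, so no second aggregation or join is needed.
rating_window = Window.partitionBy("rating")

probability_via_window_df = (
    ratings_df.groupBy("rating", "type")
    .agg(F.count(F.lit(1)).alias("rating_type_count"))
    .withColumn("rating_count", F.sum("rating_type_count").over(rating_window))
    .withColumn(
        "conditional_probability",
        F.round(F.col("rating_type_count") / F.col("rating_count"), 2),
    )
    .select("rating", "type", "conditional_probability")
    .sort("type", "rating")
)
probability_via_window_df.show()  # same output as probability_df above

Whether this is faster depends on your data: the window version avoids a shuffle for the join, but both approaches give identical results here.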