
In my pyspark job, I have a huge DataFrame with more than 6,000 columns, in the following format:

    id_    a1   a2    a3   a4    a5    ... a6250
    u1827s True False True False False ... False

The majority of the columns a1, a2, ..., a6250 are of boolean type. I need to group this data by all these columns and count the number of distinct ids for each combination, e.g.

df = df.groupby(list_of_cols).agg(F.countDistinct("id_")) 

where list_of_cols = [a1, a2, ..., a6250]. When running this pyspark job, I get a java.lang.StackOverflowError. I am aware that I can increase the stack size (as per https://rangareddy.github.io/SparkStackOverflow/); however, I'd prefer a more elegant solution that would also produce a more convenient output.
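(For reference, the stack-size workaround from that link amounts to raising the JVM thread stack via Spark's `extraJavaOptions` settings. A sketch only; the `-Xss` value here is an arbitrary example, not a recommendation:)

```python
from pyspark.sql import SparkSession

# Sketch: raise the JVM thread stack size on both driver and executors.
# The -Xss value is an example; tune it for the actual job.
spark = (SparkSession.builder
         .config("spark.driver.extraJavaOptions", "-Xss64m")
         .config("spark.executor.extraJavaOptions", "-Xss64m")
         .getOrCreate())
```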

I have two ideas for what to do before grouping:

  1. Encode the combination of the a1, a2, ..., a6250 columns into a single binary column, i.e. a binary number with 6250 bits where the bit at position k encodes the True/False value of column a_k. In the example above the value would be 10100...0 (a1 is True, a2 is False, a3 is True, a4 is False, a5 is False, ..., a6250 is False).

  2. Collect these values into an array of booleans, i.e. a single column like array(True, False, True, False, False, ..., False).
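(For clarity, a minimal plain-Python sketch of the two encodings described above — these helpers are illustrative only; in pyspark, idea 1 roughly corresponds to concatenating cast columns and idea 2 to building an array column:)

```python
def encode_bit_string(values):
    """Idea 1: pack the boolean columns into one bit string (True -> '1')."""
    return "".join("1" if v else "0" for v in values)

def encode_array(values):
    """Idea 2: keep the booleans together as a single array-typed value."""
    return list(values)

row = [True, False, True, False, False]
encode_bit_string(row)  # '10100'
encode_array(row)       # [True, False, True, False, False]
```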

Which way is better: increasing the stack size and dealing with 6,000+ columns, using a single binary column, or using an array of boolean values?

    You could try converting the bool columns to binary (int) type and concatenating them into a single column (e.g., [True, True, False, True] --> "1101"), then use the new column in the group by. Commented Jun 5, 2024 at 8:02

2 Answers


I believe the first solution would be more practical for finding the distinct count of all existing combinations. Here's some code I wrote to reproduce this solution given your data schema:

    from pyspark.sql import Row
    import pyspark.sql.functions as F

    # assumes an active SparkSession named `spark`
    data = [
        ("A", True, True, True),     # Counted
        ("A", True, True, True),     # Not Counted
        ("A", False, False, False),  # Counted
        ("B", False, False, False),  # Counted
        ("B", False, False, False),  # Not Counted
        ("B", True, True, True),     # Counted
    ]
    columns = ["id", "a0", "a1", "a2"]
    df = spark.createDataFrame(data, columns)

    # To dynamically access the columns list on each executor
    dynamic_column_list = spark.sparkContext.broadcast(df.columns)

    def extract_pattern(row):
        """
        Extract the information from the row according to the following
        values and append to pattern.
            True  => "1"
            False => "0"
        """
        pattern = ""
        for column in dynamic_column_list.value:
            bit = ""
            if row[column] is True:
                bit = "1"
            elif row[column] is False:
                bit = "0"
            pattern += bit
        return Row(id=row["id"], pattern=pattern)

    extracted_pattern = df.rdd.map(lambda r: extract_pattern(r)).toDF()
    counted_by_pattern = extracted_pattern.groupBy("pattern").agg(F.countDistinct("id"))
    counted_by_pattern.show()

I tried to use the same function to get the distinct count of IDs associated with each pattern

counted_by_pattern = extracted_pattern.groupBy("pattern").agg(F.countDistinct("id")) 

I haven't tried to replicate the 6K+ columns, but please share the results after you review and try this code.


2 Comments

Thank you, your solution works in debug mode. I need to wait 1-2 days for my current job to finish before I can test it. I will let you know.
I tested this approach. It is faster in pyspark, taking about half the time of working with the 6,000 columns directly, and it does not require increasing the stack size. However, I noticed that in parquet format this representation takes about twice as much memory. I tried encoding the pattern in a bytearray; it still takes twice as much memory as storing 6,000 binary columns.
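(The bytearray encoding mentioned in the comment above could look like the following plain-Python sketch — `pack_bits` is a hypothetical helper, not code from the answer; it packs the '0'/'1' pattern string 8 bits per byte, zero-padding the tail:)

```python
def pack_bits(pattern: str) -> bytes:
    """Pack a '0'/'1' bit string into bytes, 8 bits per byte (zero-padded)."""
    width = (len(pattern) + 7) // 8 * 8
    padded = pattern.ljust(width, "0")
    return bytes(int(padded[i:i + 8], 2) for i in range(0, len(padded), 8))

pack_bits("10100")        # 1 byte: '10100000'
len(pack_bits("1" * 6250))  # 782 bytes for a 6250-bit pattern
```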

Using Ahmed's logic, you could build the steps entirely in pyspark:

  1. cast the boolean columns to integer to create binary columns
  2. concat the binary columns into a single column holding all flags
  3. group by the new column and count unique IDs

Here's an example:

    import pyspark.sql.functions as func

    # using Ahmed's test input
    # +---+-----+-----+-----+
    # | id|   c1|   c2|   c3|
    # +---+-----+-----+-----+
    # |  A| true| true| true|
    # |  A| true| true| true|
    # |  A|false|false|false|
    # |  B|false|false|false|
    # |  B|false|false|false|
    # |  B| true| true| true|
    # +---+-----+-----+-----+

    # list all boolean columns
    bin_cols = ['c1', 'c2', 'c3']

    data_sdf. \
        withColumn('flag_concat',
                   func.concat_ws('-', *[func.col(k).cast('int') for k in bin_cols])
                   ). \
        groupBy('flag_concat'). \
        agg(func.countDistinct('id').alias('unique_count')). \
        show()

    # +-----------+------------+
    # |flag_concat|unique_count|
    # +-----------+------------+
    # |      0-0-0|           2|
    # |      1-1-1|           2|
    # +-----------+------------+
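(For comparison, the same grouping logic in plain Python, without Spark — handy for sanity-checking the expected counts on the small test input:)

```python
rows = [
    ("A", True, True, True),
    ("A", True, True, True),
    ("A", False, False, False),
    ("B", False, False, False),
    ("B", False, False, False),
    ("B", True, True, True),
]

# Key each row by its concatenated flag pattern, collecting distinct ids per key.
groups = {}
for rid, *flags in rows:
    key = "-".join(str(int(f)) for f in flags)
    groups.setdefault(key, set()).add(rid)

unique_counts = {k: len(v) for k, v in groups.items()}
print(unique_counts)  # {'1-1-1': 2, '0-0-0': 2}
```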

