
I have a piece of code that works well using pandas DataFrame groupby processing. However, because the file is large (> 70 million groups), I need to convert the code to use a PySpark DataFrame. Here is the original code using a pandas DataFrame with small example data:

    import pandas as pd
    import numpy as np
    from scipy.optimize import minimize

    df = pd.DataFrame({
        'y0': np.random.randn(20),
        'y1': np.random.randn(20),
        'x0': np.random.randn(20),
        'x1': np.random.randn(20),
        'grpVar': ['a', 'b'] * 10})

    # Starting values
    startVal = np.ones(2)*(1/2)

    # Constraint: sum of coefficients = 1
    cons = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})

    # Bounds on coefficients
    bnds = tuple([0, 1] for x in startVal)

    # Define a function to calculate the sum of squared differences
    def SumSqDif(a, df):
        return np.sum((df['y0'] - a[0]*df['x0'])**2
                      + (df['y1'] - a[1]*df['x1'])**2)

    # Define a function to call minimize for one group
    def RunMinimize(data, startVal, bnds, cons):
        ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
                               bounds=bnds, constraints=cons, args=(data))
        return ResultByGrp.x

    # Do the calculation by applying the function by group:
    # Create GroupBy object
    grp_grpVar = df.groupby('grpVar')

    Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons)

Now I am trying to use a PySpark DataFrame. I convert the pandas DataFrame to a PySpark DataFrame for the purpose of testing the code.

    sdf = sqlContext.createDataFrame(df)
    type(sdf)
    # <class 'pyspark.sql.dataframe.DataFrame'>

    # Create GroupBy object
    Sgrp_grpVar = sdf.groupby('grpVar')

    # Redefine functions
    def sSumSqDif(a, sdf):
        return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2
                      + (sdf['y1'] - a[1]*sdf['x1'])**2)

    def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
        ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                               bounds=bnds, constraints=cons, args=(data))
        return ResultByGrp.x

    from pyspark.sql.functions import UserDefinedFunction
    from pyspark.sql.types import DoubleType
    from pyspark.sql.types import StringType

    udf = UserDefinedFunction(sRunMinimize, StringType())
    Results = Sgrp_grpVar.agg(sRunMinimize())

However, when I tried to define the user-defined function udf I got the errors below. Any help correcting my errors or suggesting an alternative approach is highly appreciated.

    udf = UserDefinedFunction(sRunMinimize, StringType())
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in __init__
        self._judf = self._create_judf(name).......

1 Comment
    My first observation is that you can't send an entire Spark dataframe as an argument to a udf, only columns of a dataframe. Commented Sep 16, 2017 at 7:51
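
As an illustration of that comment (this sketch is not part of the original post, and the per-row function is purely hypothetical), a PySpark UDF is applied to columns and evaluated row by row:

    # A minimal illustrative sketch: a PySpark UDF receives column values
    # row by row, never the DataFrame object itself.
    import pyspark.sql.functions as psf
    from pyspark.sql.types import DoubleType

    # hypothetical per-row function, for illustration only
    squared_diff = psf.udf(lambda y, x: float((y - x) ** 2), DoubleType())

    # correct: pass columns to the UDF
    sdf.select(squared_diff("y0", "x0").alias("sq_diff")).show()

    # incorrect: a function that closes over the DataFrame itself (e.g. the
    # default data=sdf in sRunMinimize) cannot be turned into a UDF, which is
    # what the traceback above reflects.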

1 Answer


You're trying to write a User Defined Aggregate Function, which can't be done in PySpark (see https://stackoverflow.com/a/40030740).

What you can write instead is a UDF on the data within each group collected as a list:

First for the set-up:

    import pandas as pd
    import numpy as np
    from scipy.optimize import minimize
    import pyspark.sql.functions as psf
    from pyspark.sql.types import *

    df = pd.DataFrame({
        'y0': np.random.randn(20),
        'y1': np.random.randn(20),
        'x0': np.random.randn(20),
        'x1': np.random.randn(20),
        'grpVar': ['a', 'b'] * 10})
    sdf = sqlContext.createDataFrame(df)

    # Starting values
    startVal = np.ones(2)*(1/2)

    # Constraint: sum of coefficients = 1
    cons = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})

    # Bounds on coefficients
    bnds = tuple([0, 1] for x in startVal)

We'll broadcast these variables since we need them for every row of the aggregated DataFrame; broadcasting copies the values to every node so the executors don't have to fetch them from the driver:

    sc.broadcast(startVal)
    sc.broadcast(bnds)
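
As an aside (not required by the rest of the answer), the usual broadcast pattern keeps the handle returned by sc.broadcast and reads it through .value inside code that runs on the executors; a minimal sketch, with a purely illustrative function:

    # Illustrative sketch of the usual broadcast pattern: keep the handle and
    # read .value on the executors. The broadcast value is shipped to each
    # executor once and cached there.
    startVal_bc = sc.broadcast(startVal)
    bnds_bc = sc.broadcast(bnds)

    def uses_broadcast(_):
        start = startVal_bc.value
        bounds = bnds_bc.value
        return float(len(start) + len(bounds))  # placeholder computation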

Let's aggregate the data using collect_list. We'll change the structure of the data so that we have only one column (you could collect each column into a distinct column instead, but then you'd have to modify the way you pass the data to the function):

    Sgrp_grpVar = sdf\
        .groupby('grpVar')\
        .agg(psf.collect_list(psf.struct("y0", "y1", "x0", "x1")).alias("data"))
    Sgrp_grpVar.printSchema()

    root
     |-- grpVar: string (nullable = true)
     |-- data: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- y0: double (nullable = true)
     |    |    |-- y1: double (nullable = true)
     |    |    |-- x0: double (nullable = true)
     |    |    |-- x1: double (nullable = true)

We can now create our UDF. The returned data type is too complex for PySpark (numpy arrays are not supported), so we need to change it a bit:

    def sSumSqDif(a, data):
        return np.sum(
            (data['y0'] - a[0]*data['x0'])**2
            + (data['y1'] - a[1]*data['x1'])**2)

    def sRunMinimize(data, startVal=startVal, bnds=bnds, cons=cons):
        # 'data' arrives as a list of Rows; rebuild a pandas DataFrame from it
        data = pd.DataFrame([row.asDict() for row in data])
        ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                               bounds=bnds, constraints=cons, args=(data))
        return ResultByGrp.x.tolist()

    sRunMinimize_udf = lambda startVal, bnds, cons: psf.udf(
        lambda data: sRunMinimize(data, startVal, bnds, cons),
        ArrayType(DoubleType())
    )

We can now apply this function to the collected data in each group:

    Results = Sgrp_grpVar.select(
        "grpVar",
        sRunMinimize_udf(startVal, bnds, cons)("data").alias("res")
    )
    Results.show(truncate=False)

    +------+-----------------------------------------+
    |grpVar|res                                      |
    +------+-----------------------------------------+
    |b     |[0.4073139282953772, 0.5926860717046227] |
    |a     |[0.8275186444565927, 0.17248135554340727]|
    +------+-----------------------------------------+
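
If you then want the two coefficients as separate columns rather than an array, a possible follow-up (not part of the original answer; the column names a0 and a1 are illustrative) is to split the array with Column.getItem:

    # Optional follow-up sketch: split the returned array into separate
    # coefficient columns.
    Coefs = Results.select(
        "grpVar",
        Results["res"].getItem(0).alias("a0"),
        Results["res"].getItem(1).alias("a1")
    )
    Coefs.show()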

But I don't think pyspark is the right tool for this.


4 Comments

Marie, thank you very much for your solution. I will try it with my real data. It is large: about 600 million records / 70 million groups and 12 variables (x0..x6 and y0..y6 in my example). In your opinion, if PySpark is not appropriate for this problem, what is appropriate?
Marie, before running your code on a large dataset, I tried the code as-is with the small example data. It gave me an error at the last step: Traceback (most recent call last): File "<stdin>", line 1, in <module> NameError: name 'sdf_agg' is not defined
You need to have scikit-learn installed on all the nodes, or zip anaconda and use --archives. The functions you call for every row need to load these modules. The best solution is to install anaconda on every node.
Thank you so much for the answer. For anyone who encounters the NameError for sdf_agg, try changing Results = sdf_agg.select( in the last block to Results = Sgrp_grpVar.select(
