I have a piece of code that works well but uses pandas DataFrame groupby processing. However, because the file is large (> 70 million groups), I need to convert the code to use a PySpark DataFrame. Here is the original code using a pandas DataFrame with small example data:
import pandas as pd
import numpy as np
from scipy.optimize import minimize

df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20),
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})

# Starting values
startVal = np.ones(2)*(1/2)

# Constraint: sum of coefficients = 1
cons = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})

# Bounds on coefficients
bnds = tuple([0, 1] for x in startVal)

# Define a function to calculate the sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])**2)

# Define a function that calls minimize
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x

# Do the calculation by applying the function by group:
# Create the GroupBy object
grp_grpVar = df.groupby('grpVar')

Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons)

Now I am trying to use a PySpark DataFrame. I convert the pandas DataFrame to a PySpark DataFrame for the purpose of testing the code.
sdf = sqlContext.createDataFrame(df)
type(sdf)
# <class 'pyspark.sql.dataframe.DataFrame'>

# Create the GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')

# Redefine the functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)

def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

udf = UserDefinedFunction(sRunMinimize, StringType())
Results = Sgrp_grpVar.agg(sRunMinimize())

However, when I tried to define the user-defined function udf, I got the errors shown below. Any help correcting my errors or suggesting an alternative approach is highly appreciated.
udf = UserDefinedFunction(sRunMinimize, StringType())

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in __init__
    self._judf = self._create_judf(name).......
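In case it is useful, here is a rough sketch of one alternative I have been considering instead of a column UDF: dropping to the RDD API and running the same scipy minimize on each group's rows. It reuses RunMinimize, startVal, bnds and cons defined above, and assumes numpy, scipy and pandas are available on the worker nodes; the helper name runMinimizeForGroup is just for illustration.

def runMinimizeForGroup(keyAndRows):
    # keyAndRows is (grpVar value, iterable of Row objects for that group)
    grpKey, rows = keyAndRows
    groupDf = pd.DataFrame([r.asDict() for r in rows])    # rebuild a small pandas frame per group
    coeffs = RunMinimize(groupDf, startVal, bnds, cons)   # same scipy call as in the pandas version
    return (grpKey, [float(c) for c in coeffs])

resultsRdd = (sdf.rdd
              .groupBy(lambda r: r['grpVar'])              # one (key, rows) pair per group
              .map(runMinimizeForGroup))
resultsRdd.take(2)   # list of (grpVar, [coef0, coef1]) tuples

I am not sure this will scale well to more than 70 million groups, since groupBy shuffles the full rows, so a more idiomatic DataFrame-based approach would still be very welcome.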