3

Firstly I tried everything in the link below to fix my error but none of them worked.

How to convert RDD of dense vector into DataFrame in pyspark?

I am trying to convert a dense vector into a dataframe (Spark preferably) along with column names and running into issues.

My column in spark dataframe is a vector that was created using Vector Assembler and I now want to convert it back to a dataframe as I would like to create plots on some of the variables in the vector.

Approach 1:

from pyspark.ml.linalg import SparseVector, DenseVector from pyspark.ml.linalg import Vectors temp=output.select("all_features") temp.rdd.map( lambda row: (DenseVector(row[0].toArray())) ).toDF() 

Below is the Error

TypeError: not supported type: <type 'numpy.ndarray'> 

Approach 2:

from pyspark.ml.linalg import VectorUDT from pyspark.sql.functions import udf from pyspark.ml.linalg import * as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT()) result = output.withColumn("all_features", as_ml("all_features")) result.head(5) 

Error:

AttributeError: 'numpy.ndarray' object has no attribute 'asML' 

I also tried to convert the dataframe into a Pandas dataframe and after that I am not able to split the values into separate columns

Approach 3:

pandas_df=temp.toPandas() pandas_df1=pd.DataFrame(pandas_df.all_features.values.tolist()) 

Above code runs fine but I still have only one column in my dataframe with all the values separated by commas as a list.

Any help is greatly appreciated!

EDIT:

Here is how my temp dataframe looks like. It just has one column all_features. I am trying to create a dataframe that splits all of these values into separate columns (all_features is a vector that was created using 200 columns)

+--------------------+ | all_features| +--------------------+ |[0.01193689934723...| |[0.04774759738895...| |[0.0,0.0,0.194417...| |[0.02387379869447...| |[1.89796699621085...| +--------------------+ only showing top 5 rows 

Expected output is a dataframe with all 200 columns separated out in a dataframe

+----------------------------+ | col1| col2| col3|... +----------------------------+ |0.01193689934723|0.0|0.5049431301173817... |0.04774759738895|0.0|0.1657316216149636... |0.0|0.0|7.213126372469... |0.02387379869447|0.0|0.1866693496827619|... |1.89796699621085|0.0|0.3192169213385746|... +----------------------------+ only showing top 5 rows 

Here is how my Pandas DF output looks like

 0 0 [0.011936899347238104, 0.0, 0.5049431301173817... 1 [0.047747597388952415, 0.0, 0.1657316216149636... 2 [0.0, 0.0, 0.19441761495525278, 7.213126372469... 3 [0.023873798694476207, 0.0, 0.1866693496827619... 4 [1.8979669962108585, 0.0, 0.3192169213385746, ... 
6
  • 1
    Can you tell us explicitly what input you have, output you want, and output you're getting as of now ? It helps us understand your problem better (and faster). Usually, a minimal reproducible example is required. For instance, I am not sure what values you do have in your column "all_features", and so I can't know for sure what results the use of .values.tolist() Commented Sep 27, 2018 at 22:03
  • Did you try rdd.map(lambda x: (x, )).toDF( ) as given in the link you specified? That usually works. Commented Sep 28, 2018 at 10:11
  • @IMCoins Apologies.I have added the output and expected output now Commented Sep 28, 2018 at 14:27
  • @mayankagrawal I tried rdd.map(lambda x: (x, )).toDF( ) Commented Sep 28, 2018 at 14:28
  • @mayankagrawal it was again returning just one column named "all_features". I then tried converting it to Pandas DF and did the .values.tolist() which gave just one column with values separated by commas. Commented Sep 28, 2018 at 14:35

1 Answer 1

6

Since you want all the features in separate columns (as I got from your EDIT), the link to the answer you provided is not your solution.

Try this,

#column_names temp = temp.rdd.map(lambda x:[float(y) for y in x['all_features']]).toDF(column_names) 

EDIT:

Since your temp is originally a dataframe, you can also use this method without converting it to rdd,

import pyspark.sql.functions as F from pyspark.sql.types import * splits = [F.udf(lambda val: float(val[i].item()),FloatType()) for i in range(200)] temp = temp.select(*[s(F.col('all_features')).alias(c) for c,s in zip(column_names,splits)]) temp.show() 
Sign up to request clarification or add additional context in comments.

1 Comment

Thank you Mayank! Tried your first solution and it worked great! Is there a way I can assign column names to the newly created dataframe using a list of column names that I already have?

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.