2

I have a CSV with headings that I'd like to save as Parquet (actually a delta table)

The column headings have spaces in them, which parquet can't handle. How do I change spaces to underscores?

This is what I have so far, cobbled together from other SO posts:

from pyspark.sql.functions import *

df = spark.read.option("header", True).option("delimiter", "\u0001").option("inferSchema", True).csv("/mnt/landing/MyFile.TXT")
names = df.schema.names
for name in names:
    df2 = df.withColumnRenamed(name, regexp_replace(name, ' ', '_'))

When I run this, the final line gives me this error:

TypeError: Column is not iterable

I thought this would be a common requirement given that parquet can't handle spaces but it's quite difficult to find any examples.

1
  • can you try with select: df.select([col(a).alias(b) for a,b in zip(df.columns,[re.sub(" ","_",i) for i in df.columns])]) Commented Jun 24, 2020 at 12:47

3 Answers

1

You need to use the reduce function to apply the renaming iteratively, because in your code df2 ends up with only the last column renamed...

The code would look like the following (instead of the for loop):

from functools import reduce

df2 = reduce(lambda data, name: data.withColumnRenamed(name, name.replace(' ', '_')), names, df)
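The reduce call threads the dataframe through successive withColumnRenamed calls as a left fold. The same pattern can be checked without Spark by folding over a plain list of names (the list accumulator here is purely illustrative, standing in for the dataframe):

```python
from functools import reduce

names = ["id", "id a", "id b"]

# Fold each rename into the accumulator, just as reduce threads the
# dataframe through successive withColumnRenamed calls.
renamed = reduce(lambda acc, name: acc + [name.replace(' ', '_')], names, [])
print(renamed)  # ['id', 'id_a', 'id_b']
```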

6 Comments

Great. I just had to add from functools import reduce to the top of this. So far this is doing what I want - I'll just check out some of the others also
In this case is the reduce function accepting three parameters: the lambda function, names, and df?
and it seems like names is passed to the name parameter and df is passed to the data parameter? Trying to understand what's going on here
All the examples of reduce that I see take two parameters?
OK so there are some good examples here showing both ways of doing it (reduce and loop) medium.com/@mrpowers/…
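On the three-parameter question in the comments: reduce(function, iterable, initializer) takes an optional third argument, the initial accumulator value. The initializer (df in the answer) becomes the first accumulator, and each element of the iterable (names) is folded in after it. A minimal sketch with numbers:

```python
from functools import reduce

# reduce(function, iterable, initializer): the initializer seeds the
# accumulator, so the lambda is called as f(10, 1), then f(11, 2), f(13, 3).
total = reduce(lambda acc, x: acc + x, [1, 2, 3], 10)
print(total)  # 16
```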
1

You are getting the exception because the function regexp_replace returns a Column, but withColumnRenamed expects a String:

def regexp_replace(e: org.apache.spark.sql.Column, pattern: String, replacement: String): org.apache.spark.sql.Column
def withColumnRenamed(existingName: String, newName: String): org.apache.spark.sql.DataFrame
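In other words, the rename has to operate on the column name as a plain Python string, so re.sub or str.replace (not the Column-returning regexp_replace) is the right tool here. A quick check:

```python
import re

# Column names are ordinary Python strings; transform them with re.sub
# or str.replace before handing them to withColumnRenamed.
name = "id a"
new_name = re.sub(" ", "_", name)
print(new_name)                            # id_a
print(name.replace(" ", "_") == new_name)  # True
```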

Comments

0

Use .toDF (or .select) and pass a list of columns to create a new dataframe.

df.show()
#+---+----+----+
#| id|id a|id b|
#+---+----+----+
#|  1|   a|   b|
#|  2|   c|   d|
#+---+----+----+

new_cols = list(map(lambda x: x.replace(" ", "_"), df.columns))
df.toDF(*new_cols).show()

df.select([col(s).alias(s.replace(' ', '_')) for s in df.columns]).show()
#+---+----+----+
#| id|id_a|id_b|
#+---+----+----+
#|  1|   a|   b|
#|  2|   c|   d|
#+---+----+----+
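The new_cols computation is pure Python, so it can be verified without a Spark session. A sketch, assuming the three column names shown above:

```python
# Stand-in for df.columns from the example output.
cols = ["id", "id a", "id b"]

# Same mapping the answer applies before .toDF(*new_cols).
new_cols = list(map(lambda x: x.replace(" ", "_"), cols))
print(new_cols)  # ['id', 'id_a', 'id_b']
```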

1 Comment

Thanks for your input. I haven't tried your answer but I'm sure I'll come back to it.
