7

Supposing, I have a json file with lines in follow structure:

{ "a": 1, "b": { "bb1": 1, "bb2": 2 } } 

I want to change the value of key bb1 or add a new key, like: bb3. Currently, I use spark.read.json to load the json file into spark as DataFrame and df.rdd.map to map each row of RDD to dict. Then, change nested key value or add a nested key and convert the dict to row. Finally, convert RDD to DataFrame. The workflow works as follow:

def map_func(row): dictionary = row.asDict(True) adding new key or changing key value return as_row(dictionary) # as_row convert dict to row recursively df = spark.read.json("json_file") df.rdd.map(map_func).toDF().write.json("new_json_file") 

This could work for me. But I concern that converting DataFrame -> RDD ( Row -> dict -> Row) -> DataFrame would kill the efficiency. Is there any other methods that could work for this demand but not at the cost of efficiency?


The final solution that I used is using withColumn and dynamically building the schema of b. Firstly, we can get the b_schema from df schema by:

b_schema = next(field['type'] for field in df.schema.jsonValue()['fields'] if field['name'] == 'b') 

After that, b_schema is dict and we can add new field into it by:

b_schema['fields'].append({"metadata":{},"type":"string","name":"bb3","nullable":True}) 

And then, we could convert it to StructType by:

new_b = StructType.fromJson(b_schema) 

In the map_func, we could convert Row to dict and populate the new field:

def map_func(row): data = row.asDict(True) data['bb3'] = data['bb1'] + data['bb2'] return data map_udf = udf(map_func, new_b) df.withColumn('b', map_udf('b')).collect() 

Thanks @Mariusz

1 Answer 1

9

You can use map_func as udf and therefore omit converting DF -> RDD -> DF, still having the flexibility of python to implement business logic. All you need is to create schema object:

>>> from pyspark.sql.types import * >>> new_b = StructType([StructField('bb1', LongType()), StructField('bb2', LongType()), StructField('bb3', LongType())]) 

Then you define map_func and udf:

>>> from pyspark.sql.functions import * >>> def map_func(data): ... return {'bb1': 4, 'bb2': 5, 'bb3': 6} ... >>> map_udf = udf(map_func, new_b) 

Finally apply this UDF to dataframe:

>>> df = spark.read.json('sample.json') >>> df.withColumn('b', map_udf('b')).first() Row(a=1, b=Row(bb1=4, bb2=5, bb3=6)) 

EDIT:

According to the comment: You can add a field to existing StructType in a easier way, for example:

>>> df = spark.read.json('sample.json') >>> new_b = df.schema['b'].dataType.add(StructField('bb3', LongType())) 
Sign up to request clarification or add additional context in comments.

5 Comments

Seems, the map_func would get a Row. How can I modify the Row b. And I want to set the sum of bb1 and bb2 as the value of new column bb3
Column b will be modified (replaced) by withColumn('b', something_here). Inside map_func you can access all variables of b, for example to sum bb1 and bb2 udf can look as follows: map_udf = udf(lambda data: {'bb1': data.bb1, 'bb2': data.bb2, 'bb3': data.bb1 + data.bb2}, new_b)
Thanks! However, can we not to hardcode the schema new_b and dict object since when data structure is changed we need to change the code? Can we infer the schema from column 'b' and dynamically build the dict object?
Your solution is much better. Didn't find that we can get a StructType from StructField with dataType from pyspark docs
This is the first time I've seen a Row within a Row, and it solved my problem with pyspark inferring the wrong type

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.