Suppose I have a JSON file whose lines have the following structure:

{ "a": 1, "b": { "bb1": 1, "bb2": 2 } }

I want to change the value of the key bb1, or add a new key such as bb3. Currently, I use spark.read.json to load the JSON file into Spark as a DataFrame and df.rdd.map to map each row of the RDD to a dict. Then I change the nested key's value or add a nested key, and convert the dict back to a Row. Finally, I convert the RDD back to a DataFrame. The workflow looks like this:

def map_func(row):
    dictionary = row.asDict(True)
    # add a new key or change a key's value here
    return as_row(dictionary)  # as_row converts a dict back to a Row recursively

df = spark.read.json("json_file")
df.rdd.map(map_func).toDF().write.json("new_json_file")

This works for me, but I am concerned that the DataFrame -> RDD (Row -> dict -> Row) -> DataFrame conversion kills efficiency. Is there any other method that meets this need without sacrificing efficiency?
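(For reference, as_row is just a small helper that rebuilds a Row from a dict recursively; a minimal sketch of such a helper, not my exact code, assuming plain nested dicts and lists, would be:)

from pyspark.sql import Row

def as_row(obj):
    # Recursively convert nested dicts (and lists of dicts) back into Rows,
    # leaving scalar values untouched.
    if isinstance(obj, dict):
        return Row(**{key: as_row(value) for key, value in obj.items()})
    if isinstance(obj, (list, tuple)):
        return [as_row(value) for value in obj]
    return obj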
The final solution I used is withColumn together with a dynamically built schema for b. First, we can extract b_schema from the DataFrame's schema:
b_schema = next(field['type'] for field in df.schema.jsonValue()['fields'] if field['name'] == 'b')
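For the sample line above, spark.read.json infers long for the numeric fields, so b_schema comes out roughly as the following dict:

{
    "type": "struct",
    "fields": [
        {"name": "bb1", "type": "long", "nullable": True, "metadata": {}},
        {"name": "bb2", "type": "long", "nullable": True, "metadata": {}}
    ]
}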
After that, b_schema is a plain dict, so we can append a new field to it:

b_schema['fields'].append({"metadata": {}, "type": "long", "name": "bb3", "nullable": True})

And then we can convert it back to a StructType:
from pyspark.sql.types import StructType

new_b = StructType.fromJson(b_schema)
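As a side note, the same new_b can also be built without going through the JSON representation; this is just an alternative I believe works, using PySpark's StructType.add:

from pyspark.sql.types import LongType

# df.schema["b"] is the StructField for b; its dataType is the nested StructType.
# add() appends a new field and returns the resulting StructType.
new_b = df.schema["b"].dataType.add("bb3", LongType(), True)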
In map_func, we convert the Row to a dict and populate the new field:

from pyspark.sql.functions import udf

def map_func(row):
    data = row.asDict(True)
    data['bb3'] = data['bb1'] + data['bb2']
    return data

map_udf = udf(map_func, new_b)
df.withColumn('b', map_udf('b')).collect()

Thanks @Mariusz
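With this in place, the updated file can be written back out the same way as in the question, without the DataFrame -> RDD -> DataFrame round trip:

df.withColumn('b', map_udf('b')).write.json("new_json_file")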