
I am new to Spark and Scala. I want to read a directory containing JSON files. Each file has an attribute called "EVENT_NAME" which can have 20 different values. I need to separate the events by attribute value, i.e. group all EVENT_NAME=event_A events together, and write them in a Hive external table structure like: /apps/hive/warehouse/db/event_A/dt=date/hour=hr

Here I have 20 different tables, one for each event type, and the data for each event should go to its respective table. I have managed to write some code, but I need help writing my data out correctly.

    import org.apache.spark.sql._
    import sqlContext._

    val path = "/source/data/path"
    val trafficRep = sc.textFile(path)
    val trafficRepDf = sqlContext.read.json(trafficRep)
    trafficRepDf.registerTempTable("trafficRepDf")
    trafficRepDf.write.partitionBy("EVENT_NAME").save("/apps/hive/warehouse/db/sample")

The last line creates partitioned output, but it is not exactly what I need. Please suggest how I can get this right, or any other piece of code that will do it.

4 Answers


I'm assuming you mean you'd like to save the data into separate directories, without using Spark/Hive's {column}={value} format.

You won't be able to use Spark's partitionBy, as Spark partitioning forces you to use that format.
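
For example, writing with partitionBy("EVENT_NAME") always produces a layout along these lines (paths illustrative):

    /apps/hive/warehouse/db/sample/EVENT_NAME=event_A/part-00000
    /apps/hive/warehouse/db/sample/EVENT_NAME=event_B/part-00000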

Instead, you have to break your DataFrame into its component partitions, and save them one by one, like so:

    import org.apache.spark.sql._
    import sqlContext._
    import sqlContext.implicits._ // needed for the $"colName" syntax

    val path = "/source/data/path"
    val trafficRep = sc.textFile(path)
    val trafficRepDf = sqlContext.read.json(trafficRep)

    // Collect the distinct event names as plain strings.
    // Or if you already know what all 20 values are, just hardcode them.
    val eventNames = trafficRepDf.select($"EVENT_NAME").distinct().collect().map(_.getString(0))

    for (eventName <- eventNames) {
      val trafficRepByEventDf = trafficRepDf.where($"EVENT_NAME" === eventName)
      trafficRepByEventDf.write.save(s"/apps/hive/warehouse/db/sample/${eventName}")
    }


You can add date and hour columns to your DataFrame. Note that withColumn returns a new DataFrame, so you need to keep the result:

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions.lit // needed for literal column values
    import sqlContext._

    val path = "/source/data/path"
    val trafficRep = sc.textFile(path)
    val trafficRepDf = sqlContext.read.json(trafficRep)

    // withColumn returns a new DataFrame; assign it rather than discarding it.
    val trafficRepDfWithCols = trafficRepDf
      .withColumn("dt", lit("dtValue"))
      .withColumn("hour", lit("hourValue"))

    trafficRepDfWithCols.write.partitionBy("EVENT_NAME", "dt", "hour").save("/apps/hive/warehouse/db/sample")



I assume you want a table structure like /apps/hive/warehouse/db/EVENT_NAME=xx/dt=yy/hour=zz. In that case, you need to partition by EVENT_NAME, dt, and hour, so try this:

    trafficRepDf.write.partitionBy("EVENT_NAME", "dt", "hour").save("/apps/hive/warehouse/db/sample")

1 Comment

The data doesn't have date and time information in it; I need to provide it externally.
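
In that case, here is a minimal sketch of one way to inject the values from outside the data, assuming dt and hour arrive as job arguments (the argument positions and example values are assumptions, not from the original post):

    import org.apache.spark.sql.functions.lit

    // Hypothetical: dt and hour are supplied externally, e.g. as spark-submit arguments.
    val dt = args(0)   // e.g. "2017-06-01"
    val hour = args(1) // e.g. "09"

    // Stamp every row with the externally provided partition values.
    val withPartitions = trafficRepDf
      .withColumn("dt", lit(dt))
      .withColumn("hour", lit(hour))

    withPartitions.write.partitionBy("EVENT_NAME", "dt", "hour").save("/apps/hive/warehouse/db/sample")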

https://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-16-to-20

In the Dataset and DataFrame API, registerTempTable has been deprecated and replaced by createOrReplaceTempView.

https://spark.apache.org/docs/2.1.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter@saveAsTable(tableName:String):Unit
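
Putting the two together, a rough sketch of what the updated calls could look like (the table name db.sample is illustrative, not from the original post):

    // createOrReplaceTempView replaces the deprecated registerTempTable.
    trafficRepDf.createOrReplaceTempView("trafficRepDf")

    // saveAsTable writes into a Hive table instead of a bare path;
    // partition columns must exist on the DataFrame.
    trafficRepDf.write
      .partitionBy("EVENT_NAME", "dt", "hour")
      .saveAsTable("db.sample")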

