
I have retrieved a table from SQL Server which contains over 3 million records.

Top 10 Records:

    +---------+-------------+----------+
    |ACCOUNTNO|VEHICLENUMBER|CUSTOMERID|
    +---------+-------------+----------+
    | 10003014|    MH43AJ411|  20000000|
    | 10003014|    MH43AJ411|  20000001|
    | 10003015|   MH12GZ3392|  20000002|
    | 10003016|    GJ15Z8173|  20000003|
    | 10003018|    MH05AM902|  20000004|
    | 10003019|   GJ15CD7657|  20001866|
    | 10003019|   MH02BY7774|  20000005|
    | 10003019|   MH02DG7774|  20000933|
    | 10003019|   GJ15CA7387|  20001865|
    | 10003019|   GJ15CB9601|  20001557|
    +---------+-------------+----------+
    only showing top 10 rows

Here ACCOUNTNO identifies the account: the same ACCOUNTNO may have more than one VEHICLENUMBER, and each VEHICLENUMBER has its own unique CUSTOMERID.

I want to export this data in JSON format.

This is my code to achieve the output:

    package com.issuer.pack2.spark

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql._

    object sqltojson {
      def main(args: Array[String]) {
        System.setProperty("hadoop.home.dir", "C:/winutil/")

        val conf = new SparkConf().setAppName("SQLtoJSON").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val jdbcSqlConnStr = "jdbc:sqlserver://192.168.70.88;databaseName=ISSUER;user=bhaskar;password=welcome123;"
        val jdbcDbTable = "[HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]"
        val jdbcDF = sqlContext.read.format("jdbc")
          .options(Map("url" -> jdbcSqlConnStr, "dbtable" -> jdbcDbTable))
          .load()
        // jdbcDF.show(10)

        jdbcDF.registerTempTable("tp_customer_account")
        val res01 = sqlContext.sql("SELECT ACCOUNTNO, VEHICLENUMBER, CUSTOMERID FROM tp_customer_account GROUP BY ACCOUNTNO, VEHICLENUMBER, CUSTOMERID ORDER BY ACCOUNTNO")
        // res01.show(10)

        res01.coalesce(1).write.json("D:/res01.json")
      }
    }
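(For reference, on Spark 2.x the same read can be written against a SparkSession instead of the now-deprecated SQLContext. This is only a sketch; the connection details are simply the ones from the code above, and the SQL Server JDBC driver still has to be on the classpath.)

    import org.apache.spark.sql.SparkSession

    // Spark 2.x style: SparkSession replaces SparkContext + SQLContext.
    val spark = SparkSession.builder()
      .appName("SQLtoJSON")
      .master("local[*]")
      .getOrCreate()

    // Same JDBC source as above, using builder-style option() calls.
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://192.168.70.88;databaseName=ISSUER;user=bhaskar;password=welcome123;")
      .option("dbtable", "[HISTORY].[TP_CUSTOMER_PREPAIDACCOUNTS]")
      .load()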

The output I got:

{"ACCOUNTNO":10003014,"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":20000001} {"ACCOUNTNO":10003014,"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":20000000} {"ACCOUNTNO":10003015,"VEHICLENUMBER":"MH12GZ3392","CUSTOMERID":20000002} {"ACCOUNTNO":10003016,"VEHICLENUMBER":"GJ15Z8173","CUSTOMERID":20000003} {"ACCOUNTNO":10003018,"VEHICLENUMBER":"MH05AM902","CUSTOMERID":20000004} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"MH02BY7774","CUSTOMERID":20000005} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"GJ15CA7387","CUSTOMERID":20001865} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"GJ15CD7657","CUSTOMERID":20001866} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"MH02DG7774","CUSTOMERID":20000933} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"GJ15CB9601","CUSTOMERID":20001557} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"GJ15CD7387","CUSTOMERID":20029961} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"GJ15CF7747","CUSTOMERID":20009020} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"GJ15CB727","CUSTOMERID":20000008} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"GJ15CA7837","CUSTOMERID":20001223} {"ACCOUNTNO":10003019,"VEHICLENUMBER":"GJ15CD7477","CUSTOMERID":20001690} {"ACCOUNTNO":10003020,"VEHICLENUMBER":"MH01AX5658","CUSTOMERID":20000006} {"ACCOUNTNO":10003021,"VEHICLENUMBER":"GJ15AD727","CUSTOMERID":20000007} {"ACCOUNTNO":10003023,"VEHICLENUMBER":"GU15PP7567","CUSTOMERID":20000009} {"ACCOUNTNO":10003024,"VEHICLENUMBER":"GJ15CA7567","CUSTOMERID":20000010} {"ACCOUNTNO":10003025,"VEHICLENUMBER":"GJ5JB9312","CUSTOMERID":20000011} 

But I want the JSON output in the format below. I have written this JSON manually (maybe I have designed it wrongly; I want ACCOUNTNO to be unique) for the first three records of the table above:

{ "ACCOUNTNO":10003014, "VEHICLE": [ { "VEHICLENUMBER":"MH43AJ411", "CUSTOMERID":20000000}, { "VEHICLENUMBER":"MH43AJ411", "CUSTOMERID":20000001} ], "ACCOUNTNO":10003015, "VEHICLE": [ { "VEHICLENUMBER":"MH12GZ3392", "CUSTOMERID":20000002} ] } 

So, how can I achieve this JSON format using Spark code?

1 Answer


Scala Spark SQL

You can do the following (instead of registerTempTable you can use createOrReplaceTempView, as registerTempTable is deprecated):

    jdbcDF.createOrReplaceTempView("tp_customer_account")

    val res01 = sqlContext.sql("SELECT ACCOUNTNO, collect_list(struct(`VEHICLENUMBER`, `CUSTOMERID`)) AS VEHICLE FROM tp_customer_account GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO")
    res01.coalesce(1).write.json("D:/res01.json")
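Note that the snippet above uses a plain temp view. If you use createGlobalTempView instead, the view is registered under the global_temp database and must be queried with that prefix; querying it unqualified raises the "Table or view not found" error mentioned in the comments below. A sketch of the global variant:

    // Global temp views live in the global_temp database, so the
    // table reference in the query must be qualified accordingly.
    jdbcDF.createGlobalTempView("tp_customer_account")
    val res01 = sqlContext.sql("SELECT ACCOUNTNO, collect_list(struct(`VEHICLENUMBER`, `CUSTOMERID`)) AS VEHICLE FROM global_temp.tp_customer_account GROUP BY ACCOUNTNO ORDER BY ACCOUNTNO")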

You should get your desired output:

{"ACCOUNTNO":"10003014","VEHICLE":[{"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":"20000000"},{"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":"20000001"}]} {"ACCOUNTNO":"10003015","VEHICLE":[{"VEHICLENUMBER":"MH12GZ3392","CUSTOMERID":"20000002"}]} {"ACCOUNTNO":"10003016","VEHICLE":[{"VEHICLENUMBER":"GJ15Z8173","CUSTOMERID":"20000003"}]} {"ACCOUNTNO":"10003018","VEHICLE":[{"VEHICLENUMBER":"MH05AM902","CUSTOMERID":"20000004"}]} {"ACCOUNTNO":"10003019","VEHICLE":[{"VEHICLENUMBER":"GJ15CD7657","CUSTOMERID":"20001866"},{"VEHICLENUMBER":"MH02BY7774","CUSTOMERID":"20000005"},{"VEHICLENUMBER":"MH02DG7774","CUSTOMERID":"20000933"},{"VEHICLENUMBER":"GJ15CA7387","CUSTOMERID":"20001865"},{"VEHICLENUMBER":"GJ15CB9601","CUSTOMERID":"20001557"}]} 

Scala Spark API

Using the Spark Scala API, you can do the following:

    import org.apache.spark.sql.functions._

    val res01 = jdbcDF.groupBy("ACCOUNTNO")
      .agg(collect_list(struct("VEHICLENUMBER", "CUSTOMERID")).as("VEHICLE"))
    res01.coalesce(1).write.json("D:/res01.json")
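As a self-contained illustration of what collect_list and struct do, here is a sketch that replaces the JDBC source with a hard-coded sample of the rows above:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder()
      .appName("collect-list-demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // A small in-memory stand-in for the JDBC table.
    val sample = Seq(
      (10003014, "MH43AJ411", 20000000),
      (10003014, "MH43AJ411", 20000001),
      (10003015, "MH12GZ3392", 20000002)
    ).toDF("ACCOUNTNO", "VEHICLENUMBER", "CUSTOMERID")

    // struct packs the two columns into one nested column;
    // collect_list gathers those structs into an array per group.
    val grouped = sample.groupBy("ACCOUNTNO")
      .agg(collect_list(struct("VEHICLENUMBER", "CUSTOMERID")).as("VEHICLE"))

    grouped.show(false)
    // Each row serializes to one JSON object of the desired shape, e.g.
    // {"ACCOUNTNO":10003014,"VEHICLE":[{"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":20000000},...]}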

You should get the same result as with the SQL approach.

I hope the answer is helpful.


8 Comments

What Scala APIs and functions are available? I want to learn and practice these various APIs and functions. Can you give me a list of the important Scala APIs for working with XML, JSON, and other semi-structured data?
Here are some links: Scala Spark functions, spark-xml, and spark-json, among others. Just search the web and you will find abundant info :) And of course you need a lot of practice.
You can help me by giving some links too. Do you have any use cases for collect_list and struct?
collect_list and struct are built-in Spark functions. You can check the Scala Spark functions link in my comment above.
When I run the Spark SQL query you have given, it throws an error in Eclipse: Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: tp_customer_account; line 1 pos 86