
In my project, I'm transferring data from MongoDB into Spark SQL tables for SQL-based queries. But Spark SQL only lets me create temporary tables. When I want to query something, execution time is very high, because the data transfer and mapping operations take too much time.

So, can I reduce execution time? Can I create permanent Spark SQL tables? Can I query permanent tables with JDBC?

I'm adding my code and execution time results below. I'm doing everything in standalone mode.

package com.mongodb.spark.sql;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.spark.demo.Observation;
import com.mongodb.spark.demo.Sensor;

import scala.Tuple2;

public class SparkSqlMongo {

    public static void main(String[] args) {
        // Input configuration for the observations collection
        Configuration conf = new Configuration();
        conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
        conf.set("mongo.input.uri", "mongodb://localhost:27017/test.observations");

        // Input configuration for the sensors collection
        Configuration sensConf = new Configuration();
        sensConf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
        sensConf.set("mongo.input.uri", "mongodb://localhost:27017/test.sens");

        SparkConf sconf = new SparkConf().setMaster("local[2]").setAppName("SQL DENEME")
                .set("nsmc.connection.host", "mongodb:");

        JavaSparkContext sc = new JavaSparkContext(sconf);
        SQLContext sql = new SQLContext(sc);

        // Load observations from MongoDB and map each BSON document to an Observation bean
        JavaRDD<Observation> obs = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class)
                .map(new Function<Tuple2<Object, BSONObject>, Observation>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Observation call(Tuple2<Object, BSONObject> v1) throws Exception {
                        int id = (int) v1._2.get("_id");
                        double value = (double) v1._2.get("Value");
                        // Date time = (Date) v1._2.get("Time");
                        int sensor = (int) v1._2.get("SensorId");
                        int stream = (int) v1._2.get("DataStreamId");
                        return new Observation(id, value, sensor, stream);
                    }
                });

        DataFrame obsi = sql.createDataFrame(obs, Observation.class);
        obsi.registerTempTable("obsi");

        // Load sensors from MongoDB and map each BSON document to a Sensor bean
        JavaRDD<Sensor> sens = sc.newAPIHadoopRDD(sensConf, MongoInputFormat.class, Object.class, BSONObject.class)
                .map(new Function<Tuple2<Object, BSONObject>, Sensor>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Sensor call(Tuple2<Object, BSONObject> v1) throws Exception {
                        int id = (int) v1._2.get("_id");
                        String name = (String) v1._2.get("Name");
                        String description = (String) v1._2.get("Description");
                        Sensor s = new Sensor(id, name, description);
                        System.out.println(s.getName());
                        return s;
                    }
                });

        DataFrame sensi = sql.createDataFrame(sens, Sensor.class);
        sensi.registerTempTable("sensi");
        sensi.show();

        long start = System.currentTimeMillis();

        DataFrame obser = sql
                .sql("SELECT obsi.value, obsi.id, sensi.name FROM obsi, sensi WHERE obsi.sensorID = sensi.id and sensi.id = 107")
                .cache();

        long stop = System.currentTimeMillis();
        // "toplam sorgu zamani" = total query time
        System.out.println("toplam sorgu zamani : " + (stop - start));

        List<String> names = obser.javaRDD().map(new Function<Row, String>() {
            private static final long serialVersionUID = 1L;

            public String call(Row row) {
                return "Name: " + row;
            }
        }).collect();
    }
}

Total execution time is about 120 seconds for roughly 5M observation and 1K sensor documents. I join these two tables, and this execution time is very high and unacceptable.

2 Answers

  1. Yes, you can improve program execution time by caching your table, DataFrame, or RDD.
  2. If you want to save your data as a permanent table, you can use the df.saveAsTable method, but the DataFrame must be created through a HiveContext (see the sketch after this list).
  3. For the JDBC connection, you need to start the Thrift server; then you can run Spark SQL against the registered tables.
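A minimal sketch of points 1-3, assuming Spark 1.4+ built with Hive support and reusing the sc, obs RDD and Observation bean from the question; the table name obsi_persistent is illustrative:

    // Additional import needed: org.apache.spark.sql.hive.HiveContext

    // 1. Create the DataFrame through a HiveContext instead of a plain SQLContext
    HiveContext hive = new HiveContext(sc.sc());
    DataFrame obsiHive = hive.createDataFrame(obs, Observation.class);

    // 2. Cache it so repeated queries in the same context avoid re-reading MongoDB
    obsiHive.cache();

    // 3. Save it as a permanent table in the Hive metastore
    obsiHive.write().saveAsTable("obsi_persistent");

    // 4. For JDBC access, start the Thrift server and connect with any JDBC client:
    //      sbin/start-thriftserver.sh
    //      bin/beeline -u jdbc:hive2://localhost:10000
    //    then: SELECT * FROM obsi_persistent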

1 Comment

Thanks for the answer. I looked into Hive, and its query performance didn't satisfy me, so I'm looking for more effective approaches. In the end, I used Parquet files to store the data. When I query, I read the data from Parquet, and this improves query performance by about 4-5 times.
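For reference, a minimal sketch of that Parquet approach, assuming Spark 1.4+ and reusing the sql context and obs RDD from the question; the output path is illustrative:

    DataFrame obsi = sql.createDataFrame(obs, Observation.class);

    // Write the mapped observations once; Parquet stores them columnar and compressed
    obsi.write().parquet("hdfs:///data/observations.parquet");

    // Later runs skip MongoDB entirely and query the Parquet files directly
    DataFrame fromParquet = sql.read().parquet("hdfs:///data/observations.parquet");
    fromParquet.registerTempTable("obsi");
    sql.sql("SELECT value, id FROM obsi WHERE sensorID = 107").show();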

Spark SQL is not a database; data manipulations that occur within it only live as long as the Spark context that created them is available. There are several Spark job server implementations that will let you hold the result of one job and run further jobs against the same dataset. The data will still be transient and will have to be reloaded if the server (i.e. the Spark context) shuts down.

That said, you can persist the outcome of a calculation and retrieve it later, either back into Mongo or into files on Hadoop/another filesystem (a sketch follows below).
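For example, a rough sketch of writing the join result (the obser DataFrame from the question) back into MongoDB with the same mongo-hadoop connector the question already uses; the output collection test.results is illustrative:

    // Additional imports needed: org.apache.spark.api.java.JavaPairRDD,
    // org.apache.spark.api.java.function.PairFunction, org.bson.BasicBSONObject,
    // com.mongodb.hadoop.MongoOutputFormat

    // Convert each result Row into a (null key, BSON document) pair
    JavaPairRDD<Object, BSONObject> docs = obser.javaRDD()
            .mapToPair(new PairFunction<Row, Object, BSONObject>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<Object, BSONObject> call(Row row) {
                    BSONObject doc = new BasicBSONObject();
                    doc.put("value", row.getDouble(0)); // obsi.value
                    doc.put("id", row.getInt(1));       // obsi.id
                    doc.put("name", row.getString(2));  // sensi.name
                    return new Tuple2<Object, BSONObject>(null, doc);
                }
            });

    // The output URI tells MongoOutputFormat where to write; the path argument
    // of saveAsNewAPIHadoopFile is required by the API but unused here
    Configuration outConf = new Configuration();
    outConf.set("mongo.output.uri", "mongodb://localhost:27017/test.results");
    docs.saveAsNewAPIHadoopFile("file:///tmp/unused", Object.class, BSONObject.class,
            MongoOutputFormat.class, outConf);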

1 Comment

Thanks for the answer. I understand that the lifecycle of a Spark SQL table is limited by the Spark context's lifecycle. So let me ask one more question: can I reduce query execution time? I'm working in standalone mode now, and later we will use a cluster. When we use a cluster, will query execution time decrease to an acceptable level?
