In my project, I'm transferring data from MongoDB into a Spark SQL table so I can run SQL-based queries. But Spark SQL only lets me create temporary tables. Every time I want to query something, the execution time is very high, because the data transfer and mapping operations take too much time.
So: can I reduce the execution time? Can I create permanent Spark SQL tables? And can I query those permanent tables over JDBC?
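To make the question concrete, this is the kind of thing I'd like to be able to do: persist the mapped data once and reload it in later sessions without going through MongoDB again. A minimal sketch of what I have in mind, assuming the Spark 1.4+ DataFrame reader/writer API; the Parquet path is a placeholder I made up, and I don't know if this is the right approach:

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;

    public class PersistedTableSketch {

        // Write the mapped DataFrame to disk once (placeholder path).
        static void save(DataFrame obsi) {
            obsi.write().mode(SaveMode.Overwrite).parquet("/data/obsi.parquet");
        }

        // In a later session: reload without touching MongoDB and register
        // it so sql.sql("SELECT ... FROM obsi") works as before.
        static DataFrame reload(SQLContext sql) {
            DataFrame persisted = sql.read().parquet("/data/obsi.parquet");
            persisted.registerTempTable("obsi");
            return persisted;
        }
    }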
I'm adding my current code and execution time results below. I'm running everything in standalone mode.
    package com.mongodb.spark.sql;

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.bson.BSONObject;

    import com.mongodb.hadoop.MongoInputFormat;
    import com.mongodb.spark.demo.Observation;
    import com.mongodb.spark.demo.Sensor;

    import scala.Tuple2;

    public class SparkSqlMongo {

        public static void main(String[] args) {
            // Hadoop configuration for the observations collection
            Configuration conf = new Configuration();
            conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
            conf.set("mongo.input.uri", "mongodb://localhost:27017/test.observations");

            // Hadoop configuration for the sensors collection
            Configuration sensConf = new Configuration();
            sensConf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
            sensConf.set("mongo.input.uri", "mongodb://localhost:27017/test.sens");

            SparkConf sconf = new SparkConf().setMaster("local[2]").setAppName("SQL DENEME")
                    .set("nsmc.connection.host", "mongodb:");

            JavaSparkContext sc = new JavaSparkContext(sconf);
            SQLContext sql = new SQLContext(sc);

            // Map each BSON document to an Observation bean
            JavaRDD<Observation> obs = sc
                    .newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class)
                    .map(new Function<Tuple2<Object, BSONObject>, Observation>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Observation call(Tuple2<Object, BSONObject> v1) throws Exception {
                            int id = (int) v1._2.get("_id");
                            double value = (double) v1._2.get("Value");
                            // Date time = (Date) v1._2.get("Time");
                            int sensor = (int) v1._2.get("SensorId");
                            int stream = (int) v1._2.get("DataStreamId");
                            return new Observation(id, value, sensor, stream);
                        }
                    });

            DataFrame obsi = sql.createDataFrame(obs, Observation.class);
            obsi.registerTempTable("obsi");

            // Map each BSON document to a Sensor bean
            JavaRDD<Sensor> sens = sc
                    .newAPIHadoopRDD(sensConf, MongoInputFormat.class, Object.class, BSONObject.class)
                    .map(new Function<Tuple2<Object, BSONObject>, Sensor>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Sensor call(Tuple2<Object, BSONObject> v1) throws Exception {
                            int id = (int) v1._2.get("_id");
                            String name = (String) v1._2.get("Name");
                            String description = (String) v1._2.get("Description");
                            Sensor s = new Sensor(id, name, description);
                            System.out.println(s.getName());
                            return s;
                        }
                    });

            DataFrame sensi = sql.createDataFrame(sens, Sensor.class);
            sensi.registerTempTable("sensi");
            sensi.show();

            long start = System.currentTimeMillis();
            DataFrame obser = sql
                    .sql("SELECT obsi.value, obsi.id, sensi.name FROM obsi, sensi "
                            + "WHERE obsi.sensorID = sensi.id AND sensi.id = 107")
                    .cache();
            long stop = System.currentTimeMillis();
            System.out.println("total query time : " + (stop - start));

            List<String> names = obser.javaRDD().map(new Function<Row, String>() {
                private static final long serialVersionUID = 1L;

                public String call(Row row) {
                    return "Name: " + row;
                }
            }).collect();
        }
    }
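One thing I'm not sure about in the measurement above: since Spark evaluates lazily, cache() alone doesn't run the job, so my timer may only be measuring plan construction rather than the join itself. This is a sketch of how I would try to measure and reuse the data instead; it slots into the same main method after the two registerTempTable calls, and the count() calls are just there to force materialization:

    // Materialize both inputs once so repeated queries skip the
    // MongoDB read and the BSON-to-bean mapping.
    obsi.cache();
    obsi.count();
    sensi.cache();
    sensi.count();

    long start = System.currentTimeMillis();
    DataFrame joined = sql.sql(
            "SELECT obsi.value, obsi.id, sensi.name FROM obsi, sensi "
            + "WHERE obsi.sensorID = sensi.id AND sensi.id = 107");
    long rows = joined.count(); // forces the join to actually execute
    long stop = System.currentTimeMillis();
    System.out.println("join rows: " + rows + ", time (ms): " + (stop - start));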
The total execution time is about 120 seconds for roughly 5M observation documents and 1K sensor documents. I'm joining these two tables, and this execution time is far too high to be acceptable.
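For the JDBC part of the question, this is the kind of client I'd like to end up with: the tables exposed through the Spark Thrift server and queried with the Hive JDBC driver. A sketch under those assumptions; the host, port, and credentials are placeholders, and it presumes the Thrift server has been started and can see a permanent "obsi" table:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class JdbcClientSketch {

        public static void main(String[] args) throws Exception {
            // Placeholder connection details; assumes a running Spark
            // Thrift server on the default port 10000.
            Connection conn = DriverManager.getConnection(
                    "jdbc:hive2://localhost:10000/default", "user", "");
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(
                    "SELECT value, id FROM obsi WHERE sensorID = 107");
            while (rs.next()) {
                System.out.println("value: " + rs.getDouble(1) + ", id: " + rs.getInt(2));
            }
            rs.close();
            stmt.close();
            conn.close();
        }
    }

Is this a workable setup, or is there a better way to make the tables permanent and JDBC-accessible?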