0

I have the following DataFrames and a join operation between them, however the join is failing without citing any actual error.

//HospitalFacility class to fill in case class HospitalFacility(Name: String, Rating: Int, Cost: Int); //I pass the pid as an input parameter. //hc : HiveConext successfully created //Provider_Facility & Facility_Master are my two hive tables. def fetchHospitalFacilityData(pid: String): String = { val filteredProviderSpecilaityDF = hc.sql("select FacilityId, Rating, Cost from Provider_Facility where ProviderId='" + pid + "'"); println(filteredProviderSpecilaityDF); filteredProviderSpecilaityDF.foreach ( println ); //Prints perfectly val allFacilityDF = hc.sql("select id, Name from Facility_Master"); println(allFacilityDF); allFacilityDF.foreach(println); //Prints perfectly //The below line throws error. val resultDF = filteredProviderSpecilaityDF.join(allFacilityDF,filteredProviderSpecilaityDF("FacilityId") === allFacilityDF("id") ,"right_outer"); println(resultDF); val filteredFacilityList = resultDF.rdd.map { spec => HospitalFacility(spec.getString(0).toString(), spec.getInt(3), spec.getInt(4)) }.collect(); filteredFacilityList.foreach(println); //does not reach this point return result; } 

The error that is thrown is listed below :

 Exception in thread "broadcast-hash-join-0" java.lang.NoSuchMethodError: org.apache.spark.util.Utils$.tryOrIOException(Lscala/Function0;)V at org.apache.spark.sql.execution.joins.UnsafeHashedRelation.writeExternal(HashedRelation.scala:264) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102) at org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:85) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1326) at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:94) at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:82) at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100) at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:82) at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:82) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 

Can anyone please help me.

6
  • 1
    Check your scala/spark libraries and cluster versions ! You seem to have a mismatch of a kind Commented Mar 27, 2017 at 13:12
  • This is working from the Spark-shell but not from the Scala program Commented Mar 27, 2017 at 16:02
  • 1
    Can you create a minimum reproducible and verifiable example ? (Build file, code, spark version, how do you run it, etc) We won't be able to help you otherwise Commented Mar 27, 2017 at 16:05
  • Hi eliasah, I have provided the whole function. Can you please have a look now. Thanks. Commented Mar 27, 2017 at 19:20
  • Spark-hive :1.6.0, Spark-sql :1.6.0, Spark-core:1.6.3. I run them through Eclipse through a servlet. Commented Mar 27, 2017 at 21:29

1 Answer 1

0

Maybe allFacilityDF("id")=== filteredProviderSpecilaityDF("FacilityId") return a boolean seq not a Seq[String] The parm usingColumns is defined below: Names of the columns to join on. This columns must exist on both sides.

Sign up to request clarification or add additional context in comments.

1 Comment

I just ran it in my Spark-shell and its working fine. But when I run it through the servlet its throwing the mentioned error. Anything that could go wrong if it is called from a scala program?

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.