
Is it possible to create a table in Spark using a SELECT statement?

I do the following:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sqlCtx = SQLContext(sc)
spark_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data/documents_topics.csv")
spark_df.registerTempTable("my_table")
sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")

but I get the error:

/Users/user/anaconda/bin/python /Users/user/workspace/Outbrain-Click-Prediction/test.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/01/21 17:19:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.sql.
: org.apache.spark.sql.AnalysisException: unresolved operator 'CreateHiveTableAsSelectLogicalPlan CatalogTable(
    Table: my_table_2
    Created: Sat Jan 21 17:19:53 EST 2017
    Last Access: Wed Dec 31 18:59:59 EST 1969
    Type: MANAGED
    Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false;;
'CreateHiveTableAsSelectLogicalPlan CatalogTable(
    Table: my_table_2
    Created: Sat Jan 21 17:19:53 EST 2017
    Last Access: Wed Dec 31 18:59:59 EST 1969
    Type: MANAGED
    Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
: +- Project [document_id#0, topic_id#1, confidence_level#2]
: +- SubqueryAlias my_table
: +- Relation[document_id#0,topic_id#1,confidence_level#2] csv

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:374)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:280)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:214)
  at java.lang.Thread.run(Thread.java:745)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/user/workspace/Outbrain-Click-Prediction/test.py", line 16, in <module>
    sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/context.py", line 360, in sql
    return self.sparkSession.sql(sqlQuery)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/session.py", line 543, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "unresolved operator 'CreateHiveTableAsSelectLogicalPlan CatalogTable(\n\tTable: my_table_2\n\tCreated: Sat Jan 21 17:19:53 EST 2017\n\tLast Access: Wed Dec 31 18:59:59 EST 1969\n\tType: MANAGED\n\tStorage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false;;\n'CreateHiveTableAsSelectLogicalPlan CatalogTable(\n\tTable: my_table_2\n\tCreated: Sat Jan 21 17:19:53 EST 2017\n\tLast Access: Wed Dec 31 18:59:59 EST 1969\n\tType: MANAGED\n\tStorage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false\n: +- Project [document_id#0, topic_id#1, confidence_level#2]\n: +- SubqueryAlias my_table\n: +- Relation[document_id#0,topic_id#1,confidence_level#2] csv\n"

2 Answers


I resolved this issue by using HiveContext instead of SQLContext, as below:

import findspark
findspark.init()
import pyspark
from pyspark.sql import HiveContext

sc = pyspark.SparkContext()  # needed here: sc is not otherwise defined in this snippet
sqlCtx = HiveContext(sc)
spark_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data/documents_topics.csv")
spark_df.registerTempTable("my_table")
sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")
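To confirm the table was actually created, a quick check (not part of the original answer):

# list registered tables and inspect a few rows of the new one
sqlCtx.sql("SHOW TABLES").show()
sqlCtx.sql("SELECT * FROM my_table_2").show(5)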

3 Comments

It's worth noting that HiveContext is deprecated as of Spark 2.0; the recommended replacement is SparkSession.builder.enableHiveSupport().getOrCreate() (see the sketch after these comments).
Is there a way of doing this in a more native PySpark manner, e.g. spark_df.registerPersistentTable('my_table_2')? I know registerPersistentTable is not an actual method, but is there something similar?
One problem here: the data is copied along with the schema. Is there any way to copy only the schema?
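Following up on the comments above, a minimal sketch assuming Spark 2.x with Hive support available; my_table and my_table_2 follow the question, while my_table_empty and my_table_3 are illustrative names:

import findspark
findspark.init()
from pyspark.sql import SparkSession

# SparkSession with Hive support replaces the deprecated HiveContext
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark_df = spark.read.csv("./data/documents_topics.csv", header=True, inferSchema=True)
spark_df.createOrReplaceTempView("my_table")  # modern replacement for registerTempTable

# CTAS copies both schema and data
spark.sql("CREATE TABLE my_table_2 AS SELECT * FROM my_table")

# To copy only the schema (no rows), one option is to select zero rows:
spark.sql("CREATE TABLE my_table_empty AS SELECT * FROM my_table LIMIT 0")

# A more DataFrame-native way to persist a table, close to what the
# hypothetical registerPersistentTable in the comment asks for:
spark_df.write.saveAsTable("my_table_3")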

You should first do the SELECT and assign the result to a DataFrame variable, then register that DataFrame with registerTempTable, just as you did with the DataFrame created from the CSV file. For example:
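A minimal sketch of that approach, reusing sqlCtx and spark_df from the answer above; the column names come from the error message in the question, and my_table_subset is an illustrative name:

# run the SELECT first and keep the result in a DataFrame variable
subset_df = spark_df.select("document_id", "topic_id", "confidence_level")

# register that DataFrame as a temp table, just like the CSV-backed one
subset_df.registerTempTable("my_table_subset")

# then create the persistent table from the registered temp table
sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * FROM my_table_subset")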

