
When I try to read a parquet folder that is currently being written to by another Spark streaming job, using the option "mergeSchema": "true", I get an error:

java.io.IOException: Could not read footer for file

val df = spark
  .read
  .option("mergeSchema", "true")
  .parquet("path.parquet")

Without schema merging I can read the folder fine, but is it possible to read such a folder with schema merging regardless of other jobs that may be updating it at the same time?

Full exception:

java.io.IOException: Could not read footer for file: FileStatus{path=hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)
  at org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet is not a Parquet file (too small length: 0)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
  ... 9 more
  • What do you need mergeSchema for? Commented Jul 29, 2019 at 17:56
  • @JacekLaskowski To combine parquet files with different field sets, because of schema evolution. Commented Jul 30, 2019 at 7:41
  • What is the output format of the streaming job in "written with another spark streaming job,"? Can you include the entire exception? Is the streaming job up and running while you're trying to read the files in a batch job? Commented Jul 30, 2019 at 13:27
  • Just checked your use case with a sample streaming query and a batch query and all worked fine. How do you start the streaming query? How do you start the batch job? Commented Jul 30, 2019 at 14:08
  • It looks like Spark ignores incomplete files without schema merging, but with this option enabled it tries to use incomplete files somehow, resulting in an exception. Commented Aug 13, 2019 at 8:11

1 Answer


Run the following before creating your DataFrame:

spark.sql("set spark.sql.files.ignoreCorruptFiles=true")

That is, enable the config spark.sql.files.ignoreCorruptFiles.

As stated in the Spark documentation, if this config is true, Spark jobs will continue to run when encountering corrupted or non-existent files, and the contents that have been read will still be returned. This config is also used by the schema-merging flow.

It is available from Spark 2.1.1 onwards.
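For completeness, here is a minimal sketch of the batch read with the flag enabled. The application name is a placeholder, and "path.parquet" is the path from the question; in spark-shell the `spark` session already exists, so only the `conf.set` call would be needed.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session setup for a standalone batch job.
val spark = SparkSession.builder()
  .appName("read-while-streaming") // placeholder name
  // Skip zero-length / partially written part files instead of
  // failing while reading parquet footers for schema merging.
  .config("spark.sql.files.ignoreCorruptFiles", "true")
  .getOrCreate()

// Equivalent to spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// on an existing session:
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val df = spark.read
  .option("mergeSchema", "true")
  .parquet("path.parquet") // path from the question
```

Note that with this flag on, in-progress files are silently skipped, so the batch read may not include rows from the streaming job's most recent, still-open part files.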
