
I am trying to store stream data into HDFS using Spark Streaming, but it keeps creating new files instead of appending to one single file or a few files.

If it keeps creating n files, I feel it won't be very efficient.

[Screenshot: HDFS file system showing the many small output files]

Code

lines.foreachRDD { f =>
  if (!f.isEmpty()) {
    val df = f.toDF().coalesce(1)
    df.write.mode(SaveMode.Append).json("hdfs://localhost:9000/MT9")
  }
}

In my pom.xml I am using these dependencies:

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11
  • If you're reading data from Kafka into HDFS, I suggest you look at using NiFi or Kafka Connect; don't rewrite code for existing solutions. Commented Jun 25, 2018 at 11:14
  • HDFS is meant to be write-once, read-many; you cannot write to the same file. To do that you would need a compaction-style process, which is what Hive and HBase follow. Commented Jun 25, 2018 at 13:00

2 Answers


As you already realized, Append in Spark means write-to-existing-directory, not append-to-file.

This is intentional and desirable behavior (think of what would happen if the process failed in the middle of "appending", even if the format and file system allowed it).

Operations like merging files should be applied by a separate process, if necessary at all, so that correctness and fault tolerance are preserved. Unfortunately this requires a full copy, which for obvious reasons is not desirable on a batch-to-batch basis.
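If fewer, larger files are really needed, one option is a separate, periodically scheduled compaction job. A minimal sketch, assuming the output path from the question (the object name, app name, and compacted path are made-up placeholders):

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical stand-alone compaction job: run it separately from the
// streaming job, e.g. on a schedule, so the streaming writes stay small
// and fault tolerant.
object CompactJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CompactJson").getOrCreate()

    // Read everything the streaming job has written so far.
    val df = spark.read.json("hdfs://localhost:9000/MT9")

    // Rewrite it as a single file into a different directory; writing to
    // a separate path (not the one being streamed into) keeps re-runs safe.
    df.coalesce(1)
      .write
      .mode(SaveMode.Overwrite)
      .json("hdfs://localhost:9000/MT9_compacted") // placeholder path

    spark.stop()
  }
}

Note that this performs exactly the full copy mentioned above, which is why it belongs in its own process rather than inside each micro-batch.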


9 Comments

You can go through this link: spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/… "Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data."
@andani That's appending in Spark... For HDFS, appending means adding new files into a directory rather than overwriting that directory completely
@cricket_007 then is there any way to store data in the same file, the way it is done in Storm?
@andani I have never used Storm, but I know it isn't used for persistent data storage
@cricket_007 what I meant to say is: are there built-in libraries which store data in HDFS in the required fashion?

It's creating a file for each RDD because you are reinitialising the DataFrame variable every time. I would suggest declaring a DataFrame variable outside the loop, initialised to null, and inside the loop unioning each RDD's DataFrame with it. After the loop, write using the outer DataFrame, as in the sketch below.
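For what it's worth, a literal sketch of this suggestion might look like the following, assuming the StreamingContext is named ssc, implicits are in scope as in the question, and the accumulator name is a made-up placeholder (note the caveats in the comments):

import org.apache.spark.sql.{DataFrame, SaveMode}

var accumulated: DataFrame = null // hypothetical accumulator across batches

lines.foreachRDD { f =>
  if (!f.isEmpty()) {
    val batch = f.toDF()
    // Union each micro-batch into the outer variable instead of writing it.
    accumulated = if (accumulated == null) batch else accumulated.union(batch)
  }
}

ssc.start()
ssc.awaitTermination() // blocks until the streaming context is stopped

// Only reached after the context stops. Also note that the union chain
// grows the DataFrame's lineage with every batch, so this is only viable
// for short-lived jobs.
if (accumulated != null) {
  accumulated.coalesce(1)
    .write
    .mode(SaveMode.Append)
    .json("hdfs://localhost:9000/MT9")
}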

4 Comments

Still the same case:

var empty = sqlContext.emptyDataFrame
lines.foreachRDD { f =>
  if (!f.isEmpty()) {
    empty = f.toDF().coalesce(1)
    empty.write.mode(SaveMode.Append).json(warehouseLocation)
  }
}
Inside your condition add this: if (empty == null) empty = f.toDF() else empty = empty.union(f.toDF()). After the loop ends, write with empty.coalesce(1).write.mode(...) and the rest of your options. Please do not write it inside the loop.
The df does not have a fixed number of columns, so I am getting an error with your condition.
