16

I'm wondering if there is a way to know the number of lines written by a Spark save operation. I know that it's enough to do a count on the RDD before writing it, but I'd like to know if there is a way to have the same info without doing it.

Thank you, Marco

3 Comments
  • It may be a duplicate of stackoverflow.com/questions/28413423/… Commented May 28, 2016 at 19:19
  • @amit_kumar I don't think it's a duplicate, I think he wants to count it and save it without having to pass over the data twice. Commented May 29, 2016 at 11:38
  • This is a question about RDDs, but if you're using DataFrames on Spark 3.3+ check out this answer (Python) or this answer (Scala). Commented Feb 15, 2024 at 19:32
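
For completeness, here is a minimal Scala sketch of the DataFrame-era approach that comment points at, using Dataset.observe with the Observation helper (Spark 3.3+). The metric name, path, and sample data below are illustrative, and the linked answers may differ in detail:

import org.apache.spark.sql.{Observation, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().getOrCreate()

// Collects named aggregates as a side effect of whatever action runs on the DataFrame.
val obs = Observation("write_metrics")

val df = spark.range(0, 10).toDF("id")
  .observe(obs, count(lit(1)).as("rows_written"))

// /tmp/foobar_df is just an illustrative output path.
df.write.mode("overwrite").parquet("/tmp/foobar_df")

// get blocks until the observed query finishes, then returns the metric map.
val rowsWritten = obs.get("rows_written")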

3 Answers

18

If you really want, you can add a custom listener and extract the number of written rows from outputMetrics. A very simple example can look like this:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")

recordsWrittenCount // Long = 10

but this part of the API is intended for internal usage.
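
If you would rather not leave the listener attached for the lifetime of the application, one way to scope it to a single write is sketched below. It assumes a Spark 2.x-style API (where outputMetrics is not an Option) and that SparkContext.removeSparkListener is available in your version; countRecordsWritten is just an illustrative name, and since listener events are delivered asynchronously the count can lag slightly behind the action returning:

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Registers a listener only for the duration of one write, then removes it.
def countRecordsWritten(sc: SparkContext)(write: => Unit): Long = {
  val written = new AtomicLong(0L)
  val listener = new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      val metrics = taskEnd.taskMetrics
      if (metrics != null) {                       // taskMetrics can be null for failed tasks
        written.addAndGet(metrics.outputMetrics.recordsWritten)
      }
    }
  }
  sc.addSparkListener(listener)
  try {
    write
  } finally {
    sc.removeSparkListener(listener)               // assumed available (Spark 2.x+)
  }
  written.get()
}

// Usage:
// val n = countRecordsWritten(sc) {
//   sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
// }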


5 Comments

Thank you, but with Spark 1.5.2 it doesn't work. Instead you have to do: recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.get.recordsWritten
As mentioned, it is an internal API, so there is no guarantee it will be stable.
Wouldn't it be better to use an atomic recordsWrittenCount instead of a synchronized block?
Is there a way to do the same thing when writing records using spark.write.avro(...)?
For me recordsWritten is always 0 in Structured Streaming.
8

The accepted answer more closely matches the OP's specific needs (as made explicit in various comments); nevertheless, this answer will suit the majority.

The most efficient approach is to use an Accumulator: http://spark.apache.org/docs/latest/programming-guide.html#accumulators

val accum = sc.accumulator(0L)

data.map { x =>
  accum += 1
  x
}
.saveAsTextFile(path)

val count = accum.value

You can then wrap this in a useful pimp:

import org.apache.spark.rdd.RDD

implicit class PimpedStringRDD(rdd: RDD[String]) {
  def saveAsTextFileAndCount(p: String): Long = {
    val accum = rdd.sparkContext.accumulator(0L)
    rdd.map { x =>
      accum += 1
      x
    }
    .saveAsTextFile(p)
    accum.value
  }
}

So you can do

val count = data.saveAsTextFileAndCount(path) 
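
On Spark 2.x+, where sc.accumulator is deprecated, the same pimp can be written against the newer accumulator API; a sketch (saveAsTextFileCounted is just an illustrative name, and the usual caveat about re-executed transformations still applies):

import org.apache.spark.rdd.RDD

implicit class CountingStringRDD(rdd: RDD[String]) {
  def saveAsTextFileCounted(p: String): Long = {
    // Named accumulators also show up in the web UI.
    val accum = rdd.sparkContext.longAccumulator("records written")
    rdd.map { x =>
      accum.add(1L)
      x
    }.saveAsTextFile(p)
    accum.value
  }
}

// val count = data.saveAsTextFileCounted(path)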

6 Comments

I know this kind of approach, but I'd like to avoid it for 2 main reasons: using it in a transformation means that the result cannot be trusted in case of some failures, and there is in any case a (little) overhead. I was just wondering if there is a counter accessible somehow, like there is in MapReduce, since the number of rows written is shown in the web UI...
Well, thank you for your answer... even though I keep on wondering how they can show this info in the web UI if there is no internal counter...
@mark91 Ah, well, you could clone the UI code and dig through it I guess. Having read the documentation, the code I've given is fine (Spark says it protects against restarted tasks). It seems what you want to protect against is when an RDD is transformed multiple times, but in the code I've given the rdd isn't accessible outside the Pimp's scope. It only accumulates before writing, and only accumulates once.
count = rdd.count(); rdd.saveAsTextFile(p); Is this any better?
@amit_kumar If the RDD is not cached, this should be more efficient than a separate count because the data will be materialized only once.
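
To make the trade-off discussed in the last two comments concrete, a quick sketch (rdd and p as in the snippet above):

// Without caching, count() and saveAsTextFile() each recompute the full lineage.
// Caching trades memory/disk for the second computation:
rdd.cache()
val count = rdd.count()
rdd.saveAsTextFile(p)
rdd.unpersist()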
2

If you look at

taskEnd.taskInfo.accumulables 

you will see that it is bundled with the following AccumulableInfo entries in a ListBuffer, in sequential order:

AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None),
AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql))

You can clearly see that the number of output rows is at the 7th position of the ListBuffer, so the correct way to get the count of rows written is

taskEnd.taskInfo.accumulables(6).value.get 

We can get the rows written in the following way (I just modified @zero323's answer):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long]
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")

recordsWrittenCount
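
Relying on the hard-coded index 6 is fragile across Spark versions; inside onTaskEnd you could instead look the accumulable up by metric name. A sketch, assuming the name and value fields are Options as in the dump above (the multi-stage caveat in the comment below still applies):

// Find the SQL metric by name instead of by position in the ListBuffer.
val rowsWritten: Long = taskEnd.taskInfo.accumulables
  .find(_.name.contains("number of output rows"))
  .flatMap(_.value)
  .collect { case n: Long => n }
  .getOrElse(0L)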

1 Comment

It is not that simple unfortunately. I had several tasks, e.g. stage 1's task outputted 100 rows, stage 2's task (which writes into the DB) outputted 30 rows. So just adding them up naively would result in 130 rows instead of 30.
