
I am trying to build a large number of random forest models by group using Spark. My approach is to cache a large input file, split it into pieces by School_ID, cache each school's subset in memory, fit a model on each, and then extract the labels and predictions.

model_input.cache()
val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID).cache)

import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}

def trainModel(df: DataFrame): PipelineModel = {
  val rf = new RandomForestClassifier() // omit some parameters
  val pipeline = new Pipeline().setStages(Array(rf))
  pipeline.fit(df)
}

val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))

(0 until schools.length).foreach { i =>
  val preds = bySchoolArrayModels(i).transform(bySchoolArray(i)).select("prediction", "label")
  preds.write.format("com.databricks.spark.csv")
    .option("header", "true")
    .save("predictions/pred" + schools(i))
}

The code works fine on a small subset, but it takes much longer than I expected. It seems that every time I run an individual model, Spark reads the entire file, so completing all the model runs takes forever. I was wondering whether I failed to cache the files correctly or whether something else is wrong with my code.

Any suggestions would be useful. Thanks!

1 Answer


RDD methods are immutable, so rdd.cache() returns a new RDD. You therefore need to assign the cached RDD to another variable and then re-use that; otherwise you are not using the cached RDD.

val cachedModelInput = model_input.cache()
val schools = cachedModelInput.select("School_ID").distinct.collect.flatMap(_.toSeq)
....
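The point above can be sketched end to end. This is a minimal, self-contained example (the toy data, column names, and local SparkSession are illustrative, not from the original post): assign the result of cache() to a variable, trigger one action to materialize the cache, and run all later work against the cached reference.

```scala
import org.apache.spark.sql.SparkSession

object CacheDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("cache-demo").getOrCreate()
    import spark.implicits._

    // Illustrative stand-in for the real model_input file
    val model_input = Seq((1, 0.5), (1, 0.7), (2, 0.2)).toDF("School_ID", "feature")

    // cache() only *marks* the DataFrame for caching; the first action materializes it
    val cachedModelInput = model_input.cache()
    cachedModelInput.count() // force materialization so later jobs reuse the cached data

    // All subsequent work reads from the cached reference, not the source file
    val schools = cachedModelInput.select("School_ID").distinct.collect.flatMap(_.toSeq)
    println(schools.mkString(", "))

    spark.stop()
  }
}
```

Without the count() (or another action) before the per-school loop, each downstream job may still trigger a read of the source data, which matches the slow behavior described in the question.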

4 Comments

Thanks for your answer! Is there a way to cache the list of model inputs by school (bySchoolArray) and the list of models stored in bySchoolArrayModels?
bySchoolArray in your code is cached in the right way. Keep in mind that caching only takes effect after an action; it is lazy.
In order to cache the models, you can do something like this: val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df).cache())
rdd.cache doesn't return a new RDD; it returns RDD.this.type, i.e. the same RDD.
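The last comment can be verified with a short sketch (toy data and local SparkSession are assumptions for illustration): because cache() returns the same dataset, the original reference is cached too, so reassignment is not strictly required, though triggering an action still is.

```scala
import org.apache.spark.sql.SparkSession

object CacheReturnsSelf {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("cache-inplace").getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("x")
    val cached = df.cache() // returns the same underlying dataset, not a copy
    df.count()              // an action materializes the cache

    // Both references report the same (memory-backed) storage level
    println(df.storageLevel.useMemory)
    println(cached.storageLevel.useMemory)

    spark.stop()
  }
}
```

So the practical fix in the accepted answer is less about reassignment and more about consistently reusing one cached reference and forcing an action before the expensive per-school loop.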
