I am trying to build a large number of random forest models by group using Spark. My approach is to cache a large input data file, split it into pieces based on School_ID, cache each school's subset in memory, run a model on each of them, and then extract the labels and predictions.
    model_input.cache()

    val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
    val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID).cache)

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.{Pipeline, PipelineModel}

    def trainModel(df: DataFrame): PipelineModel = {
      val rf = new RandomForestClassifier() // omit some parameters
      val pipeline = new Pipeline().setStages(Array(rf))
      pipeline.fit(df)
    }

    val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))

    (0 to schools.length - 1).foreach { i =>
      val preds = bySchoolArrayModels(i)
        .transform(bySchoolArray(i))
        .select("prediction", "label")

      preds.write.format("com.databricks.spark.csv")
        .option("header", "true")
        .save("predictions/pred" + schools(i))
    }

The code works fine on a small subset, but it takes much longer than I expected. It seems to me that every time I run an individual model, Spark reads the entire file again, so it takes forever to complete all the model runs. I was wondering whether I did not cache the files correctly or whether something went wrong with the way I coded it.
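For what it's worth, my current understanding is that cache() is lazy, so nothing is actually stored until an action runs against the DataFrame, and perhaps the per-school DataFrames are never materialized before training. Below is a rough sketch of what I think forcing materialization would look like for my DataFrames above (it reuses model_input, schools, and the $ syntax from spark.implicits._ as in my code; the count() calls are only there to trigger the cache, and I am not sure this is the correct fix):

    // Rough sketch, not a confirmed solution: force each cached DataFrame into
    // memory with an action before it is used repeatedly for training/prediction.
    model_input.cache()
    model_input.count()                      // materialize the full input once

    val bySchoolArray = schools.map { schoolId =>
      val subset = model_input.where($"School_ID" <=> schoolId).cache()
      subset.count()                         // materialize this school's subset
      subset
    }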
Any suggestions would be useful. Thanks!