I'm taking a design workshop and we've received a short task:
- To parse an input file of processor jobs.
- To simulate scheduling of those jobs on a parallel computer (each job needs X processors)
- The scheduling should be FCFS but it should be easy to to swap it out for a different scheduler.
I wrote the code in C# (in a few minutes) and decided this was a good opportunity to brush up on my now-two-years neglected Scala. Having done Haskell before, I feel that I'm not using Scala's capabilities. The whole code feels very stateful, and I'm sure there are things I'm complicating that could be much much simpler in idiomatic scala.
A bit about the structure: Since I'm simulating time but can't go through time since it's simulating a long time - I'm using a priority queue (on time) for my events. In order to avoid cases where I reschedule two events to the same time - I always promote unsuccessful schedules to the next done events. I also feel that the code could be quicker (takes 38 seconds on my laptop on the sample file).
I would really any improvement. To be fair this is a university assignment but asking on the internet on such sites was explicitly allowed.
import EventType.EventType import scala.collection.mutable import scala.io.Source object Main { def main(args: Array[String]) { val sw = System.currentTimeMillis() val lines = Source.fromFile( """c:\data\data.swf""").getLines() val maxProcessors = lines.find(_.contains("MaxProcs")).get.split(":\\s+")(1).toInt val data = lines.dropWhile(_.startsWith(";")).map(parseSWFJob) val scheduler = new FCFSScheduler(numProcessors = maxProcessors) data.map(scheduler.notify).flatMap(_ => scheduler.getDoneJobs).map(_.id).foreach(println) // pump scheduler.noMoreJobs() println(scheduler.getDoneJobs) // this writes to a file in the real code println("Time diff millis" + (System.currentTimeMillis() - sw)) } def parseSWFJob(str:String) = { val xs = str.trim.split("\\s+").map(_.trim) new Job(xs(0).toLong, xs(1).toLong, xs(3).toLong, None, xs(4).toLong) } } class Job(jobId:Long ,submittedTime:Long,jobRuntime:Long, jobStartTime: Option[Long], jobRequiredProcessors: Long){ val (id, submitTime, runtime, requiredProcessors) = (jobId, submittedTime, jobRuntime, jobRequiredProcessors) var startTime = jobStartTime override def toString = s"(id=$id,submit=$submitTime,run=$runtime,start=$startTime,procs=$requiredProcessors)" } object EventType extends Enumeration { type EventType = Value val Added, Retry, Done = Value } class FCFSScheduler(numProcessors:Long) extends JobScheduler { private val eventLoop = new mutable.PriorityQueue[JobEvent]() private var doneEvents = new mutable.TreeSet[JobEvent] {} private var (currentTime, runToTime, usedProcessors) = (0l, 0l, 0l) // state private val maxProcessors = numProcessors override def notify(j: Job) = { eventLoop.enqueue(new JobEvent(j, j.submitTime, EventType.Added)) runToTime = j.submitTime } def releaseProcessors(j:JobEvent)= { doneEvents -= j usedProcessors -= j.job.requiredProcessors } def addJob(current: JobEvent, doneJobs: mutable.Queue[Job]):JobEvent = { val enoughProcessors = (maxProcessors - usedProcessors) >= current.job.requiredProcessors if(enoughProcessors){ doneJobs.enqueue(current.job) current.job.startTime = Option(currentTime) usedProcessors += current.job.requiredProcessors var job = new JobEvent(current.job, current.job.startTime.get + current.job.runtime, EventType.Done) doneEvents += job job } else new JobEvent(current.job, current.time, EventType.Retry) } def retryJob(current: JobEvent, doneJobs: mutable.Queue[Job]):JobEvent = { val res = addJob(current, doneJobs) res.eType match { case EventType.Done => res case EventType.Retry => val nextSchedule: Long = if (doneEvents.nonEmpty) doneEvents.head.time else currentTime new JobEvent(current.job, nextSchedule , EventType.Retry) } } override def getDoneJobs: Iterable[Job] = { val currentDoneJobs = new mutable.Queue[Job]() while(eventLoop.nonEmpty && eventLoop.head.time >= runToTime) { val current = eventLoop.dequeue() currentTime = current.time current.eType match { case EventType.Added => eventLoop.enqueue(addJob(current, currentDoneJobs)) case EventType.Done => releaseProcessors(current) case EventType.Retry => eventLoop.enqueue(retryJob(current, currentDoneJobs)) } } currentDoneJobs } override def noMoreJobs() = runToTime = Long.MaxValue } class JobEvent(eventJob:Job,eventTime:Long,eventType:EventType) extends Ordered[JobEvent] { val (job, time, eType) = (eventJob, eventTime, eventType) override def compare(that: JobEvent) = { val tDiff = -time.compare(that.time) if(tDiff != 0) tDiff else if(eType == EventType.Done && that.eType != EventType.Done) 1 else if(that.eType == EventType.Done && eType != EventType.Done) -1 else -job.id.compare(that.job.id) } } trait JobScheduler { def notify(j: Job) def getDoneJobs: Iterable[Job] def noMoreJobs() }