
I'm taking a design workshop and we've received a short task:

  • To parse an input file of processor jobs.
  • To simulate scheduling of those jobs on a parallel computer (each job needs X processors)
  • The scheduling should be FCFS, but it should be easy to swap it out for a different scheduler.

I wrote the code in C# (in a few minutes) and decided this was a good opportunity to brush up on my Scala, which I've neglected for two years now. Having done Haskell before, I feel that I'm not making good use of Scala's capabilities. The whole code feels very stateful, and I'm sure there are things I'm complicating that could be much, much simpler in idiomatic Scala.

A bit about the structure: since the simulation covers a long span of time, I can't step through it tick by tick, so I'm using a priority queue (ordered by time) for my events. To avoid cases where two events are rescheduled to the same time, I always postpone an unsuccessful scheduling attempt to the time of the next Done event. I also feel that the code could be faster (it takes 38 seconds on my laptop on the sample file).
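The event-queue approach described above (repeatedly process the earliest pending event, which may schedule later events, instead of stepping through every time unit) can also be sketched without mutation by using an immutable TreeSet as the queue. This is a minimal illustration only; the Event type, the EventQueueDemo object and the handle callback are hypothetical and not part of the question's code.

```scala
import scala.annotation.tailrec
import scala.collection.immutable.TreeSet

// Hypothetical event type for this sketch (not the question's JobEvent).
final case class Event(time: Long, id: Long, label: String)

object Event {
  // Earliest time first; id and label break ties, so two distinct events
  // are never treated as duplicates by the set.
  implicit val byTime: Ordering[Event] =
    Ordering.by((e: Event) => (e.time, e.id, e.label))
}

object EventQueueDemo {
  // Repeatedly take the earliest event; `handle` may return follow-up events,
  // which go into a new set. The previous set is never modified.
  @tailrec
  def run(queue: TreeSet[Event], log: Vector[String])(handle: Event => List[Event]): Vector[String] =
    if (queue.isEmpty) log
    else {
      val next = queue.head // smallest element = earliest event
      run(queue.tail ++ handle(next), log :+ s"${next.time}:${next.label}")(handle)
    }
}
```

Each step builds a new set from queue.tail plus the new events, so every earlier queue is still a valid value.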

I would really appreciate any improvements. To be fair, this is a university assignment, but asking on sites like this was explicitly allowed.

```scala
import EventType.EventType
import scala.collection.mutable
import scala.io.Source

object Main {
  def main(args: Array[String]) {
    val sw = System.currentTimeMillis()
    val lines = Source.fromFile( """c:\data\data.swf""").getLines()
    val maxProcessors = lines.find(_.contains("MaxProcs")).get.split(":\\s+")(1).toInt
    val data = lines.dropWhile(_.startsWith(";")).map(parseSWFJob)
    val scheduler = new FCFSScheduler(numProcessors = maxProcessors)
    data.map(scheduler.notify).flatMap(_ => scheduler.getDoneJobs).map(_.id).foreach(println) // pump
    scheduler.noMoreJobs()
    println(scheduler.getDoneJobs) // this writes to a file in the real code
    println("Time diff millis" + (System.currentTimeMillis() - sw))
  }

  def parseSWFJob(str:String) = {
    val xs = str.trim.split("\\s+").map(_.trim)
    new Job(xs(0).toLong, xs(1).toLong, xs(3).toLong, None, xs(4).toLong)
  }
}

class Job(jobId:Long ,submittedTime:Long,jobRuntime:Long, jobStartTime: Option[Long], jobRequiredProcessors: Long){
  val (id, submitTime, runtime, requiredProcessors) = (jobId, submittedTime, jobRuntime, jobRequiredProcessors)
  var startTime = jobStartTime
  override def toString = s"(id=$id,submit=$submitTime,run=$runtime,start=$startTime,procs=$requiredProcessors)"
}

object EventType extends Enumeration {
  type EventType = Value
  val Added, Retry, Done = Value
}

class FCFSScheduler(numProcessors:Long) extends JobScheduler {
  private val eventLoop = new mutable.PriorityQueue[JobEvent]()
  private var doneEvents = new mutable.TreeSet[JobEvent] {}
  private var (currentTime, runToTime, usedProcessors) = (0l, 0l, 0l) // state
  private val maxProcessors = numProcessors

  override def notify(j: Job) = {
    eventLoop.enqueue(new JobEvent(j, j.submitTime, EventType.Added))
    runToTime = j.submitTime
  }

  def releaseProcessors(j:JobEvent)= {
    doneEvents -= j
    usedProcessors -= j.job.requiredProcessors
  }

  def addJob(current: JobEvent, doneJobs: mutable.Queue[Job]):JobEvent = {
    val enoughProcessors = (maxProcessors - usedProcessors) >= current.job.requiredProcessors
    if(enoughProcessors){
      doneJobs.enqueue(current.job)
      current.job.startTime = Option(currentTime)
      usedProcessors += current.job.requiredProcessors
      var job = new JobEvent(current.job, current.job.startTime.get + current.job.runtime, EventType.Done)
      doneEvents += job
      job
    }
    else new JobEvent(current.job, current.time, EventType.Retry)
  }

  def retryJob(current: JobEvent, doneJobs: mutable.Queue[Job]):JobEvent = {
    val res = addJob(current, doneJobs)
    res.eType match {
      case EventType.Done => res
      case EventType.Retry =>
        val nextSchedule: Long = if (doneEvents.nonEmpty) doneEvents.head.time else currentTime
        new JobEvent(current.job, nextSchedule , EventType.Retry)
    }
  }

  override def getDoneJobs: Iterable[Job] = {
    val currentDoneJobs = new mutable.Queue[Job]()
    while(eventLoop.nonEmpty && eventLoop.head.time >= runToTime) {
      val current = eventLoop.dequeue()
      currentTime = current.time
      current.eType match {
        case EventType.Added => eventLoop.enqueue(addJob(current, currentDoneJobs))
        case EventType.Done => releaseProcessors(current)
        case EventType.Retry => eventLoop.enqueue(retryJob(current, currentDoneJobs))
      }
    }
    currentDoneJobs
  }

  override def noMoreJobs() = runToTime = Long.MaxValue
}

class JobEvent(eventJob:Job,eventTime:Long,eventType:EventType) extends Ordered[JobEvent] {
  val (job, time, eType) = (eventJob, eventTime, eventType)
  override def compare(that: JobEvent) = {
    val tDiff = -time.compare(that.time)
    if(tDiff != 0) tDiff
    else if(eType == EventType.Done && that.eType != EventType.Done) 1
    else if(that.eType == EventType.Done && eType != EventType.Done) -1
    else -job.id.compare(that.job.id)
  }
}

trait JobScheduler {
  def notify(j: Job)
  def getDoneJobs: Iterable[Job]
  def noMoreJobs()
}
```
  • I already see some basic improvements (for example, the while loop), but I'm more interested in converting it to a more functional style. Commented Nov 9, 2014 at 8:51

1 Answer


Anything to do with time is inherently stateful, but your code doesn't have to embrace statefulness quite so fully, and Scala gives you some tools to address that.

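Scala makes it simple to construct parameterized objects: a constructor parameter declared as a val (or any parameter of a case class) is already a field. You can save yourself some code (and some obscurity) by using this. There is no reason, in class JobEvent, say, to copy eventJob into job.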
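For example, the question's Job and JobEvent could be case classes, whose constructor parameters are public vals with no copying needed. This is a sketch under my own assumptions: startTime is moved to the end with a default of None and is no longer a var (a started job would be a copy), and the comparison is expressed as an Ordering in the companion object instead of extending Ordered. The ordering here is ascending (earliest time, then Done before other events, then lower id), so a mutable.PriorityQueue, which dequeues its largest element, would need the reversed ordering.

```scala
object EventType extends Enumeration {
  type EventType = Value
  val Added, Retry, Done = Value
}
import EventType.EventType

// Case class parameters are public vals; no field-copying boilerplate needed.
// Field order and the immutable startTime are assumptions of this sketch.
case class Job(id: Long, submitTime: Long, runtime: Long,
               requiredProcessors: Long, startTime: Option[Long] = None)

case class JobEvent(job: Job, time: Long, eType: EventType)

object JobEvent {
  // Earliest time first; at equal times Done events come first, then lower job id.
  implicit val byTime: Ordering[JobEvent] =
    Ordering.by((e: JobEvent) => (e.time, if (e.eType == EventType.Done) 0 else 1, e.job.id))
}
```

Recording a start time then becomes job.copy(startTime = Some(t)) rather than an assignment.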

One very useful way to remove state is to make your job dispatcher a transformer. It takes a tuple (in the mathematical sense) of processors used, current time, jobs being processed, jobs waiting, and jobs done, and transforms it into a new tuple. This allows you to use immutable data structures and removes state from your operations.
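A cut-down illustration of such a transformer, assuming a hypothetical MachineState that keeps only three of the fields listed above: each transition is a pure function that returns a new value via copy, and the previous state is left untouched.

```scala
// Hypothetical, reduced state for illustration: processors used,
// current time, and the ids of finished jobs.
final case class MachineState(processorsUsed: Long, currentTime: Long, done: List[Long])

object Transformer {
  // A pure transition: finishing a job frees its processors, advances the clock
  // and records the job id. A new MachineState is returned; `s` is unchanged.
  def finishJob(s: MachineState, jobId: Long, procs: Long, endTime: Long): MachineState =
    s.copy(
      processorsUsed = s.processorsUsed - procs,
      currentTime = endTime,
      done = jobId :: s.done
    )
}
```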

```scala
abstract class ComputerSchedule(
    processorsUsed: Long,       // number of processors occupied with jobs
    currentTime: Long,          // current simulated time
    inProgress: List[JobEvent], // list of jobs, ordered by ending time
    waiting: List[Job],         // list of jobs
    jobsDone: List[JobEvent]    // list of jobs, ordered by ending time
) {
  def isDone: Boolean = inProgress.isEmpty && waiting.isEmpty
  def nextState: ComputerSchedule
}
```

You then create a FcfsScheduler sub-class that implements the nextState method.

The transformation method takes the next JobEvent (or set of JobEvents) to finish from the inProgress queue, calculates the number of available processors, goes to the waiting queue to find suitable candidate jobs, creates a JobEvent for each one, and adds them to a newly constructed inProgress queue, depleting the waiting queue.

```scala
class FcfsScheduler(
    processorsUsed: Long,
    currentTime: Long,
    inProgress: List[JobEvent],
    waiting: List[Job],
    jobsDone: List[JobEvent]
) extends ComputerSchedule(processorsUsed, currentTime, inProgress, waiting, jobsDone) {
  def nextState: ComputerSchedule = {
    // take jobs from the front of the queue, in order, while they fit the
    // available processors; FCFS stops at the first job that does not fit
    def findNextJobs(processorsAvailable: Long, candidates: List[Job]): (List[Job], List[Job]) =
      candidates match {
        case Nil => (Nil, Nil)
        case job :: tail if job.requiredProcessors <= processorsAvailable =>
          val (remainingJobs, remainingCandidates) =
            findNextJobs(processorsAvailable - job.requiredProcessors, tail)
          (job :: remainingJobs, remainingCandidates)
        case _ => (Nil, candidates)
      }
    // add to list by order of ending time
    def addActiveJob(j: JobEvent, jl: List[JobEvent]): List[JobEvent] = ???
    // add list of Jobs to in-progress list
    def addNextJobs(j: List[Job], jl: List[JobEvent]): List[JobEvent] = ???
    // and so on: build and return the next FcfsScheduler
    ???
  }
}
```
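For comparison, here is one way the whole step could be filled in, as a self-contained sketch. It uses simplified, hypothetical types (Job, Running, Schedule, with the processor limit stored in the state), assumes every waiting job has already been submitted, and assumes no job needs more than maxProcessors (otherwise the loop would make no progress). It re-sorts the in-progress list rather than inserting each job the way addActiveJob would.

```scala
// Hypothetical minimal types for this sketch (not the question's classes).
final case class Job(id: Long, submitTime: Long, runtime: Long, requiredProcessors: Long)
final case class Running(job: Job, endTime: Long)

final case class Schedule(
    maxProcessors: Long,
    processorsUsed: Long,
    currentTime: Long,
    inProgress: List[Running], // sorted by endTime
    waiting: List[Job],        // FCFS order
    jobsDone: List[Running]    // most recently finished first
) {
  def isDone: Boolean = inProgress.isEmpty && waiting.isEmpty

  // One transformation step, following the outline in the answer.
  def nextState: Schedule = {
    // 1. Jump to the next finishing time (or stay put if nothing is running).
    val now = inProgress.headOption.map(_.endTime).getOrElse(currentTime)
    val (finished, stillRunning) = inProgress.span(_.endTime == now)
    // 2. Free the processors of the finished jobs.
    val used = processorsUsed - finished.map(_.job.requiredProcessors).sum
    // 3. Start waiting jobs in order while they fit; stop at the first that doesn't.
    val (toStart, stillWaiting) = takeWhileFits(maxProcessors - used, waiting)
    val started = toStart.map(j => Running(j, now + j.runtime))
    // 4. Build the new state; nothing in `this` is modified.
    copy(
      processorsUsed = used + toStart.map(_.requiredProcessors).sum,
      currentTime = now,
      inProgress = (stillRunning ++ started).sortBy(_.endTime),
      waiting = stillWaiting,
      jobsDone = finished.reverse ::: jobsDone
    )
  }

  private def takeWhileFits(available: Long, candidates: List[Job]): (List[Job], List[Job]) =
    candidates match {
      case job :: tail if job.requiredProcessors <= available =>
        val (taken, rest) = takeWhileFits(available - job.requiredProcessors, tail)
        (job :: taken, rest)
      case _ => (Nil, candidates)
    }
}
```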

Your simulator then consists of repeatedly calling nextState, starting from an initial schedule, until isDone is true. Along the way, you have a complete history of your simulation.
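A generic sketch of that driver loop, assuming only a step function and a stop predicate (the Simulator object and history method are hypothetical names). With the answer's classes it could be called as Simulator.history(initialSchedule)(_.nextState, _.isDone). The loop is tail-recursive, so it does not grow the stack, but the returned list holds every state, so memory grows with the number of steps.

```scala
import scala.annotation.tailrec

object Simulator {
  // Apply `step` from `start` until `isDone` holds, keeping every state.
  // The result begins with `start` and ends with the first done state.
  def history[S](start: S)(step: S => S, isDone: S => Boolean): List[S] = {
    @tailrec
    def loop(s: S, acc: List[S]): List[S] =
      if (isDone(s)) (s :: acc).reverse
      else loop(step(s), s :: acc)
    loop(start, Nil)
  }
}
```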

  • Thanks! I'll spend some time processing this. What would performance look like? This processes millions of jobs. Commented Nov 9, 2014 at 16:39
  • If you have to deal with millions of jobs, you likely don't want to keep the state history. However, calculation times would be very similar. Data structure manipulation would be similar too, since immutable data is only ever copied, and rarely at that. The list operations would be relatively cheap, as you construct new lists, perhaps using the tail of an existing list. Commented Nov 9, 2014 at 17:50
  • Thanks, I'll give it a try and let you know what performance looks like. Commented Nov 9, 2014 at 19:30
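Following the comment about not keeping the history, a sketch (names hypothetical) that iterates straight to the final state, so only the current state stays referenced and earlier ones can be garbage-collected. It also checks the structural sharing mentioned above: prepending with :: reuses the existing list as the new tail instead of copying it.

```scala
import scala.annotation.tailrec

object FinalStateOnly {
  // Same iteration as keeping a history, but only the current state is live.
  @tailrec
  def run[S](s: S)(step: S => S, isDone: S => Boolean): S =
    if (isDone(s)) s else run(step(s))(step, isDone)
}

object SharingDemo {
  // Prepending to an immutable List allocates one new cell; the old list
  // becomes its tail and is not copied.
  val waiting: List[String] = List("job2", "job3")
  val withNewJob: List[String] = "job1" :: waiting
}
```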
