I have implemented a small IO class that can read from multiple copies of the same file on different disks (e.g., two hard disks containing the same file). Reading sequentially, each disk averages 60MB/s over the file, but when I do an interleaved read (e.g., 4k from disk 1, 4k from disk 2, then combine), the effective read speed drops to 40MB/s instead of increasing. Why?
Context: Win 7 + JDK 7b70, 2GB RAM, 2.2GB test file. Basically, I am trying to mimic Win7's ReadyBoost and RAID x in a poor man's fashion.
At its heart, when a read() is issued to the class, it creates two runnables, each instructed to read a pre-opened RandomAccessFile from a certain position for a certain length. They are submitted to an executor service, and once Future.get() shows that both have finished, the data read is copied into a common buffer and returned to the caller.
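For illustration, the pattern boils down to roughly this stripped-down sketch (fixed 50/50 split, no EOF or error handling, placeholder names; the real code with adaptive fragment sizing is further below):

    import java.io.RandomAccessFile;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class TwoDiskReader {
        // two handles to the same file, one per disk
        private final RandomAccessFile[] handles;
        private final ExecutorService exec = Executors.newFixedThreadPool(2);

        TwoDiskReader(RandomAccessFile disk1, RandomAccessFile disk2) {
            handles = new RandomAccessFile[] { disk1, disk2 };
        }

        int read(long position, byte[] output, int start, int length) throws Exception {
            // split the request 50/50: first fragment from disk 1, second from disk 2
            final int half = length / 2;
            final long[] positions = { position, position + half };
            final int[] lengths = { half, length - half };

            List<Future<byte[]>> futures = new ArrayList<Future<byte[]>>(2);
            for (int i = 0; i < 2; i++) {
                final int j = i;
                futures.add(exec.submit(new Callable<byte[]>() {
                    @Override
                    public byte[] call() throws Exception {
                        byte[] buffer = new byte[lengths[j]];
                        handles[j].seek(positions[j]);     // position this disk's handle
                        int c = handles[j].read(buffer);   // blocking read of this fragment
                        return c < 0 ? new byte[0] : Arrays.copyOf(buffer, c);
                    }
                }));
            }

            // wait for both fragments, then copy them in order into the caller's buffer
            int result = 0;
            for (Future<byte[]> f : futures) {
                byte[] b = f.get();
                System.arraycopy(b, 0, output, start + result, b.length);
                result += b.length;
            }
            return result;
        }
    }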
Is there a conceptual error in my approach? (For example, will the OS caching mechanism always counteract it?)
    protected <T> List<T> waitForAll(List<Future<T>> futures) throws MultiIOException {
        MultiIOException mex = null;
        int i = 0;
        List<T> result = new ArrayList<T>(futures.size());
        for (Future<T> f : futures) {
            try {
                result.add(f.get());
            } catch (InterruptedException ex) {
                if (mex == null) {
                    mex = new MultiIOException();
                }
                mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
            } catch (ExecutionException ex) {
                if (mex == null) {
                    mex = new MultiIOException();
                }
                mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
            }
            i++;
        }
        if (mex != null) {
            throw mex;
        }
        return result;
    }

    public int read(long position, byte[] output, int start, int length) throws IOException {
        if (start < 0 || start + length > output.length) {
            throw new IndexOutOfBoundsException(
                String.format("start=%d, length=%d, output=%d",
                    start, length, output.length));
        }
        // compute the fragment sizes and positions
        int result = 0;
        final long[] positions = new long[metrics.length];
        final int[] lengths = new int[metrics.length];
        double speedSum = 0.0;
        double maxValue = 0.0;
        int maxIndex = 0;
        for (int i = 0; i < metrics.length; i++) {
            speedSum += metrics[i].readSpeed;
            if (metrics[i].readSpeed > maxValue) {
                maxValue = metrics[i].readSpeed;
                maxIndex = i;
            }
        }
        // adjust read lengths
        int lengthSum = length;
        for (int i = 0; i < metrics.length; i++) {
            int len = (int)Math.ceil(length * metrics[i].readSpeed / speedSum);
            lengths[i] = (len > lengthSum) ? lengthSum : len;
            lengthSum -= lengths[i];
        }
        if (lengthSum > 0) {
            lengths[maxIndex] += lengthSum;
        }
        // adjust read positions
        long positionDelta = position;
        for (int i = 0; i < metrics.length; i++) {
            positions[i] = positionDelta;
            positionDelta += (long)lengths[i];
        }
        List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();
        // read in parallel
        for (int i = 0; i < metrics.length; i++) {
            final int j = i;
            futures.add(exec.submit(new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    byte[] buffer = new byte[lengths[j]];
                    long t = System.nanoTime();
                    long t0 = t;
                    long currPos = metrics[j].handle.getFilePointer();
                    metrics[j].handle.seek(positions[j]);
                    t = System.nanoTime() - t;
                    metrics[j].seekTime = t * 1024.0 * 1024.0
                        / Math.abs(currPos - positions[j]) / 1E9;
                    int c = metrics[j].handle.read(buffer);
                    t0 = System.nanoTime() - t0;
                    // adjust the read speed if we read something
                    if (c > 0) {
                        metrics[j].readSpeed = (alpha * c * 1E9 / t0 / 1024 / 1024
                            + (1 - alpha) * metrics[j].readSpeed);
                    }
                    if (c < 0) {
                        return null;
                    } else if (c == 0) {
                        return EMPTY_BYTE_ARRAY;
                    } else if (c < buffer.length) {
                        return Arrays.copyOf(buffer, c);
                    }
                    return buffer;
                }
            }));
        }
        List<byte[]> data = waitForAll(futures);
        boolean eof = true;
        for (byte[] b : data) {
            if (b != null && b.length > 0) {
                System.arraycopy(b, 0, output, start + result, b.length);
                result += b.length;
                eof = false;
            } else {
                break; // the rest probably reached EOF
            }
        }
        // if there was no data at all, we reached the end of file
        if (eof) {
            return -1;
        }
        sequentialPosition = position + (long)result;
        // evaluate the fastest file to read
        double maxSpeed = 0;
        maxIndex = 0;
        for (int i = 0; i < metrics.length; i++) {
            if (metrics[i].readSpeed > maxSpeed) {
                maxSpeed = metrics[i].readSpeed;
                maxIndex = i;
            }
        }
        fastest = metrics[maxIndex];
        return result;
    }

(The FileMetrics objects in the metrics array contain measurements of read speed, used to adaptively determine the fragment sizes of the various input channels; in my test, alpha = 0 and readSpeed = 1 result in an equal distribution.)
Edit: I ran a non-entangled test (i.e., reading the two files independently in separate threads) and got a combined effective speed of 110MB/s.
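That test was essentially of this shape (simplified sketch; the paths are placeholders for the same file on two different disks):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    public class IndependentReadTest {
        public static void main(String[] args) throws Exception {
            final String[] paths = { "D:\\test.dat", "E:\\test.dat" }; // placeholders
            final AtomicLong totalBytes = new AtomicLong();
            Thread[] threads = new Thread[paths.length];
            long start = System.nanoTime();
            for (int i = 0; i < paths.length; i++) {
                final String path = paths[i];
                threads[i] = new Thread(new Runnable() {
                    public void run() {
                        // read this copy of the file front to back, independently
                        byte[] buf = new byte[64 * 1024];
                        try {
                            FileInputStream in = new FileInputStream(path);
                            try {
                                int c;
                                while ((c = in.read(buf)) > 0) {
                                    totalBytes.addAndGet(c);
                                }
                            } finally {
                                in.close();
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
                threads[i].start();
            }
            for (Thread t : threads) {
                t.join();
            }
            double seconds = (System.nanoTime() - start) / 1e9;
            System.out.printf("combined: %.1f MB/s%n",
                totalBytes.get() / 1024.0 / 1024.0 / seconds);
        }
    }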
Edit 2: I think I know why this is happening.
When I read in parallel and in sequence, it is not a sequential read from each disk's point of view, but rather a read-skip-read-skip pattern due to the interleaving (and possibly riddled with allocation table lookups). This basically reduces the effective read speed per disk to half or worse.
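A way to double-check this hypothesis would be a single-disk microbenchmark that reads every other 4K block (the blocks the "other disk" would serve) and compares the throughput with a plain sequential read. A rough sketch, with the path and block size as placeholders (and keeping in mind that OS caching can skew the numbers if the file fits in RAM):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class ReadSkipTest {
        static double throughputMBs(String path, boolean skipAlternateBlocks) throws IOException {
            final int block = 4 * 1024;
            byte[] buf = new byte[block];
            RandomAccessFile raf = new RandomAccessFile(path, "r");
            try {
                long read = 0;
                long start = System.nanoTime();
                int c;
                while ((c = raf.read(buf)) > 0) {
                    read += c;
                    if (skipAlternateBlocks) {
                        // skip the block the other disk would read in the interleaved scheme
                        raf.seek(raf.getFilePointer() + block);
                    }
                }
                double seconds = (System.nanoTime() - start) / 1e9;
                return read / 1024.0 / 1024.0 / seconds;
            } finally {
                raf.close();
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.printf("sequential: %.1f MB/s%n", throughputMBs("D:\\test.dat", false));
            System.out.printf("read-skip:  %.1f MB/s%n", throughputMBs("D:\\test.dat", true));
        }
    }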