1

I am trying to chunk a text file (let's say, a log file) so that only a certain number of lines is picked up at a time for processing (let's say we are splitting a log file into smaller ones). I wrote this code in imperative style:

    package utils;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.function.Consumer;

    public class FileUtils {

        public static void main(String[] args) {
            readFileInChunks("D:\\demo.txt", 10000, System.out::println);
        }

        public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) {
            try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
                StringBuilder lines = new StringBuilder();
                String line, firstLine = null;
                int i;
                for (i = 0; (line = br.readLine()) != null; i++) {
                    if (firstLine == null)
                        firstLine = line;
                    lines.append(line + "\n");
                    if ((i + 1) % chunkSize == 0) {
                        processor.accept(lines);
                        // every following chunk starts with the first line of the file
                        lines = new StringBuilder(firstLine + "\n");
                    }
                }
                // check the length; comparing strings with != only compares references
                if (lines.length() > 0) {
                    processor.accept(lines);
                }
                br.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

After all these years of coding in an iterative style, I can't come up with a functional-style implementation of this method based on Java 8 streams.

Is it possible to make the readFileInChunks method return a Stream<String> of chunks? Or to implement readFileInChunks in a functional way?

You don’t need to call br.close() manually when you declare br within the try(…) statement. That’s the whole purpose of the language feature. Commented Nov 18, 2016 at 13:56
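The point made in this comment can be checked with a small sketch. `AutoCloseDemo`, `TrackingReader` and `countLines` are hypothetical names introduced only for this illustration; the reader records whether `close()` was called, so the effect of try-with-resources becomes visible without any explicit `br.close()`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

public class AutoCloseDemo {

    /** A reader that records whether close() has been called (hypothetical helper for this demo). */
    static final class TrackingReader extends BufferedReader {
        boolean closed = false;

        TrackingReader(String content) {
            super(new StringReader(content));
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Counts the lines of the reader and relies on try-with-resources to close it. */
    static int countLines(TrackingReader reader) {
        int count = 0;
        try (TrackingReader br = reader) {
            while (br.readLine() != null) {
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // br.close() has already been invoked at this point, without an explicit call
        return count;
    }

    public static void main(String[] args) {
        TrackingReader reader = new TrackingReader("a\nb\nc\n");
        int count = countLines(reader);
        System.out.println(count + " lines, closed = " + reader.closed);
    }
}
```

The same holds when `readLine()` throws: the resource is closed before the `catch` block runs.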

4 Answers

2

First, pick the right tool for the job. If you want to process a text file in chunks, it’s much simpler to read the file in chunks than to read it line by line just to (re-)assemble the lines later on. If you want the chunks clipped to a line boundary, it’s still simpler to search for the line break closest to the chunk boundary than to process every line break.

    public static void readFileInChunks(
            String filePath, int chunkSize, Consumer<? super CharSequence> processor) {
        CharBuffer buf = CharBuffer.allocate(chunkSize);
        try (FileReader r = new FileReader(filePath)) {
            readMore: for (;;) {
                // fill the buffer completely, or leave the loop at the end of the file
                while (buf.hasRemaining())
                    if (r.read(buf) < 0) break readMore;
                buf.flip();
                int oldLimit = buf.limit();
                // clip the chunk at the last line break found in the buffer
                for (int p = oldLimit - 1; p > 0; p--)
                    if (buf.charAt(p) == '\n' || buf.charAt(p) == '\r') {
                        buf.limit(p + 1);
                        break;
                    }
                processor.accept(buf);
                // keep the characters after the line break for the next chunk
                buf.position(buf.limit()).limit(oldLimit);
                buf.compact();
            }
            // pass on whatever is left once the end of the file has been reached
            if (buf.position() > 0) {
                buf.flip();
                processor.accept(buf);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

This code might look more complicated at first glance, but it is copy-free. If you want to allow the consumer to keep a reference to the received object or to perform concurrent processing, just change the lines processor.accept(buf); to processor.accept(buf.toString()); so that the actual buffer is not passed to the consumer. This is mandatory if you want to provide the same functionality as a stream. For a stream, the loop has to be converted to a function which can provide the next item on request:

    public static Stream<String> fileInChunks(
            String filePath, int chunkSize) throws IOException {
        FileChannel ch = FileChannel.open(Paths.get(filePath), StandardOpenOption.READ);
        CharsetDecoder dec = Charset.defaultCharset().newDecoder();
        // estimated number of characters, used for the size estimate of the stream
        long size = (long)(ch.size() * dec.averageCharsPerByte());
        Reader r = Channels.newReader(ch, dec, chunkSize);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<String>(
                (size + chunkSize - 1) / chunkSize, Spliterator.ORDERED | Spliterator.NONNULL) {
            CharBuffer buf = CharBuffer.allocate(chunkSize);
            public boolean tryAdvance(Consumer<? super String> processor) {
                CharBuffer buf = this.buf;
                if (buf == null) return false; // the last chunk has already been delivered
                boolean more = true;
                while (buf.hasRemaining() && more) try {
                    if (r.read(buf) < 0) more = false;
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
                }
                if (more) {
                    buf.flip();
                    int oldLimit = buf.limit();
                    // clip the chunk at the last line break found in the buffer
                    for (int p = oldLimit - 1; p > 0; p--)
                        if (buf.charAt(p) == '\n' || buf.charAt(p) == '\r') {
                            buf.limit(p + 1);
                            break;
                        }
                    processor.accept(buf.toString());
                    buf.position(buf.limit()).limit(oldLimit);
                    buf.compact();
                    return true;
                }
                // end of file: deliver the remaining characters, if any
                this.buf = null;
                if (buf.position() > 0) {
                    buf.flip();
                    processor.accept(buf.toString());
                    return true;
                }
                return false;
            }
        }, false);
    }
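Why a stream has to receive buf.toString() instead of the buffer itself can be illustrated with a small sketch. `BufferReuseDemo` and `collect` are hypothetical names, and the two short strings merely stand in for chunks read from a file. A consumer that keeps the reused CharBuffer ends up seeing only the content that was written last, while copies stay intact.

```java
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;

public class BufferReuseDemo {

    /**
     * Pushes two "chunks" through one reused buffer and returns what a
     * consumer that retains every received object sees afterwards.
     */
    static List<String> collect(boolean copy) {
        CharBuffer buf = CharBuffer.allocate(8);
        List<CharSequence> kept = new ArrayList<>();
        for (String chunk : new String[] {"first", "second"}) {
            buf.clear();
            buf.put(chunk);
            buf.flip();
            if (copy) {
                kept.add(buf.toString()); // independent snapshot of the current content
            } else {
                kept.add(buf);            // the same buffer object every time
            }
        }
        List<String> result = new ArrayList<>();
        for (CharSequence cs : kept) {
            result.add(cs.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collect(false)); // prints [second, second]
        System.out.println(collect(true));  // prints [first, second]
    }
}
```

Stream operations such as collect or sorted retain elements in exactly this way, which is why the copy is unavoidable there.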

1

You can define a custom iterator and construct a stream based on it:

    public static Stream<String> readFileInChunks(String filePath, int chunkSize) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader(filePath));
        Iterator<String> iter = new Iterator<String>() {
            String nextChunk = null;

            @Override
            public boolean hasNext() {
                // a chunk that has been read but not yet consumed must not be overwritten
                if (nextChunk != null) {
                    return true;
                }
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < chunkSize; i++) {
                    try {
                        String nextLine = br.readLine();
                        if (nextLine == null) break;
                        sb.append(nextLine).append("\n");
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
                if (sb.length() == 0) {
                    nextChunk = null;
                    return false;
                } else {
                    nextChunk = sb.toString();
                    return true;
                }
            }

            @Override
            public String next() {
                if (nextChunk != null || hasNext()) {
                    String chunk = nextChunk;
                    nextChunk = null;
                    return chunk;
                } else {
                    throw new NoSuchElementException();
                }
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                iter, Spliterator.ORDERED | Spliterator.NONNULL), false)
                .onClose(() -> {
                    // closing the stream closes the underlying reader
                    try {
                        br.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
    }

Another option is to use the protonpack library, which offers the zipWithIndex method:

    public static Stream<String> readFileInChunks(String filePath, int chunkSize) throws IOException {
        return new TreeMap<>(StreamUtils.zipWithIndex(Files.lines(Paths.get(filePath)))
                .collect(Collectors.groupingBy(el -> el.getIndex() / chunkSize)))
                .values().stream()
                .map(list -> list.stream()
                        .map(el -> el.getValue())
                        .collect(Collectors.joining("\n")));
    }

The second solution is more compact, but it collects all lines in a map while grouping them (and then copies them into a TreeMap in order to have the chunks in the right order), so it is not suited for processing very large files.
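If the whole file should not be held in memory, the grouping can also be done lazily on top of any stream of lines, for example the one returned by Files.lines. The following is only a sketch under stated assumptions: `LazyChunks` and `chunked` are hypothetical names, it uses only the JDK, and unlike the code in the question it does not repeat the first line of the file in every chunk.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyChunks {

    /** Lazily groups consecutive lines into chunks of at most chunkSize lines. */
    static Stream<String> chunked(Stream<String> lines, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        Iterator<String> it = lines.iterator();
        Spliterator<String> split = new Spliterators.AbstractSpliterator<String>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override
            public boolean tryAdvance(Consumer<? super String> action) {
                if (!it.hasNext()) {
                    return false;
                }
                // only the lines of the current chunk are held in memory
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < chunkSize && it.hasNext(); i++) {
                    sb.append(it.next()).append('\n');
                }
                action.accept(sb.toString());
                return true;
            }
        };
        // closing the returned stream also closes the source stream
        return StreamSupport.stream(split, false).onClose(lines::close);
    }

    public static void main(String[] args) {
        List<String> chunks = chunked(Stream.of("a", "b", "c", "d", "e"), 2)
                .collect(Collectors.toList());
        System.out.println(chunks.size()); // prints 3
    }
}
```

With a file, the call would look like chunked(Files.lines(Paths.get(filePath)), chunkSize), ideally inside a try-with-resources statement so that the file is closed when processing ends.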


1

I have created and tested a solution using Java 8, shown below:

    package com.grs.stackOverFlow.pack01;

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.Consumer;

    public class FileUtils {

        private static long processed=1;

        public static void main(String[] args) throws IOException {
            readFileInChunks("src/com/grs/stackOverFlow/pack01/demo.txt", 3, System.out::println);
        }

        public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) throws IOException {
            List<String> lines = Files.readAllLines(Paths.get(filePath));
            String firstLine=lines.get(0);
            long splitCount=lines.size()<chunkSize?1:lines.size()/chunkSize;
            for(int i=1;i<=splitCount;i++){
                Optional<String> result=lines.stream()
                        .skip(processed)
                        .limit(chunkSize)
                        .reduce((a,b) -> {processed++; return a+ "\n"+ b;});
                //reduce increments processed one less time as it starts with 2 element at a time
                processed++;
                processor.accept(new StringBuilder("chunk no. = " + i + "\n" + firstLine+ "\n"+ result.orElse("") ));
            }
        }
    }

3 Comments

You are reading all lines of the file, so you're not chunking.
@Mark it depends on what processor you pass to the method. OP has used the same logic, so I used the same.
@grsdev7, actually it's giving me an OutOfMemoryError. Anyway, thanks for answering.

0

One thing you could do is write a custom collector that builds these chunks and then sends them to the consumer, like this for example (not compiled, just a sample):

    private static final class ToChunksCollector<T>
            implements Collector<T, List<StringBuilder>, List<StringBuilder>> {

        private final int chunkSize;

        public ToChunksCollector(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        @Override
        public Supplier<List<StringBuilder>> supplier() {
            return ArrayList::new;
        }

        @Override
        public BiConsumer<List<StringBuilder>, T> accumulator() {
            return (list, line) -> {
                if (list.size() == 0) {
                    list.add(new StringBuilder());
                }
                StringBuilder lastBuilder = list.get(list.size() - 1);
                String[] linesInCurrentBuilder = lastBuilder.toString().split("\n");
                // no more room
                if (linesInCurrentBuilder.length == chunkSize) {
                    String lastLine = linesInCurrentBuilder[chunkSize - 1];
                    StringBuilder builder = new StringBuilder();
                    builder.append(lastLine).append("\n");
                    // the current line opens the new chunk as well, so it is not lost
                    builder.append(line).append("\n");
                    list.add(builder);
                } else {
                    lastBuilder.append(line).append("\n");
                }
            };
        }

        @Override
        public BinaryOperator<List<StringBuilder>> combiner() {
            return (list1, list2) -> {
                list1.addAll(list2);
                return list1;
            };
        }

        @Override
        public Function<List<StringBuilder>, List<StringBuilder>> finisher() {
            return Function.identity();
        }

        // TODO add the relevant characteristics
        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics() {
            return EnumSet.noneOf(Characteristics.class);
        }
    }

And then usage:

    public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) {
        try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
            List<StringBuilder> builder = br.lines().collect(new ToChunksCollector<>(chunkSize));
            builder.stream().forEachOrdered(processor);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
