You can just process files with a Transform stream then pipe to a PassThrough Stream.
Since, you are using let, I guess you can use ES2015.
"use strict"; let fs=require('fs'); let filePaths=['./tmp/h.txt','./tmp/s.txt']; let Stream = require('stream'); class StreamProcessor { constructor() { this.process_streams = []; } push (source_stream) { // Create a new Transform Stream let transform = new StreamTransformer(); // Register the finish event and pipe transform.processed = transform.wait.call(transform); source_stream.pipe(transform); // push the stream to the internal array this.process_streams.push(transform); } done (callback) { let streams = this.process_streams; // Wait for all Transform streams to finish processing Promise.all( streams.map(function(s) {return s.processed; }) ) .then ( function() { let combined_stream=new Stream.PassThrough(); streams.forEach(function (stream) { stream.pipe(combined_stream); }); // Call the callback with stream callback(null,combined_stream); }) .catch(function (err) { callback(err); }); } } class StreamTransformer extends Stream.Transform { constructor () { // call super super(); } _transform(chunk,enc, transformed) { // process files here let data=chunk.toString(); data=data.substring(0,data.length-2); this.push(data); transformed(); } _flush(flushed) { // for additonal at end this.push('\n'); flushed(); } wait() { // returns a promise that resolves, when all the data is processed; let stream = this; return new Promise(function(resolve,reject) { stream.on('finish', function() { resolve(true); }); stream.on('error', function(err) { reject(err); }); }); } } /// Now you can do.. let process_stream = new StreamProcessor(); filePaths.forEach(function (fpath) { let fstream = fs.createReadStream(fpath); process_stream.push(fstream); }); process_stream.done( function (err,combined_stream) { // Consume the combines stream combined_stream.pipe(process.stdout); });
Test files contains 'hello' and 'stream'
// Outputs is // hell // stream
This can be improved further.. . :/