
I have a bunch of files that I read and process, merging certain data from the corresponding streams into a single stream.

Is there a more elegant solution than the one below (keeping a separate counter and calling combinedStream.end() after all source streams have emitted end)?

    let combinedStream = ....;
    let counter = 0;

    filePaths.forEach(function(filePath) {
        counter += 1;
        const fileStream = fs.createReadStream(filePath);
        const myStream = new MyStream(fileStream);
        myStream.on('data', myStream.write);
        myStream.on('end', function() {
            counter -= 1;
            if (counter === 0) {
                combinedStream.end();
            }
        });
    });

    return combinedStream;

2 Answers


A cleaner approach could be the one used in that repo, even though it does nothing more than hide your counter somewhere and let you work with a more comfortable callback-based model.

This way, your code will look like:

    let sharedStream = ...

    function onEachFilename(filename, callback) {
        // here you can read from the stream and push the data on the shared one,
        // then invoke the "internal" callback on the end event
    }

    function onEndAll() {
        // here you can finalize and close the shared stream
    }

    forEach(filenames, onEachFilename, onEndAll);

Keep in mind that somewhere there is still a function in charge of counting for you and invoking the onEndAll function once all the callback functions have been invoked.
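To make that hidden counting concrete, here is a minimal sketch of what such a forEach helper might look like. This is an assumption for illustration only, not the code from the repo mentioned above: the name forEach, its argument order, and the error-first convention for the callbacks are all hypothetical.

```javascript
// Hypothetical helper (not taken from any library): runs `iteratee` for each
// item and calls `done` exactly once, either after every per-item callback
// has fired or as soon as one of them reports an error.
function forEach(items, iteratee, done) {
  let pending = items.length; // the "hidden" counter
  let finished = false;       // guards against calling `done` twice

  if (pending === 0) {
    done(null);
    return;
  }

  items.forEach(function (item) {
    let called = false; // ignore repeated calls of the same per-item callback

    iteratee(item, function (err) {
      if (finished || called) {
        return;
      }
      called = true;

      if (err) {
        finished = true;
        done(err);
        return;
      }

      pending -= 1;
      if (pending === 0) {
        finished = true;
        done(null);
      }
    });
  });
}
```

With a helper like this, onEachFilename would call its callback from the source stream's end event, and onEndAll would be the place to call end() on the shared stream.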

1 Comment

Thanks. I upvoted it as an alternative solution, but not the answer.

You can just process the files with a Transform stream, then pipe the results to a PassThrough stream.

Since you are using let, I guess you can use ES2015.

    "use strict";

    let fs = require('fs');
    let filePaths = ['./tmp/h.txt', './tmp/s.txt'];
    let Stream = require('stream');

    class StreamProcessor {
        constructor() {
            this.process_streams = [];
        }

        push(source_stream) {
            // Create a new Transform stream
            let transform = new StreamTransformer();
            // Register the finish event and pipe
            transform.processed = transform.wait.call(transform);
            source_stream.pipe(transform);
            // Push the stream to the internal array
            this.process_streams.push(transform);
        }

        done(callback) {
            let streams = this.process_streams;
            // Wait for all Transform streams to finish processing
            Promise.all(streams.map(function (s) { return s.processed; }))
                .then(function () {
                    let combined_stream = new Stream.PassThrough();
                    streams.forEach(function (stream) {
                        stream.pipe(combined_stream);
                    });
                    // Call the callback with the combined stream
                    callback(null, combined_stream);
                })
                .catch(function (err) {
                    callback(err);
                });
        }
    }

    class StreamTransformer extends Stream.Transform {
        constructor() {
            // call super
            super();
        }

        _transform(chunk, enc, transformed) {
            // process files here
            let data = chunk.toString();
            data = data.substring(0, data.length - 2);
            this.push(data);
            transformed();
        }

        _flush(flushed) {
            // for additional output at the end
            this.push('\n');
            flushed();
        }

        wait() {
            // Returns a promise that resolves when all the data is processed
            let stream = this;
            return new Promise(function (resolve, reject) {
                stream.on('finish', function () { resolve(true); });
                stream.on('error', function (err) { reject(err); });
            });
        }
    }

    // Now you can do..
    let process_stream = new StreamProcessor();

    filePaths.forEach(function (fpath) {
        let fstream = fs.createReadStream(fpath);
        process_stream.push(fstream);
    });

    process_stream.done(function (err, combined_stream) {
        // Consume the combined stream
        combined_stream.pipe(process.stdout);
    });

The test files contain 'hello' and 'stream'.

    // Output is
    // hell
    // stream

This can be improved further... :/
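One possible improvement, offered as a rough sketch rather than as part of the code above: pipe every source into the PassThrough with the end: false option, so that no single source ends the combined stream, and call end() yourself once the last source has emitted end. The mergeStreams name and its error handling are assumptions made up for this example. The counter is still there, just tucked inside the helper.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper: merges several readable streams into one PassThrough
// and ends the combined stream only after every source has ended.
function mergeStreams(sources) {
  const combined = new PassThrough();
  let pending = sources.length;

  if (pending === 0) {
    combined.end();
    return combined;
  }

  sources.forEach(function (source) {
    // end: false stops pipe() from ending `combined` when this source ends.
    source.pipe(combined, { end: false });

    source.once('end', function () {
      pending -= 1;
      if (pending === 0) {
        combined.end();
      }
    });

    // pipe() does not forward errors, so surface them on the combined stream.
    source.once('error', function (err) {
      combined.destroy(err);
    });
  });

  return combined;
}
```

In the setup above, the sources would be the transform streams, for example mergeStreams(streams).pipe(process.stdout). Chunks from different sources may interleave, so this only fits when ordering between files does not matter.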

8 Comments

Absolutely, but by piping all the streams into the combined one, the end event will be invoked multiple times on the latter! I guess that would be tricky for the component that has to read from that stream...
@cswl As I said, I'm not piping whole files to the combined stream. Only some data from the files is written to the combined stream (myStream.on('data', myStream.write)), so I need to know when to close the combined stream.
@cswl thanks, but it looks more complicated, not simpler
@kyrylkov You can just put the classes in a separate file and load it as a module. Then your whole code becomes just the part after the // Now you can do comment.
@cswl Thanks. I upvoted it as an alternative solution, but not the answer.