
I want to run ~100k HTTP requests in configurable chunks, with a configurable timeout between chunks of requests. The requests are based on the data provided in a .csv file.

It doesn't work because I am getting a TypeError, but when I remove the () after f, it doesn't work either. I would be very grateful for a little help. Probably the biggest problem is that I don't really understand how exactly promises work; I tried multiple solutions but wasn't able to achieve what I want.

The timeout feature will probably give me even more of a headache, so I would appreciate any tips for that too.

Can you please help me to understand why it doesn't work?

Here is the snippet:

const rp = require('request-promise');
const fs = require('fs');
const { chunk } = require('lodash');

const BATCH_SIZE = 2;
const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];

async function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }
    return async () => {
        return await rp(options)
    };
}

async function batchRequestRunner(data) {
    const promises = [];
    for (row of data) {
        row = row.split(',');
        promises.push(update(row[0], row[1], QUERY_PARAMS.join(',')));
    }
    const batches = chunk(promises, BATCH_SIZE);
    for (let batch of batches) {
        try {
            Promise.all(
                batch.map(async f => { return await f(); })
            ).then((resp) => console.log(resp));
        } catch (e) {
            console.log(e);
        }
    }
}

async function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);
    await batchRequestRunner(requestData);
}

main();

Clarification for the first comment:

I have a csv file which looks like below:

clientId,startTime
123,13:40:00
321,13:50:00

The file size is ~100k rows, and the file contains information about how to update the time for a particular clientId in the database. I don't have access to the database, but I do have access to an API which allows updating entries in the database. I cannot make 100k calls at once because: my network is limited (I work remotely because of coronavirus), it consumes a lot of memory, and the API can also be limited and could crash if I make all the requests at once.

What I want to achieve:

  • Load the CSV into memory and convert it to an array

  • Handle the API requests in chunks: for example, take the first two rows of the array, make the API calls based on those two rows, wait 1000 ms, take the next two rows, and continue processing until the end of the array (CSV file)

  • Can you back up a few levels and describe (in words) what you're trying to accomplish (like a mini spec for what you want to do)? This looks overly complicated to me so I suspect we can suggest an easier path if we understand the overall objective. Commented Mar 20, 2020 at 23:43
  • hey, I updated the objective below the code snippet Commented Mar 21, 2020 at 0:10
  • Why are you using the same rowValues[0] in both of these: const newRowValues = [rowValues[0], rowValues[0]].join(',');? Commented Mar 21, 2020 at 0:17
  • Why does your function update(id, time, query) accept three arguments, but you only pass it two here promises.push(update(row[0], row[1]));? I'm working on a new solution, but I keep finding these types of coding errors in your example. Commented Mar 21, 2020 at 0:23
  • There is a lot more processing and preparation before I put it into the request. I tried to remove unnecessary code before posting it here, but there are still some leftovers. I am sorry for the confusion; I updated the code above, and right now it just removes the header line from the CSV. Commented Mar 21, 2020 at 0:24

2 Answers


Well, this seems like a somewhat classic case where you want to process an array of values with some asynchronous operation and, to avoid consuming too many resources or overwhelming the target server, you want no more than N requests in flight at the same time. This is a common problem for which there are pre-built solutions. My go-to solution is a small piece of code called mapConcurrent(). It's analogous to array.map(), but it takes a promise-returning asynchronous callback and the maximum number of items that should ever be in flight at the same time. It then returns a promise that resolves to an array of results.

Here's mapConcurrent():

// takes an array of items and a function that returns a promise
// returns a promise that resolves to an array of results
function mapConcurrent(items, maxConcurrent, fn) {
    let index = 0;
    let inFlightCntr = 0;
    let doneCntr = 0;
    let results = new Array(items.length);
    let stop = false;

    return new Promise(function(resolve, reject) {

        function runNext() {
            let i = index;
            ++inFlightCntr;
            fn(items[index], index++).then(function(val) {
                ++doneCntr;
                --inFlightCntr;
                results[i] = val;
                run();
            }, function(err) {
                // set flag so we don't launch any more requests
                stop = true;
                reject(err);
            });
        }

        function run() {
            // launch as many as we're allowed to
            while (!stop && inFlightCntr < maxConcurrent && index < items.length) {
                runNext();
            }
            // if all are done, then resolve parent promise with results
            if (doneCntr === items.length) {
                resolve(results);
            }
        }

        run();
    });
}

Your code can then be structured to use it like this:

const rp = require('request-promise');
const fs = require('fs');

const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];

function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }
    return rp(options);
}

function processRow(row) {
    let rowData = row.split(",");
    return update(rowData[0], rowData[1], QUERY_PARAMS.join(','));
}

function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);
    // process this entire array with up to 5 requests "in-flight" at the same time
    mapConcurrent(requestData, 5, processRow).then(results => {
        console.log(results);
    }).catch(err => {
        console.log(err);
    });
}

main();

You can obviously adjust the number of concurrent requests to whatever number you want. I set it to 5 here in this example.
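If you also want a fixed pause before each request (the 1000 ms wait mentioned in the question), a small delay helper can be wrapped around processRow. This is just a minimal sketch; delay() and throttledProcessRow() are placeholder names and 1000 ms is a placeholder value:

// minimal sketch: a promise that resolves after ms milliseconds
function delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

// wait before firing each request; use it with mapConcurrent() as before:
//   mapConcurrent(requestData, 5, throttledProcessRow)
function throttledProcessRow(row) {
    return delay(1000).then(() => processRow(row));
}

Because mapConcurrent() only starts a new item when a slot frees up, each request now begins with roughly a one-second pause while still keeping at most 5 requests in flight.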


1 Comment

@user3652705 - FYI, here are links to several different implementations like mapConcurrent().
import { chunk } from "lodash";

interface ConcurrentResult<T> {
    success: T[];
    errors: {
        error: Error;
        item: string;
    }[];
    total: number;
    completed: number;
}

export async function mapConcurrent<T>(
    items: string[],
    concurrency: number,
    fn: (item: string) => Promise<T>,
    onProgress?: (progress: number) => void
): Promise<ConcurrentResult<T>> {
    const result: ConcurrentResult<T> = {
        success: [],
        errors: [],
        total: items.length,
        completed: 0,
    };

    // split the input into batches of `concurrency` items
    const batches = chunk(items, concurrency);

    for (const batch of batches) {
        const promises = batch.map(async (item) => {
            try {
                const response = await fn(item);
                result.success.push(response);
            } catch (error) {
                result.errors.push({ error: error as Error, item });
            } finally {
                result.completed++;
                onProgress?.(Math.floor((result.completed / result.total) * 100));
            }
        });
        // wait for the whole batch to settle before starting the next one
        await Promise.allSettled(promises);
    }

    return result;
}

This is useful if you want to handle it in TypeScript.
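For reference, a rough usage sketch with this function; the input.test.csv path, the "./mapConcurrent" module name, and the updateClient() stub are placeholders, not part of the answer:

import * as fs from "fs";
import { mapConcurrent } from "./mapConcurrent";

// placeholder request function -- swap in your real API call
async function updateClient(row: string): Promise<string> {
    const [clientId, startTime] = row.split(",");
    // e.g. POST to `https://requesturl/${clientId}` with startTime in the body
    return `${clientId} -> ${startTime}`;
}

async function main() {
    const rows = fs.readFileSync("./input.test.csv").toString().split("\n").slice(1);
    const result = await mapConcurrent(rows, 2, updateClient, (pct) => {
        console.log(`progress: ${pct}%`);
    });
    console.log(`done: ${result.success.length} ok, ${result.errors.length} failed`);
}

main();

Note that this version processes one batch at a time and waits for the whole batch to settle before starting the next, so a batch is only as fast as its slowest request; the mapConcurrent() in the first answer keeps a rolling window of in-flight requests instead.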

