While Python has been my go-to language for most of my data import projects, I have been drawn more and more towards Typescript for data import jobs. It is the combination of Typescript's types, async / await (introduced in ES2017), and the non-blocking nature of NodeJS that gives Typescript an edge over Python.

Typescript + NodeJS for data import

At Locai, importing product data, such as availability and pricing, is a core part of our software stack. Every day we process and index thousands of products, price updates, and availabilities. We will be open sourcing any tooling that makes data import in Typescript easier and more productive.

The first is https://www.npmjs.com/package/concu: a tiny Typescript wrapper for concurrent processing of arrays.

Processing an array of data

There are many scenarios during data import where you need to process an array of data in which each element can be handled independently. The data can come from a text file (e.g., CSV or TSV) dropped on your FTP server (still the most common delivery method), or from an API call listing the complete catalog.
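To make that concrete, here is a minimal sketch of how such an array might be produced from a dropped file; the file name products.tsv and its layout are assumptions for illustration only:

// Build the array of independent work items from a dropped TSV file
import { readFileSync } from "fs";

// products.tsv is a hypothetical tab-separated export with a header row
const raw: string = readFileSync("products.tsv", "utf-8");
const rows: string[] = raw
    .split("\n")
    .slice(1)                                   // skip the header row
    .filter((line) => line.trim().length > 0);  // drop empty lines

Every element of rows can now be parsed and stored on its own, which is exactly the shape of problem the rest of this post deals with.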

A classic example is a website crawler that processes an array of pages. For each page, your script has to reach out to a server and fetch it, so you need at least one HTTP GET call to get the data (I/O, non-blocking), and then spend some time parsing it (CPU intensive).

Two naive strategies come to mind:

1) process each element sequentially: given that in Node I/O operations are typically asynchronous (meaning you immediately get a Promise back), you have to await the request for every fetch. This results in very controlled execution which will likely not create any bottlenecks.

// For this example we use the node-fetch module (and @types/node-fetch)
import fetch, { Response } from "node-fetch";

// Very long list of pages to crawl
const pages: string[] = ['page1', 'page2', ..., 'pageN'];

// Sequentially go through the list of pages and return the results
const crawlPages = async (pages: string[]): Promise<boolean[]> => {
    const results: boolean[] = [];
    for (const page of pages) {
        try {
            // key is that we wait for every fetch operation
            const result: Response = await fetch(page);
            results.push(await processPage(result));
        } catch (e) {
            console.log(e);
        }
    }
    return results;
};

/**
 * Dummy function that will take care of processing the response.
 * This may involve some CPU-intensive work or storing the result elsewhere.
 */
const processPage = async (page: Response): Promise<boolean> => {
    // ... do stuff with the page data, return a processing status (true or false)
    return true; // placeholder so the example compiles
};

// Entry point of the crawler
crawlPages(pages).then( (results) => {
    console.log('done')
});

The above snippet fetches and processes one page at a time, waiting for each fetch to finish before starting the next one. There may be scenarios where this is exactly what we want, but it is often very inefficient: your CPU usage will be very low while the script spends most of its time waiting on the network.
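If you want to put a number on that inefficiency, wrapping the entry point in a timer is enough to compare the strategies in this post; this is just a minimal sketch and the 'crawl' label is arbitrary:

// Measure how long a full crawl takes so the strategies can be compared
console.time('crawl');
crawlPages(pages).then(() => console.timeEnd('crawl'));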

2) process all elements in parallel:

This mainly means letting NodeJS do what it does best ... run things asynchronously ... and not waiting for previous requests to finish:

import fetch, { Response } from "node-fetch";
const pages: string[] = ['page1', 'page2', ..., 'pageN'];

// Fire off a fetch for every page and return a Promise that resolves when all of them are done
const crawlPages = async (pages: string[]): Promise<boolean[]> => {
    const results: Array<Promise<boolean>> = [];
    for (const page of pages) {
        // let's put the fetch Promise into our array instead of awaiting it
        const result: Promise<boolean> = fetch(page)
            .then((response: Response) => processPage(response))
            .catch((e) => {
                console.log(e);
                return false;
            });
        results.push(result);
    }
    // Combine all the Promises and resolve when everything is done
    return Promise.all(results);
};

const processPage = async (page: Response): Promise<boolean> => {
    // ... do stuff, return a processing status (true or false)
    return true; // placeholder so the example compiles
};

crawlPages(pages)
    .then( (results) => console.log('done'));

Compared to the sequential approach, this script works the other way around: it quickly fires off requests for all pages at once, collects the resulting Promises, and aggregates them with Promise.all. This can create serious bottlenecks: CPU load on the host running the script, network congestion, and downstream APIs that cannot keep up.


Enter CONCU

Besides the two 'naive' options above, there is now a third one: using CONCU.

With concu, you are in complete control over how your array is processed: into how many pieces you cut the array (chunkSize), and how many of those chunks run concurrently (concurrency). Finding a good combination of these two parameters comes down to where your bottleneck is: CPU on the machine running the script, the network, a downstream database or API, etc.
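To make the two parameters concrete, here is a rough sketch of what chunking means; this helper is purely illustrative and is not concu's actual implementation:

// Hypothetical helper, only to show how chunkSize splits the input
const chunk = <T>(items: T[], chunkSize: number): T[][] => {
    const chunks: T[][] = [];
    for (let i = 0; i < items.length; i += chunkSize) {
        chunks.push(items.slice(i, i + chunkSize));
    }
    return chunks;
};

// 1,000 pages with chunkSize = 50 gives 20 chunks of 50 pages each;
// with concurrency = 10, at most 10 of those chunks are processed at the same time.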

The crawling example

import fetch, { Response } from "node-fetch";
// concu itself also needs to be imported (default vs named export depends on the package version)
import concu from "concu";

const pages: string[] = ['page1', 'page2', ..., 'pageN'];

const concurrency = 10;
const chunkSize = 50;


/**
 * This is exactly the same function as in example 1. It runs through its
 * pages sequentially, but concu can start multiple of these calls
 * concurrently!
 */
const crawlPages = async (pages: string[]): Promise<boolean[]> => {
    const results: boolean[] = [];
    for (const page of pages) {
        try {
            // key is that we wait for every fetch operation
            const result: Response = await fetch(page);
            results.push(await processPage(result));
        } catch (e) {
            console.log(e);
        }
    }
    return results;
};

const processPage = async (page: Response): Promise<boolean> => {
    // ... do stuff, return a processing status (true or false)
    return true; // placeholder so the example compiles
};

// Pass in your function, the parameters, and the data.
// You get back an array with the result of every function call.
concu(crawlPages, concurrency, chunkSize, pages)
    .then((results) => console.log('done', results));

This is exactly the hybrid scenario where you can choose precisely how to handle requests, depending on where you expect the bottleneck to be.

If you still just want to do everything sequentially:

const concurrency = 1;
const chunkSize = 1; 

or:

const concurrency = 1;
const chunkSize = pages.length; 

Or just everything in parallel:

const concurrency = pages.length;
const chunkSize = 1;

Conclusion

This is just a little tool to easily experiment with the trade-offs between concurrency and possible bottlenecks such as CPU (in case you do heavy processing), downstream APIs, or machine limitations. If you have any questions or run into problems with the library, please open an issue in the issue queue.