
I am trying to insert data from a remote CSV file into MongoDB (using Mongoose). I would like to do this in bulk inserts of 100 items at a time.

Here's my code:

import csv from 'csv-parser'
import fetch from 'node-fetch'
import { Product } from '../models/Product'

export async function handleCSVProcessing(targetCsv: string) {
  const batchSize = 100

  try {
    const response = await fetch(targetCsv, {
      method: 'get',
      headers: {
        'content-type': 'text/csv;charset=UTF-8',
      }
    })
    if (!response || !response.ok || !response.body) {
      throw new Error(`Failed to fetch CSV file: ${response.statusText}`)
    }

    const productsToUpsert: any[] = []

    response.body
      .pipe(csv())
      .on('data', async (row: any) => {
 
        productsToUpsert.push({
          // logic not relevant to the question here
        })

        if (productsToUpsert.length >= batchSize) {
          console.log('productsToUpsert.length: ' + productsToUpsert.length)  // this keeps growing after 100! 101, 102, 103...
          await Product.bulkWrite(productsToUpsert);
          productsToUpsert.length = 0  // theoretically this should empty the array but it doesn't
        }
      })
      .on('end', async () => {
        if (productsToUpsert.length > 0) {
          await Product.bulkWrite(productsToUpsert);
        }
      })
      .on('error', (error: any) => {
        console.error('Error processing CSV:', error);
      });
  } catch (error) {
    console.error('Error fetching CSV:', error);
  }
}

I would expect that, once 100 items have been added to productsToUpsert, they are inserted into the DB, the array is emptied, and the process starts over until the array fills up with 100 items again (the CSV file has thousands of rows).

However, my console.log shows me that the length of the array continues to grow after 100.

What am I doing wrong here? I also tried switching to let instead of const and then doing productsToUpsert = [], but the result is still the same.

Could it be that I am handling the async and await stuff incorrectly, and the array somehow grows while something else is happening (i.e. the DB insert)? Or maybe I'm handling the stream wrong.

  • "I would expect that [...] those are inserted in the DB [...] and then it starts over" — what is that expectation based on? Did you follow a tutorial that told you that? Did you read the docs and that's what they said? Did you check official resources to verify that that expectation was founded? Because if so, talk about that. And if not, of course, do that (ideally, "do that first", but it's too late for that ;) Commented Jul 5 at 1:07
  • @Mike'Pomax'Kamermans the expectation is based on my (admittedly limited) knowledge of Node.js streams. What I see with the logs I put in is that .pipe() and .on() let me read the rows in the CSV one by one, so I was assuming it was some sort of "normal" forEach loop iterating over them. If you have any suggestion about what I'm doing wrong, please go ahead and enlighten me instead of posting some snide comment that basically says "RTFM". – mrodo Commented Jul 5 at 8:26

2 Answers

0

Turns out using async iteration greatly simplifies the code and allows me to achieve what I needed: a simple loop over the CSV rows.

Here's the fix:

import csvParser from 'csv-parser'
import fetch from 'node-fetch'

export async function handleCSVProcessing(targetCsv: string): Promise<void> {
  try {
    const response = await fetch(targetCsv, {
      method: 'get',
      headers: {
        'content-type': 'text/csv;charset=UTF-8',
      }
    })
    if (!response.ok || !response.body) {
      throw new Error(`Failed to fetch CSV file: ${response.statusText}`)
    }

    // response.body is a Node.js readable stream; piping it through csv-parser
    // gives an async-iterable stream of parsed rows
    const csvStream = response.body.pipe(csvParser())

    // for await reads one row at a time and waits for any awaited work
    // in the loop body before pulling the next row
    for await (const row of csvStream) {
      // do DB stuff here
    }
  } catch (err) {
    console.error('Error fetching CSV:', err)
  }
}

for await FTW!
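
For the batching the question asks about, here is a minimal sketch of how the 100-row batches could slot into this for await loop (same imports and Product model as in the question; the row-to-operation mapping is left as a placeholder, as in the original):

import csvParser from 'csv-parser'
import fetch from 'node-fetch'
import { Product } from '../models/Product'

export async function handleCSVProcessing(targetCsv: string): Promise<void> {
  const batchSize = 100
  const productsToUpsert: any[] = []

  const response = await fetch(targetCsv)
  if (!response.ok || !response.body) {
    throw new Error(`Failed to fetch CSV file: ${response.statusText}`)
  }

  for await (const row of response.body.pipe(csvParser())) {
    productsToUpsert.push({
      // map `row` to a bulkWrite operation here
    })

    if (productsToUpsert.length >= batchSize) {
      // the loop does not pull the next row until this await resolves,
      // so the array can never grow past batchSize
      await Product.bulkWrite(productsToUpsert)
      productsToUpsert.length = 0
    }
  }

  // flush the final partial batch
  if (productsToUpsert.length > 0) {
    await Product.bulkWrite(productsToUpsert)
  }
}

The difference from the 'data' handler is that await here actually suspends the loop, so no new rows are read while bulkWrite is in flight.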

-1

To run asynchronous work inside the 'data' handler, you need to control the stream's flow: pause the stream before the async call and resume it once the work has finished.

const stream = response.body.pipe(csv())

stream
  .on('data', async (row: any) => {
    stream.pause()      // stop further 'data' events while the async work runs
    // ...your async logic here
    stream.resume()     // start emitting rows again once the work is done
  })
