Sidebar

How can I read a 30GB CSV file into QIE?

0 votes
465 views
asked Feb 16, 2018 by gary-t-8719 (14,860 points)
I need to process a large CSV file in QIE. How can I read the file into the source node in smaller chunks and process the data without running out of memory?

1 Answer

0 votes

A 30GB file has to be read line by line, grouped into smaller chunks, and added to QIE as several messages. It is not feasible to read 30GB of data into memory. Here is a published function that reads a file, groups its records, and adds each group to QIE as a message:

/**
 * Reads a (potentially very large) text/CSV file line by line and queues it in QIE as
 * several inbound messages, so the whole file never has to be held in memory at once.
 *
 * Each message holds whole lines terminated by CRLF and is kept at or below
 * maxMessageSize characters (header and terminators included). A single line that is
 * longer than the limit still becomes its own, oversized message.
 *
 * @param {string} filepath - Path of the file to read.
 * @param {boolean} repeatHeader - When true, the first line of the file is treated as a
 *     header and written at the top of every message produced.
 * @param {number} [maxMessageSize=1048576] - Target maximum message size, in characters.
 *     Values that are missing or not positive fall back to 1024*1024.
 */
function readFileDataToMessages(filepath, repeatHeader, maxMessageSize) {
   var idealMessageSize = (maxMessageSize > 0) ? maxMessageSize : 1024 * 1024;
   // Every chunk of this file is added under the same generated name
   // (presumably so the chunks can be traced back to one source file).
   var uuidFileName = qie.getUUID(true);
   // NOTE(review): FileReader decodes using the JVM default charset; confirm that this
   // matches the encoding of the incoming files.
   var bufferedReader = new java.io.BufferedReader(new java.io.FileReader(filepath));
   var header = '';
   var messageBody = null;
   var hasRecords = false;

   // Queues the current chunk as one inbound message.
   function flushMessage() {
      qie.debug("Writing file to inbound..." + filepath);
      qie.addInboundMessage(messageBody.toString(), uuidFileName);
   }

   // Starts a new, empty chunk that begins with the (possibly empty) header.
   function resetMessage() {
      messageBody = new java.lang.StringBuilder(idealMessageSize);
      messageBody.append(header);
      hasRecords = false;
   }

   try {
      if (repeatHeader) {
         var headerLine = bufferedReader.readLine();
         if (headerLine === null) {
            // Empty file: there is no header and no data to queue.
            return;
         }
         // String(...) yields a JS string regardless of how the engine wraps Java strings.
         header = String(headerLine) + '\r\n';
      }

      resetMessage();

      var line;
      while ((line = bufferedReader.readLine()) !== null) {
         var record = String(line) + '\r\n';
         // Flush before the chunk would exceed the limit, but never emit a chunk
         // that contains only the header.
         if (hasRecords && messageBody.length() + record.length > idealMessageSize) {
            flushMessage();
            resetMessage();
         }
         messageBody.append(record);
         hasRecords = true;
      }

      if (hasRecords) {
         flushMessage();
      }
   } finally {
      // Closing the BufferedReader also closes the wrapped FileReader.
      bufferedReader.close();
   }
}

In your source node, do not use a traditional file receiver, because it reads the entire file into memory. Instead, locate the file yourself and pass its path to the function. Here is what your source script configuration would look like:

// Only scan for new files once every previously queued message has been processed.
if ((qie.getInboundCount() + qie.getProcessingCount()) === 0) {
   var sourceDirectory = channelCache.getValue("sourceDirectory");
   var archiveDirectory = channelCache.getValue("archiveDirectory");
   qie.debug("looking in sourceDirectory: " + sourceDirectory);
   // listFiles() returns null when the path is not a readable directory.
   var files = new java.io.File(sourceDirectory).listFiles();
   if (files !== null) {
      for (var currentFile = 0; currentFile < files.length; currentFile++) {
         var file = files[currentFile];
         if (file.isDirectory()) {
            continue;
         }
         // Skip files modified within the last 120 seconds; they may still be being written.
         var ageMillis = new java.util.Date().getTime() - file.lastModified();
         if (ageMillis <= 120 * 1000) {
            continue;
         }
         qie.debug("Found file: " + file.getPath());
         // NOTE(review): if reading fails part-way, the chunks already queued remain and the
         // file is not archived, so it would be read again on a later poll - confirm that
         // duplicate messages are acceptable.
         readFileDataToMessages(file.getPath(), false);
         // The two-argument constructor joins the paths correctly whether or not
         // archiveDirectory ends with a path separator.
         var archiveFile = new java.io.File(archiveDirectory, file.getName());

         // NOTE(review): the purpose of this 5 second pause is not evident from this script.
         qie.pause(5000);
         qie.debug("renaming " + file.getPath() + " to " + archiveFile.getPath());

         java.nio.file.Files.move(file.toPath(), archiveFile.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
      }
   }
}

Set the channel cache values sourceDirectory and archiveDirectory to the directory to poll for incoming files and the directory that processed files are moved into.

Here is a link to the configuration I created to test this concept.

 

answered Feb 16, 2018 by mike-r-7535 (13,830 points)
edited Feb 16, 2018 by gary-t-8719
...