Sidebar

How can I process new files in a directory but not remove them?

0 votes
455 views
asked Nov 17, 2015 by mike-r-7535 (13,830 points)
A traditional source node gives me the option to delete the file or move it to another location.  I want to read each new file and use it as a message, but keep the file where it is, without processing duplicates.

1 Answer

0 votes

This requires a custom source node.

Below is a script that does the following:

1. Calculate the window of time in which to look for new files
   a. Use the last time we ran the script as a lower bound
   b. Set an upper bound of 30 seconds ago (just to avoid reading a file that is still being written.)
2. Look up our directoryPath and make sure it is a valid path
3. List all of the files within the directory that were modified between our two time boundaries.
4. For each of those files, add a new message with the contents of the file.
5. Update the time we last ran, so that the window will move forward the next time.

 

var lastRun = channelCache.getValue("lastRun");
if (lastRun === null) {
   qie.info("Calculating new lastRun date");
   // start the timer 24 hours ago.
   lastRun = ((Math.floor(new java.util.Date().getTime() / 1000) - 1) * 1000) - (24*60*60*1000);
   lastRun = new java.util.Date(lastRun);
} else {
   lastRun = new java.util.Date(1 * lastRun);
}
 
// now
var momentAgo = new java.util.Date();
if (momentAgo !== null) {
   momentAgo = (Math.floor(momentAgo.getTime() / 1000) - 30) * 1000;
   momentAgo = new java.util.Date(momentAgo);
}
 
var archivePath = channelCache.getValue("directoryPath");
try {
   if (StringUtils.isNotBlank(archivePath)) {
      var archiveDirectory = new java.io.File(archivePath);
      if (archiveDirectory.exists() && archiveDirectory.isDirectory()) {
         var files = archiveDirectory.listFiles(new java.io.FileFilter() {
            accept: function(file) {
               var lastModified = file.lastModified();
               return (lastRun.getTime() < lastModified && lastModified < momentAgo.getTime());
            },
            getDescription: function() {
               return "Look for files between a window of time.";
            }
         });
         
         if (files != null) {
            // qie.warn('Found ' + files.length + ' files.');
            for (var i = 0; i < files.length; i++) {
               // read in bytes. qie.addInboundMessage('record', 'originalFilename');
               qie.addInboundMessage(new java.lang.String(qie.readFile(files[i].getAbsolutePath()), "ISO-8859-1"), files[i].getName());
            }
         }
         // Update only if there are no errors.
         channelCache.setValue("lastRun", momentAgo.getTime());
      } else {
         qie.warn("archivePath can't be found as a directory.");
      }
   } else {
      qie.warn("archivePath of '" + archivePath + "' is not valid.");
   }
} catch (err) {
   throw 'Caught error... err=' + err;
}
answered Nov 17, 2015 by mike-r-7535 (13,830 points)
...