This requires a custom source node.
1. Calculate the window of time in which to look for new files.
a. Use the last time the script ran as the lower bound.
b. Set the upper bound to 30 seconds ago (to avoid reading a file that is still being written).
2. Look up our directoryPath and make sure it is a valid path.
3. List all of the files within the directory that were modified between our two time boundaries.
4. For each of those files, add a new message with the contents of that file.
5. Update the time we last ran, so that the window will move forward the next time.
/*
 * Directory-polling source script.
 *
 * Reads the "lastRun" and "directoryPath" values from channelCache, lists the
 * regular files in that directory whose last-modified time falls inside the
 * half-open window [lastRun, now - 30s), queues each file's contents as an
 * inbound message (original file name attached), and finally stores the upper
 * bound back into "lastRun" so the next execution continues where this one
 * stopped.
 *
 * Any error raised while listing or reading is rethrown as a string prefixed
 * with 'Caught error... err='; in that case "lastRun" is NOT advanced.
 */

// ---- 1. Lower bound of the window ----
var lastRun = channelCache.getValue("lastRun");
// `== null` also covers an undefined result, not only a null one.
if (lastRun == null) {
    qie.info("Calculating new lastRun date");
    // No previous run recorded: start the window 24 hours ago
    // (current time truncated to a whole second, minus one second).
    lastRun = ((Math.floor(new java.util.Date().getTime() / 1000) - 1) * 1000) - (24 * 60 * 60 * 1000);
    lastRun = new java.util.Date(lastRun);
} else {
    // `1 *` coerces the cached value to a number before building the Date.
    lastRun = new java.util.Date(1 * lastRun);
}

// ---- 2. Upper bound of the window: 30 seconds ago, truncated to a whole second ----
// The 30 second margin avoids picking up a file that is still being written.
var momentAgo = new java.util.Date((Math.floor(new java.util.Date().getTime() / 1000) - 30) * 1000);

var archivePath = channelCache.getValue("directoryPath");
try {
    if (StringUtils.isNotBlank(archivePath)) {
        var archiveDirectory = new java.io.File(archivePath);
        if (archiveDirectory.exists() && archiveDirectory.isDirectory()) {
            var windowStart = lastRun.getTime();
            var windowEnd = momentAgo.getTime();

            // Standard adapter form: the object literal supplies the
            // implementation of java.io.FileFilter.accept(File).
            var files = archiveDirectory.listFiles(new java.io.FileFilter({
                accept: function (file) {
                    // Skip sub-directories and other non-regular entries;
                    // only regular files can be read as a message.
                    if (!file.isFile()) {
                        return false;
                    }
                    var lastModified = file.lastModified();
                    // Half-open window: inclusive lower bound, exclusive upper
                    // bound. Because this run's upper bound becomes the next
                    // run's lower bound, a file stamped exactly on the boundary
                    // is picked up exactly once (by the later run).
                    return (windowStart <= lastModified && lastModified < windowEnd);
                }
            }));

            // listFiles returns null when the directory cannot be listed.
            if (files != null) {
                for (var i = 0; i < files.length; i++) {
                    // qie.addInboundMessage('record', 'originalFilename')
                    // The file content is decoded as ISO-8859-1 before being queued.
                    qie.addInboundMessage(new java.lang.String(qie.readFile(files[i].getAbsolutePath()), "ISO-8859-1"), files[i].getName());
                }
            }

            // Update only if there are no errors, so a failed run is retried
            // over the same window.
            channelCache.setValue("lastRun", momentAgo.getTime());
        } else {
            qie.warn("archivePath can't be found as a directory.");
        }
    } else {
        qie.warn("archivePath of '" + archivePath + "' is not valid.");
    }
} catch (err) {
    // Rethrown as a plain string, unchanged from the existing behaviour of this script.
    throw 'Caught error... err=' + err;
}