
How can I add a connection to kafka - inbound and outbound?

0 votes
1.2K views
asked Dec 21, 2020 by andrei-g-4348 (180 points)

2 Answers

+1 vote
You can create a Web Service connection to an Apache Kafka endpoint, or, if you want to use the Kafka client functions directly, you can download the Kafka JAR from Apache and load it as a Qvera Interface Engine library.
answered Dec 21, 2020 by michael-h-5027 (14,390 points)
+1 vote

There are standard applications that allow a system to push to or pull from a Kafka queue through a standard REST web service.  If that is what you are looking for, follow their documentation for the REST calls; it should be pretty straightforward inside of QIE.  However, if you want to use the Kafka libraries and create a consumer or producer from QIE, you will first need to download the Kafka JARs from the Kafka website.  Then extract the 'lib' folder from the download and place it in your QIE libs folder, named kafka (it should contain about 95 JARs).  Finally, link it under the external libraries on the QIE 'System Config' page.

Once the JARs are in place and QIE has been restarted, you can create a simple producer using the following script:

var properties = new java.util.Properties();
properties.put("bootstrap.servers", "10.97.10.56:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

var kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(properties);
try {
   for (var i = 0; i < 100 ; i++) {
      kafkaProducer.send(new org.apache.kafka.clients.producer.ProducerRecord("dev-test", i + "", "Test Message - " + qie.getUUID(true)));
   }
} finally {
   // close() waits for previously sent records to complete before releasing resources
   kafkaProducer.close();
}

You will need to modify the script with the proper queue (topic) names and the data you want sent to the queue, but this is a good starting point.  The example above simply submits 100 messages to the queue.
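
One way to make the topic name and payload easier to change is to wrap the producer script in a small helper. The following is only a sketch, assuming the same Java access from script that the example above relies on; buildProducerConfig and sendToKafka are hypothetical names, not QIE or Kafka functions.

```javascript
// Hypothetical helper: collects the producer settings in one plain object.
function buildProducerConfig(bootstrapServers) {
   return {
      "bootstrap.servers": bootstrapServers,
      "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
   };
}

// Hypothetical helper: sends every entry of `messages` (an array of strings)
// to `topic`, using the array index as the record key.
function sendToKafka(bootstrapServers, topic, messages) {
   var config = buildProducerConfig(bootstrapServers);
   var properties = new java.util.Properties();
   for (var name in config) {
      properties.put(name, config[name]);
   }

   var producer = new org.apache.kafka.clients.producer.KafkaProducer(properties);
   try {
      for (var i = 0; i < messages.length; i++) {
         producer.send(new org.apache.kafka.clients.producer.ProducerRecord(topic, String(i), messages[i]));
      }
   } finally {
      producer.close();
   }
}
```

With this in place, a call such as sendToKafka("10.97.10.56:9092", "dev-test", ["first message", "second message"]) would replace the hard-coded loop.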

If you want to receive messages from a Kafka queue, you can use a custom script receiver as follows:

var properties = new java.util.Properties();
properties.put("bootstrap.servers", "10.97.10.56:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Optional offset auto-commit settings (the Kafka consumer enables auto-commit by default)
// properties.put("enable.auto.commit", "true");
// properties.put("auto.commit.interval.ms", "10");
properties.put("group.id", "test-group");

var kafkaConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer(properties);
var topics = new java.util.ArrayList();
topics.add("dev-test");
kafkaConsumer.subscribe(topics);

try {
   var records = kafkaConsumer.poll(java.time.Duration.ofMillis(1000));
   if (!records.isEmpty()) {
      var iterator = records.iterator();
      while (iterator.hasNext()) {
         var record = iterator.next();
         qie.addInboundMessage(record.value(), record.topic());
      }
   } else {
      qie.warn('Nothing Found');
   }
} finally {
   kafkaConsumer.close();
}

Note that the consumer polls for 1 second; if no records come back in that time, a warning is logged with the text "Nothing Found".
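
For the REST route mentioned at the start of this answer, the request itself is usually small. As a rough sketch, assuming the REST application is Confluent REST Proxy with its v2 API (an assumption; the answer does not name a specific product), a produce request could be assembled as below. buildProduceRequest is a hypothetical helper, and the host and port are placeholders. Actually sending the request from QIE is not shown here.

```javascript
// Hypothetical helper: builds the method, URL, headers, and body for a
// Confluent REST Proxy (v2 API) produce call. It does not send anything.
function buildProduceRequest(baseUrl, topic, messages) {
   var records = [];
   for (var i = 0; i < messages.length; i++) {
      // Each record carries a key and a value, like ProducerRecord above
      records.push({ key: String(i), value: messages[i] });
   }
   return {
      method: "POST",
      url: baseUrl + "/topics/" + encodeURIComponent(topic),
      headers: {
         "Content-Type": "application/vnd.kafka.json.v2+json",
         "Accept": "application/vnd.kafka.v2+json"
      },
      body: JSON.stringify({ records: records })
   };
}

// Placeholder proxy address; replace with the address of your own proxy
var request = buildProduceRequest("http://localhost:8082", "dev-test",
   ["Test Message - 1", "Test Message - 2"]);
```

The resulting url, headers, and body can then be handed to whatever web service connection you configure in QIE for the proxy.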

answered Dec 21, 2020 by ben-s-7515 (12,320 points)