Moogsoft Docs

Kafka

The Kafka module allows you to broadcast information on a Kafka bus. For example, you can use it to push alert or Situation data to a data lake via Kafka.

Configure the Module

To use the Kafka Moobot module:

  1. Load the Moobot module.

  2. Use the connect method to create a new connection to one or more Kafka brokers.

  3. Use the send method to send the required information.

  4. Use the close method to close the connection.

Refer to the Examples for more details.
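The four steps above can be sketched as a single helper. This is a minimal sketch rather than a definitive implementation: sendOnce is a hypothetical name, and the module is passed in as a parameter (in a Moobot it would be the object returned by MooBot.loadModule('Kafka')) so that the snippet can also be exercised outside Moogsoft.

```javascript
// Hypothetical helper: connect, send one message, then close.
// `kafka` stands for the object returned by MooBot.loadModule('Kafka') (step 1).
function sendOnce(kafka, properties, topic, message) {
    // Step 2: connect returns null if no connection can be made.
    var connection = kafka.connect(properties);
    if (!connection) {
        return false;
    }
    // Step 3: send returns a Boolean indicating whether the send succeeded.
    var sent = connection.send(topic, message);
    // Step 4: close the connection once the message has been handed over.
    connection.close();
    return !!sent;
}
```

Opening and closing a connection for every message keeps the sketch short; a Moobot that sends frequently may prefer to keep one connection open.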

Reference Guide

You can use the following methods in the Kafka Moobot module.

connect

Establishes a connection to one or more Kafka brokers with defined connection properties.

Request Argument

| Name | Type | Description |
|------|------|-------------|
| <properties> | Object | A JavaScript object containing connection properties. See below. |

Kafka Connection Properties

The connect method takes its connection properties as a JavaScript object, which may include the following keys:

| Name | Type | Description | Required |
|------|------|-------------|----------|
| servers | List | Specifies the Kafka broker(s) to connect to. | |
| connection_id | String | Name of the connection. | |
| use_ssl | Boolean | Whether to connect using SSL. | Optional |
| ssl_truststore_location | String | Path to the SSL truststore file. | |
| ssl_truststore_password | String | Password for the SSL truststore file. | |
| ssl_truststore_encrypted_password | String | Encrypted password for the SSL truststore file. | |
| ssl_key_password | String | Password for the SSL certificate. | |
| ssl_key_encrypted_password | String | Encrypted password for the SSL certificate. | |
| ssl_endpoint_identification_algorithm | String | Endpoint identification algorithm to validate server host name. | |
| compression_codec | String | Compression codec to use. Choose from none, gzip, lz4 and snappy. | |
| use_sasl | Boolean | Whether to connect using SASL. | |
| sasl_mechanism | String | SASL mechanism to use. Choose from PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 and OAUTHBEARER. | |
| security_protocol | String | Security protocol to use. Choose from SASL_SSL and SASL_PLAINTEXT. To use SASL_SSL, you must set use_ssl to true. | |
| sasl_jaas_config | String | Credentials for JAAS authentication. | |
| additional_properties | Varies | Kafka consumer properties. Any properties you define here take priority over the other connection settings. See the Apache Kafka documentation for descriptions of these properties. | |
| sasl_kerberos_service_name | String | Kerberos service name. | |
| kerberos_conf_file_path | String | Path to the Kerberos configuration file. | |
| kerberos_debug_log | Boolean | Whether to enable Kerberos authentication debug logs. | |
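The constraints stated in the property descriptions above (the allowed values, and SASL_SSL requiring use_ssl) can be checked before calling connect. The following is a minimal sketch under stated assumptions: checkKafkaProperties is a hypothetical helper, not part of the module, and treating servers as a mandatory non-empty list is an assumption. The module may apply its own, different validation.

```javascript
// Allowed values, taken from the property descriptions above.
var COMPRESSION_CODECS = ["none", "gzip", "lz4", "snappy"];
var SASL_MECHANISMS = ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER"];
var SECURITY_PROTOCOLS = ["SASL_SSL", "SASL_PLAINTEXT"];

// Report a problem if an optional property is set to a value outside `allowed`.
function checkChoice(props, name, allowed, problems) {
    if (props[name] !== undefined && allowed.indexOf(props[name]) === -1) {
        problems.push("unsupported " + name + ": " + props[name]);
    }
}

// Hypothetical pre-flight check; returns a list of problems (empty if none).
function checkKafkaProperties(props) {
    var problems = [];
    // Assumption: at least one broker must be listed.
    if (!Array.isArray(props.servers) || props.servers.length === 0) {
        problems.push("servers must be a non-empty list");
    }
    checkChoice(props, "compression_codec", COMPRESSION_CODECS, problems);
    checkChoice(props, "sasl_mechanism", SASL_MECHANISMS, problems);
    checkChoice(props, "security_protocol", SECURITY_PROTOCOLS, problems);
    // Documented rule: SASL_SSL only works when use_ssl is true.
    if (props.security_protocol === "SASL_SSL" && props.use_ssl !== true) {
        problems.push("security_protocol SASL_SSL requires use_ssl: true");
    }
    return problems;
}
```

Returning a list of problems rather than throwing lets the caller log every issue at once before deciding whether to attempt the connection.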

Return Parameter

| Type | Description |
|------|-------------|
| Object | A Java object containing connection details, depending on the requested connection properties. Returns null if no connection can be made. |

Example

Without SSL:

  var conn = kafka.connect({
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "MoogKafkaConnection",
    use_ssl: false
  });

Using SSL:

  var conn = kafka.connect({
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "MoogKafkaConnection",
    use_ssl: true,
    ssl_truststore_location: "/path/to/truststore",
    ssl_truststore_password: "test1234",
    ssl_keystore_location: "/path/to/keystore",
    ssl_keystore_password: "test1234",
    ssl_key_password: "test1234",
    ssl_endpoint_identification_algorithm: "",
    use_sasl: false
  });

Using SASL:

  var conn = kafka.connect({
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "MoogKafkaConnection",
    use_ssl: false,
    use_sasl: true,
    sasl_mechanism: "PLAIN",
    security_protocol: "SASL_PLAINTEXT",
    sasl_jaas_config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=alice password=alice-secret;",
    additional_properties: {
      "sasl.login.refresh.window.factor": "0.8",
      "sasl.login.refresh.window.jitter": "0.05",
      "sasl.login.refresh.min.period.seconds": "60",
      "sasl.login.refresh.min.buffer.seconds": "300"
    },
    sasl_kerberos_service_name: "kafka",
    kerberos_conf_file_path: "/etc/krb5.conf",
    kerberos_debug_log: true
  });
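The SASL example above passes sasl_jaas_config as a single JAAS entry string. When credentials come from variables, assembling that string by hand is error-prone, so the following sketch builds it for the PLAIN and SCRAM mechanisms. buildJaasConfig and quoteJaasValue are hypothetical helpers, not part of the module. The login module class names are the ones shipped with the Apache Kafka client, and the values are wrapped in double quotes, which is the form used in the Apache Kafka documentation.

```javascript
// Login module classes shipped with the Apache Kafka client.
var LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule"
};

// Escape backslashes and double quotes, then wrap the value in double quotes.
function quoteJaasValue(value) {
    return '"' + String(value).replace(/\\/g, "\\\\").replace(/"/g, '\\"') + '"';
}

// Hypothetical helper: build a sasl_jaas_config string for username/password
// mechanisms. Other mechanisms (for example OAUTHBEARER) are out of scope here.
function buildJaasConfig(mechanism, username, password) {
    if (!Object.prototype.hasOwnProperty.call(LOGIN_MODULES, mechanism)) {
        throw new Error("No login module known for mechanism: " + mechanism);
    }
    return LOGIN_MODULES[mechanism] + " required username=" +
        quoteJaasValue(username) + " password=" + quoteJaasValue(password) + ";";
}
```

The result can be passed as the sasl_jaas_config property, alongside the matching sasl_mechanism value.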

connection.send

Sends a message to the Kafka broker.

Request Arguments

| Name | Type | Required | Description |
|------|------|----------|-------------|
| topic | String | Yes | The Kafka topic to send the message to. |
| key | String | No | The key to use for the message. |
| message | JSON | Yes | Message in one of the following formats: plain text, JSON Object payload, or JSON Array payload. |

Return Parameter

| Type | Description |
|------|-------------|
| Boolean | Indicates whether the send operation was successful. |

Example

if (connection) {
   connection.send("myTopic", {test_field: "value"});
   connection.send("myTopic", "myKey", {test_field: "value"});
}
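Because send returns a Boolean and the key is optional, a small wrapper can send several messages and report which ones failed. This is a sketch only: sendBatch is a hypothetical helper, and it assumes the two call forms shown above, send(topic, message) and send(topic, key, message).

```javascript
// Hypothetical helper: send several messages and collect the failures.
// Each entry is { key: optional String, message: text, object or array }.
// Returns the indexes of the entries for which send did not report success.
function sendBatch(connection, topic, entries) {
    var failed = [];
    entries.forEach(function (entry, index) {
        var hasKey = entry.key !== undefined && entry.key !== null;
        var ok = hasKey
            ? connection.send(topic, entry.key, entry.message)
            : connection.send(topic, entry.message);
        if (!ok) {
            failed.push(index);
        }
    });
    return failed;
}
```

For example, sendBatch(connection, "myTopic", [{message: "plain text"}, {key: "myKey", message: {test_field: "value"}}]) returns an empty array when both sends succeed.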

close

Closes the connection to the Kafka broker.

Request Arguments

None.

Return Parameter

None.
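Since close takes no arguments and returns nothing, it fits naturally in a finally block, so the connection is released even if the work in between fails. The sketch below is defensive rather than prescriptive: withConnection is a hypothetical helper, and this page does not state whether send can throw.

```javascript
// Hypothetical helper: run `work` with an open connection and always close it.
// `kafka` stands for the object returned by MooBot.loadModule('Kafka').
// Returns whatever `work` returns, or null if no connection could be made.
function withConnection(kafka, properties, work) {
    var connection = kafka.connect(properties);
    if (!connection) {
        return null;
    }
    try {
        return work(connection);
    } finally {
        // Runs on both the normal path and the error path.
        connection.close();
    }
}
```

A caller would pass a function such as function (connection) { return connection.send("myTopic", {test_field: "value"}); } as the work argument.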

Examples

The following example demonstrates the use of the Kafka Moobot module:

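var logger = MooBot.loadModule('Logger');
var kafka = MooBot.loadModule('Kafka');

function sendToKafka(alert)
{
    // connect returns null if no connection can be made.
    var connection = kafka.connect({
        servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
        connection_id: "MoogKafkaConnection",
        use_ssl: false
    });

    if (connection) {
        // send returns a Boolean indicating whether the send succeeded.
        var success = connection.send("myTopic", {test_field: "value"});
        if (!success) {
            logger.warning("Failed to send message to Kafka");
        }
        // Close the connection once the message has been sent.
        connection.close();
    }
}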
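The example above sends a fixed payload. To push actual alert data, as described in the introduction, the message can be built from the alert first. The following is a sketch under stated assumptions: alertToMessage is a hypothetical helper, it assumes the alert exposes its fields through a value(name) accessor, and the field names in the usage note are illustrative only.

```javascript
// Hypothetical helper: copy selected fields from an alert into a plain object
// that can be passed to send as a JSON Object payload.
// Assumption: the alert exposes its fields through value(name).
function alertToMessage(alert, fieldNames) {
    var message = {};
    fieldNames.forEach(function (name) {
        message[name] = alert.value(name);
    });
    return message;
}
```

Inside sendToKafka, the payload could then be built with alertToMessage(alert, ["alert_id", "severity", "description"]) and passed to connection.send in place of the fixed object.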