Kafka

The Kafka module allows you to broadcast information on a Kafka bus. For example, you can use it to push alert or Situation data to a data lake via Kafka.

Load the module

The Kafka Moobot module is available to load into any standard Moobot. To load it, define a new global variable at the top of the Moobot Javascript file. For example:

var kafka = MooBot.loadModule('Kafka');

Method reference

The Kafka module uses the following methods.

connect

Establishes a connection to one or more Kafka brokers with defined connection properties.

Request arguments

The method takes the following arguments.

Name

Type

Required

Description

properties

Object

Yes

A JavaScript object containing connection properties. See Connection properties below.

Connection properties

The properties object can include the following keys:

Name

Type

Required

Description

servers

List

Yes

Specifies the Kafka broker or brokers to connect to.

connection_id

String

Yes

Name of the connection.

use_ssl

Boolean

Yes

Whether to connect using SSL.

ssl_truststore_location

String

No

Path to the SSL truststore file.

ssl_truststore_password

String

No

Password for the SSL truststore file.

ssl_truststore_encrypted_password

String

No

Encrypted password for the SSL truststore file.

ssl_key_password

String

No

Password for the SSL certificate.

ssl_key_encrypted_password

String

No

Encrypted password for the SSL certificate.

ssl_endpoint_identification_algorithm

String

No

Endpoint identification algorithm to validate the server hostname.

compression_codec

String

No

Compression codec to use. One of:

  • none

  • gzip

  • lz4

  • snappy

use_sasl

Boolean

No

Whether to connect using SASL.

sasl_mechanism

String

No

SASL mechanism to use. If you set a SASL mechanism you must set use_sasl to true. One of:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • OAUTHBEARER

Choose from PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 and OAUTHBEARER.

security_protocol

String

No

SASL protocol to use. If you set a SASL protocol you must set use_sasl to true. One of:

  • SASL_SSL

  • SASL_PLAINTEXT

sasl_jaas_config

String

No

Credentials for JAAS authentication.

additional_properties

Various

No

Kafka consumer properties. Any properties you define here take priority over any configurations. See the Apache Kafka documentation for descriptions of these properties.

sasl_kerberos_service_name

String

No

Kerberos service name.

kerberos_conf_file_path

String

No

Path to the Kerberos configuration file.

kerberos_debug_log

Boolean

No

Whether to enable Kerberos authentication debug logs.

Response

The method returns the following parameter.

Type

Description

Object

A Java object containing connection details, depending on the requested connection properties. Returns null if no connection can be made.

Example

Example use of the connect method without SSL:

var conn = kafka.connect(
{
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "KafkaConnection",
    use_ssl: false
});

Example use of the connect method using SSL:

var conn = kafka.connect(
{
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "KafkaConnection",
    use_ssl: true,
    ssl_truststore_location: "/path/to/truststore",
    ssl_truststore_password: "test1234",
    ssl_keystore_location: "/path/to/keystore",
    ssl_keystore_password: "test1234",
    ssl_key_password: "test1234",
    ssl_endpoint_identification_algorithm: "",
    use_sasl: false
});

Example use of the connect method using SASL:

var conn = kafka.connect(
{
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "KafkaConnection",
    use_ssl: false,
    use_sasl: true,
    sasl_mechanism: "PLAIN",
    security_protocol: "SASL_PLAINTEXT",
    sasl_jaas_config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=myuser password=mypassword;",
    additional_properties:{
       "sasl.login.refresh.window.factor": "0.8",
       "sasl.login.refresh.window.jitter": "0.05",
       "sasl.login.refresh.min.period.seconds": "60",
       "sasl.login.refresh.min.buffer.seconds": "300"
     },
    sasl_kerberos_service_name: "kafka",
    kerberos_conf_file_path : "/etc/krb5.conf",  
    kerberos_debug_log : true
});

send

Sends a message to the Kafka broker.

Request arguments

The method takes the following arguments.

Name

Type

Required

Description

topic

String

Yes

The Kafka topic to send the message on to.

key

String

No

The key to use for the message.

message

JSON

Yes

Message in one of the following formats:

  • Plain text

  • JSON Object payload

  • JSON Array payload

Response

The method returns the following parameter:

Type

Description

Boolean

Indicates whether the send operation was successful.

Example

Example use of the send method:

if (connection) 
{
    connection.send("myTopic", {test_field: "value"});
    connection.send("myTopic", "myKey", {test_field: "value"});
}

close

Closes the connection to the Kafka broker.

Request arguments

The method takes no arguments.

Response

None.

Examples

Example function using the Kafka module:

function sendToKafka(alert)
{
    var connection = kafka.connect({
        servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
         connection_id: "KafkaConnection",
         use_ssl: false
    });

    if (connection){
        boolean success = connection.send("myTopic", {test_field: "value"});
         if (!success){
         logger.warning("Failed to send message to Kafka");
         }
    }
}