Kafka
The Kafka module allows you to broadcast information on a Kafka bus. For example, you can use it to push alert or Situation data to a data lake via Kafka.
Load the module
The Kafka Moobot module is available to load into any standard Moobot. To load it, define a new global variable at the top of the Moobot Javascript file. For example:
var kafka = MooBot.loadModule('Kafka');
Method reference
The Kafka module uses the following methods.
connect
Establishes a connection to one or more Kafka brokers with defined connection properties.
Request arguments
The method takes the following arguments.
Name |
Type |
Required |
Description |
---|---|---|---|
|
Object |
Yes |
A JavaScript object containing connection properties. See Connection properties below. |
Connection properties
The properties
object can include the following keys:
Name |
Type |
Required |
Description |
---|---|---|---|
|
List |
Yes |
Specifies the Kafka broker or brokers to connect to. |
|
String |
Yes |
Name of the connection. |
|
Boolean |
Yes |
Whether to connect using SSL. |
|
String |
No |
Path to the SSL truststore file. |
|
String |
No |
Password for the SSL truststore file. |
|
String |
No |
Encrypted password for the SSL truststore file. |
|
String |
No |
Password for the SSL certificate. |
|
String |
No |
Encrypted password for the SSL certificate. |
|
String |
No |
Endpoint identification algorithm to validate the server hostname. |
|
String |
No |
Compression codec to use. One of:
|
|
Boolean |
No |
Whether to connect using SASL. |
|
String |
No |
SASL mechanism to use. If you set a SASL mechanism you must set
Choose from |
|
String |
No |
SASL protocol to use. If you set a SASL protocol you must set
|
|
String |
No |
Credentials for JAAS authentication. |
|
Various |
No |
Kafka consumer properties. Any properties you define here take priority over any configurations. See the Apache Kafka documentation for descriptions of these properties. |
|
String |
No |
Kerberos service name. |
|
String |
No |
Path to the Kerberos configuration file. |
|
Boolean |
No |
Whether to enable Kerberos authentication debug logs. |
Response
The method returns the following parameter.
Type |
Description |
---|---|
Object |
A Java object containing connection details, depending on the requested connection
properties. Returns |
Example
Example use of the connect
method without SSL:
var conn = kafka.connect( { servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"], connection_id: "KafkaConnection", use_ssl: false });
Example use of the connect
method using SSL:
var conn = kafka.connect( { servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"], connection_id: "KafkaConnection", use_ssl: true, ssl_truststore_location: "/path/to/truststore", ssl_truststore_password: "test1234", ssl_keystore_location: "/path/to/keystore", ssl_keystore_password: "test1234", ssl_key_password: "test1234", ssl_endpoint_identification_algorithm: "", use_sasl: false });
Example use of the connect
method using SASL:
var conn = kafka.connect( { servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"], connection_id: "KafkaConnection", use_ssl: false, use_sasl: true, sasl_mechanism: "PLAIN", security_protocol: "SASL_PLAINTEXT", sasl_jaas_config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=myuser password=mypassword;", additional_properties:{ "sasl.login.refresh.window.factor": "0.8", "sasl.login.refresh.window.jitter": "0.05", "sasl.login.refresh.min.period.seconds": "60", "sasl.login.refresh.min.buffer.seconds": "300" }, sasl_kerberos_service_name: "kafka", kerberos_conf_file_path : "/etc/krb5.conf", kerberos_debug_log : true });
send
Sends a message to the Kafka broker.
Request arguments
The method takes the following arguments.
Name |
Type |
Required |
Description |
---|---|---|---|
|
String |
Yes |
The Kafka topic to send the message on to. |
|
String |
No |
The key to use for the message. |
|
JSON |
Yes |
Message in one of the following formats:
|
Response
The method returns the following parameter:
Type |
Description |
---|---|
Boolean |
Indicates whether the send operation was successful. |
Example
Example use of the send
method:
if (connection) { connection.send("myTopic", {test_field: "value"}); connection.send("myTopic", "myKey", {test_field: "value"}); }
close
Closes the connection to the Kafka broker.
Request arguments
The method takes no arguments.
Response
None.
Examples
Example function using the Kafka module:
function sendToKafka(alert) { var connection = kafka.connect({ servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"], connection_id: "KafkaConnection", use_ssl: false }); if (connection){ boolean success = connection.send("myTopic", {test_field: "value"}); if (!success){ logger.warning("Failed to send message to Kafka"); } } }