Kafka
The Kafka module allows you to broadcast information on a Kafka bus. For example, you can use it to push alert or Situation data to a data lake via Kafka.
Load the module
The Kafka Moobot module is available to load into any standard Moobot. To load it, define a new global variable at the top of the Moobot JavaScript file. For example:
var kafka = MooBot.loadModule('Kafka');
Method reference
The Kafka module provides the following methods.
connect
Establishes a connection to one or more Kafka brokers with defined connection properties.
Request arguments
The method takes the following argument.
Name | Type | Required | Description |
---|---|---|---|
properties | Object | Yes | A JavaScript object containing connection properties. See Connection properties below. |
Connection properties
The properties object can include the following keys:
Name | Type | Required | Description |
---|---|---|---|
servers | List | Yes | Specifies the Kafka broker or brokers to connect to. |
connection_id | String | Yes | Name of the connection. |
use_ssl | Boolean | Yes | Whether to connect using SSL. |
ssl_truststore_location | String | No | Path to the SSL truststore file. |
ssl_truststore_password | String | No | Password for the SSL truststore file. |
| String | No | Encrypted password for the SSL truststore file. |
| String | No | Password for the SSL certificate. |
| String | No | Encrypted password for the SSL certificate. |
ssl_endpoint_identification_algorithm | String | No | Endpoint identification algorithm to validate the server hostname. |
| String | No | Compression codec to use. One of:
|
use_sasl | Boolean | No | Whether to connect using SASL. |
sasl_mechanism | String | No | SASL mechanism to use. If you set a SASL mechanism you must set
Choose from |
security_protocol | String | No | SASL protocol to use. If you set a SASL protocol you must set
|
sasl_jaas_config | String | No | Credentials for JAAS authentication. |
additional_properties | Various | No | Kafka consumer properties. Any properties you define here take priority over any configurations. See the Apache Kafka documentation for descriptions of these properties. |
sasl_kerberos_service_name | String | No | Kerberos service name. |
kerberos_conf_file_path | String | No | Path to the Kerberos configuration file. |
kerberos_debug_log | Boolean | No | Whether to enable Kerberos authentication debug logs. |
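Because three of the keys are required, it can help to check a properties object before passing it to connect. The following is a minimal sketch, not part of the Kafka module: checkConnectionProps is a hypothetical helper, and the key names servers, connection_id, and use_ssl are taken from the connect examples on this page.

```javascript
// Hypothetical helper, not part of the Kafka module.
// Key names are assumed from the connect examples on this page.
var REQUIRED_KEYS = ["servers", "connection_id", "use_ssl"];

// Returns an array of problems found in props. An empty array means
// the required keys are present and have the expected basic types.
function checkConnectionProps(props) {
    var problems = [];
    if (!props || typeof props !== "object") {
        return ["properties must be an object"];
    }
    // Every required key must be present and not null.
    REQUIRED_KEYS.forEach(function (key) {
        if (props[key] === undefined || props[key] === null) {
            problems.push("missing required key: " + key);
        }
    });
    // servers is documented as a List of brokers.
    if (props.servers != null &&
            (!Array.isArray(props.servers) || props.servers.length === 0)) {
        problems.push("servers must be a non-empty list");
    }
    // use_ssl is documented as a Boolean.
    if (props.use_ssl != null && typeof props.use_ssl !== "boolean") {
        problems.push("use_ssl must be a boolean");
    }
    return problems;
}
```

A Moobot could log the returned problems and skip the connect call when the array is not empty.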
Response
The method returns the following parameter.
Type | Description |
---|---|
Object | A Java object containing connection details, depending on the requested connection properties. Returns |
Example
Example use of the connect method without SSL:
    var conn = kafka.connect({
        servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
        connection_id: "KafkaConnection",
        use_ssl: false
    });
Example use of the connect method using SSL:
    var conn = kafka.connect({
        servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
        connection_id: "KafkaConnection",
        use_ssl: true,
        ssl_truststore_location: "/path/to/truststore",
        ssl_truststore_password: "test1234",
        ssl_keystore_location: "/path/to/keystore",
        ssl_keystore_password: "test1234",
        ssl_key_password: "test1234",
        ssl_endpoint_identification_algorithm: "",
        use_sasl: false
    });
Example use of the connect method using SASL:
    var conn = kafka.connect({
        servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
        connection_id: "KafkaConnection",
        use_ssl: false,
        use_sasl: true,
        sasl_mechanism: "PLAIN",
        security_protocol: "SASL_PLAINTEXT",
        sasl_jaas_config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=myuser password=mypassword;",
        additional_properties: {
            "sasl.login.refresh.window.factor": "0.8",
            "sasl.login.refresh.window.jitter": "0.05",
            "sasl.login.refresh.min.period.seconds": "60",
            "sasl.login.refresh.min.buffer.seconds": "300"
        },
        sasl_kerberos_service_name: "kafka",
        kerberos_conf_file_path: "/etc/krb5.conf",
        kerberos_debug_log: true
    });
send
Sends a message to the Kafka broker.
Request arguments
The method takes the following arguments.
Name | Type | Required | Description |
---|---|---|---|
| String | Yes | The Kafka topic to send the message to. |
| String | No | The key to use for the message. |
| JSON | Yes | Message in one of the following formats:
|
Response
The method returns the following parameter:
Type | Description |
---|---|
Boolean | Indicates whether the send operation was successful. |
Example
Example use of the send method:
    if (connection) {
        // Send without a key
        connection.send("myTopic", {test_field: "value"});
        // Send with a key
        connection.send("myTopic", "myKey", {test_field: "value"});
    }
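Because send returns a Boolean, a caller can react to a failed send rather than ignore it. The following is a minimal sketch of one possible approach, retrying a fixed number of times. sendWithRetry and its maxAttempts parameter are hypothetical and not part of the Kafka module; the sketch only assumes that connection.send(topic, message) returns a truthy value on success, as described above.

```javascript
// Hypothetical helper, not part of the Kafka module.
// Calls connection.send up to maxAttempts times and stops as soon
// as one call reports success. Returns true if any attempt succeeded.
function sendWithRetry(connection, topic, message, maxAttempts) {
    // Fall back to a single attempt if maxAttempts is missing or invalid.
    var attempts = maxAttempts > 0 ? maxAttempts : 1;
    for (var i = 1; i <= attempts; i++) {
        if (connection.send(topic, message)) {
            return true;
        }
    }
    return false;
}
```

For example, sendWithRetry(connection, "myTopic", {test_field: "value"}, 3) makes at most three attempts. Whether retrying is appropriate depends on why the send failed, so treat this as a starting point only.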
close
Closes the connection to the Kafka broker.
Request arguments
The method takes no arguments.
Response
None.
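One way to make sure close is always called is to wrap the work in try/finally. The following is a minimal sketch under stated assumptions: withKafkaConnection is a hypothetical wrapper, kafkaModule stands for the object returned by MooBot.loadModule('Kafka'), and a falsy return from connect is treated as a failed connection, as the examples on this page do with if (connection).

```javascript
// Hypothetical wrapper, not part of the Kafka module.
// Opens a connection, runs work(connection), and always closes the
// connection afterwards, even if work throws.
function withKafkaConnection(kafkaModule, props, work) {
    var connection = kafkaModule.connect(props);
    if (!connection) {
        // connect did not return a usable connection
        return false;
    }
    try {
        return work(connection);
    } finally {
        connection.close();
    }
}
```

A caller passes a function that receives the open connection, for example function (c) { return c.send("myTopic", {test_field: "value"}); }, and gets back whatever that function returns.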
Examples
Example function using the Kafka module:
    function sendToKafka(alert) {
        var connection = kafka.connect({
            servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
            connection_id: "KafkaConnection",
            use_ssl: false
        });
        if (connection) {
            // send returns a Boolean indicating whether the message was sent
            var success = connection.send("myTopic", {test_field: "value"});
            if (!success) {
                logger.warning("Failed to send message to Kafka");
            }
        }
    }