The Kafka module allows you to broadcast information on a Kafka bus. For example, you can use it to push alert or Situation data to a data lake via Kafka.
The Kafka Moobot module is available to load into any standard Moobot. To load it, define a new global variable at the top of the Moobot JavaScript file. For example:
var kafka = MooBot.loadModule('Kafka');
The Kafka module uses the following methods.
The connect method establishes a connection to one or more Kafka brokers with defined connection properties.
The method takes the following arguments.
Name | Type | Required | Description |
---|---|---|---|
properties | Object | Yes | A JavaScript object containing connection properties. See Connection properties below. |
The properties object can include the following keys:
Name | Type | Required | Description |
---|---|---|---|
servers | List | Yes | Specifies the Kafka broker or brokers to connect to. |
connection_id | String | Yes | Name of the connection. |
use_ssl | Boolean | Yes | Whether to connect using SSL. |
ssl_truststore_location | String | No | Path to the SSL truststore file. |
ssl_truststore_password | String | No | Password for the SSL truststore file. |
| | String | No | Encrypted password for the SSL truststore file. |
| | String | No | Password for the SSL certificate. |
| | String | No | Encrypted password for the SSL certificate. |
ssl_endpoint_identification_algorithm | String | No | Endpoint identification algorithm to validate the server hostname. |
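| | String | No | Compression codec to use. |
use_sasl | Boolean | No | Whether to connect using SASL. |
sasl_mechanism | String | No | SASL mechanism to use, for example PLAIN. Setting a SASL mechanism requires a companion property; the SASL example on this page sets it together with use_sasl: true. |
security_protocol | String | No | SASL protocol to use, for example SASL_PLAINTEXT. Setting a SASL protocol requires a companion property; the SASL example on this page sets it together with use_sasl: true. |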
sasl_jaas_config | String | No | Credentials for JAAS authentication. |
additional_properties | Various | No | Kafka consumer properties. Any properties you define here take priority over any configurations. See the Apache Kafka documentation for descriptions of these properties. |
sasl_kerberos_service_name | String | No | Kerberos service name. |
kerberos_conf_file_path | String | No | Path to the Kerberos configuration file. |
kerberos_debug_log | Boolean | No | Whether to enable Kerberos authentication debug logs. |
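The table marks three properties as required, and every connect example on this page passes `servers`, `connection_id`, and `use_ssl`. As a minimal sketch, a Moobot could check those three keys before calling connect. The helper name `validateKafkaProps` is hypothetical and is not part of the Kafka module; it only inspects a plain JavaScript object.

```javascript
// Hypothetical helper, not part of the Kafka module: checks the three
// properties that the connect examples on this page always supply.
function validateKafkaProps(props) {
    var errors = [];
    if (!props || typeof props !== "object") {
        return ["properties must be an object"];
    }
    if (!Array.isArray(props.servers) || props.servers.length === 0) {
        errors.push("servers must be a non-empty list of host:port strings");
    }
    if (typeof props.connection_id !== "string" || props.connection_id === "") {
        errors.push("connection_id must be a non-empty string");
    }
    if (typeof props.use_ssl !== "boolean") {
        errors.push("use_ssl must be true or false");
    }
    return errors;
}

// An empty result means the required keys are present and well typed.
var problems = validateKafkaProps({
    servers: ["my.kafka.broker:9092"],
    connection_id: "KafkaConnection",
    use_ssl: false
});
```

Returning a list of messages rather than throwing lets the caller log every problem at once before deciding whether to attempt the connection.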
The method returns the following parameter.
Type | Description |
---|---|
Object | A Java object containing connection details, depending on the requested connection properties. |
Example use of the connect method without SSL:
var conn = kafka.connect(
{
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "KafkaConnection",
    use_ssl: false
});
Example use of the connect method using SSL:
var conn = kafka.connect(
{
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "KafkaConnection",
    use_ssl: true,
    ssl_truststore_location: "/path/to/truststore",
    ssl_truststore_password: "test1234",
    ssl_keystore_location: "/path/to/keystore",
    ssl_keystore_password: "test1234",
    ssl_key_password: "test1234",
    ssl_endpoint_identification_algorithm: "",
    use_sasl: false
});
Example use of the connect method using SASL:
var conn = kafka.connect(
{
    servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
    connection_id: "KafkaConnection",
    use_ssl: false,
    use_sasl: true,
    sasl_mechanism: "PLAIN",
    security_protocol: "SASL_PLAINTEXT",
    sasl_jaas_config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=myuser password=mypassword;",
    additional_properties: {
        "sasl.login.refresh.window.factor": "0.8",
        "sasl.login.refresh.window.jitter": "0.05",
        "sasl.login.refresh.min.period.seconds": "60",
        "sasl.login.refresh.min.buffer.seconds": "300"
    },
    sasl_kerberos_service_name: "kafka",
    kerberos_conf_file_path: "/etc/krb5.conf",
    kerberos_debug_log: true
});
The send method, called on the object returned by connect, sends a message to the Kafka broker.
The method takes the following arguments.
Name | Type | Required | Description |
---|---|---|---|
| | String | Yes | The Kafka topic to send the message to. |
| | String | No | The key to use for the message. |
| | JSON | Yes | The message to send. The examples on this page pass a JavaScript object such as {test_field: "value"}. |
The method returns the following parameter:
Type | Description |
---|---|
Boolean | Indicates whether the send operation was successful. |
Example use of the send method:
if (connection)
{
    connection.send("myTopic", {test_field: "value"});
    connection.send("myTopic", "myKey", {test_field: "value"});
}
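Because send reports success as a Boolean, a caller can retry a failed send a bounded number of times. The following is a minimal sketch under that assumption only. `sendWithRetry` and `fakeConnection` are hypothetical names introduced for illustration; the fake object stands in for a real connection so the sketch can run outside a Moobot.

```javascript
// Hypothetical helper, not part of the Kafka module: retries send() up to
// maxAttempts times and reports how many attempts were needed.
function sendWithRetry(connection, topic, key, message, maxAttempts) {
    for (var attempt = 1; attempt <= maxAttempts; attempt++) {
        // Use the two-argument form of send() when no key is supplied.
        var ok = (key === null || key === undefined)
            ? connection.send(topic, message)
            : connection.send(topic, key, message);
        if (ok) {
            return attempt; // number of attempts it took
        }
    }
    return 0; // every attempt failed
}

// Stand-in for a real connection (assumption for illustration only):
// it fails twice, then succeeds on the third call.
var fakeConnection = {
    calls: 0,
    send: function () {
        this.calls++;
        return this.calls >= 3;
    }
};

var attemptsUsed = sendWithRetry(fakeConnection, "myTopic", null, {test_field: "value"}, 5);
```

The sketch retries immediately; whether a delay between attempts is appropriate depends on the broker setup and is left out here.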
Closes the connection to the Kafka broker.
The method takes no arguments and returns no value.
Example function using the Kafka module:
function sendToKafka(alert)
{
    var connection = kafka.connect({
        servers: ["my.kafka.broker:9092", "my.other.kafka.broker:9092"],
        connection_id: "KafkaConnection",
        use_ssl: false
    });
    if (connection) {
        // send() returns a Boolean indicating whether the message was sent
        var success = connection.send("myTopic", {test_field: "value"});
        if (!success) {
            logger.warning("Failed to send message to Kafka");
        }
    }
}
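The function above connects each time it is called. One possible variation, sketched below, connects on first use and keeps the connection for later sends. It assumes that a connection object can be reused across calls, which this page does not confirm. `makeKafkaSender` and `stubKafka` are hypothetical names; the stub replaces the real module so the sketch can run outside a Moobot.

```javascript
// Hypothetical wrapper, not part of the Kafka module: connects lazily and
// reuses the connection. kafkaModule is the object a Moobot would obtain
// from MooBot.loadModule('Kafka').
function makeKafkaSender(kafkaModule, props, topic) {
    var connection = null;
    return function (message) {
        if (!connection) {
            connection = kafkaModule.connect(props);
        }
        if (!connection) {
            return false; // connect did not yield a usable object
        }
        return connection.send(topic, message);
    };
}

// Stub module (assumption for illustration only) that counts connect calls.
var connectCalls = 0;
var stubKafka = {
    connect: function () {
        connectCalls++;
        return { send: function () { return true; } };
    }
};

var sendMessage = makeKafkaSender(stubKafka, {
    servers: ["my.kafka.broker:9092"],
    connection_id: "KafkaConnection",
    use_ssl: false
}, "myTopic");

var first = sendMessage({test_field: "value"});
var second = sendMessage({test_field: "value"});
```

If connect fails, the wrapper returns false and tries to connect again on the next call, so a temporary broker outage does not permanently disable sending.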