Configure the Kafka LAM
Apache Kafka builds real-time data pipelines and streaming apps, and runs as a cluster of one or more servers. The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a timestamp.
You can install a basic Kafka integration in the UI. See Apache Kafka for integration steps.
Before you begin
The Kafka integration has been validated with Kafka v0.9 and v1.1. Before you start to set up the LAM, ensure you have met the following requirements:
You have the host and ports for all the Kafka brokers you want to connect to.
The ports for your Kafka brokers are open and accessible from Moogsoft Onprem.
You know the names of the topics for the system to subscribe to.
You have the group ID of the consumer group.
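As a quick way to confirm the port requirement above, the following Python sketch attempts a plain TCP connection to each broker from the Moogsoft Onprem host. It is only an illustration: the function name broker_reachable and the broker list are hypothetical, and a successful TCP connection does not prove that the broker accepts your consumer group or credentials.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    # Hypothetical broker list; replace with your own hosts and ports.
    brokers = [("localhost", 9092)]
    for host, port in brokers:
        state = "reachable" if broker_reachable(host, port) else "NOT reachable"
        print(f"{host}:{port} is {state}")
```

Run the script on the server that hosts the LAM so that the check follows the same network path the LAM will use.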
Configure the LAM
Edit the configuration file to control the behavior of the Kafka LAM. You can find the file at $MOOGSOFT_HOME/config/kafka_lam.conf.
The configuration file contains a JSON object. At the first layer of the object, the LAM has a parameter called config, and the object that follows config has all the necessary information to control the LAM.
See Kafka Reference and LAM and Integration Reference for a full description of all properties. Some properties in the file are commented out by default; remove the '#' character to enable them.
Configure the connection properties:
kafka_listener: The hostname along with the port of the Kafka broker.
topic_name: The name of the topic(s) in the Kafka broker you are consuming events from.
groupid: The name of the consumer group.
Configure SSL if you want to encrypt communications between Kafka and Moogsoft Onprem:
ssl_connection: Set to true to enable SSL communication.
ssl_truststore_filename: The path of the truststore certificate.
ssl_truststore_password: The password for the truststore certificate.
ssl_keystore_filename: The path of the keystore certificate.
ssl_keystore_password: The password for the keystore certificate.
ssl_key_password: The password for the client certificate required in client authentication. It is the password entered in the ssl.key.password property of the Kafka server.properties file.
Configure the kafka_properties section, which allows you to use Kafka consumer properties. Note that these take priority over the aforementioned SSL properties:
ssl.endpoint.identification.algorithm: The endpoint identification algorithm used by clients to validate server host name.
sasl.mechanism: The SASL mechanism method for the Kafka broker to use.
security.protocol: The security protocol to use.
sasl.jaas.config: The type of JAAS authentication configuration to use for the Kafka broker.
sasl.login.refresh.window.factor: If using OAUTHBEARER, the login refresh thread will sleep until the specified window factor relative to the credential's lifetime has been reached, at which time it will attempt to refresh the credential.
sasl.login.refresh.window.jitter: The maximum amount of random jitter relative to the credential's lifetime that is added to the login refresh thread's sleep time.
sasl.login.refresh.min.period.seconds: The desired minimum time (in seconds) for the login refresh thread to wait before refreshing a credential.
sasl.login.refresh.min.buffer.seconds: The amount of buffer time (in seconds) to maintain before credential expiration when refreshing a credential.
sasl.kerberos.service.name: The Kerberos service name.
See the Apache Kafka documentation for more information on these properties.
Configure parsing and tokenization. See the "Parsing and tokenization" section below for more information.
Optionally configure the LAM identification and capture logging details:
name: Maps to $LamInstanceName, so that the agent field indicates events Moogsoft Onprem ingests from this LAM.
capture_log: Name and location of the LAM's capture log file, which it writes to for debugging purposes.
Optionally configure severity conversions. See Severity Reference for further information and "Conversion Rules" in Tokenize Source Event Data for details on conversions in general.
Optionally configure the process logging details:
configuration_file: Name and location of the LAM's process log configuration file. See Configure Logging for more information.
Parsing and tokenization
Note
The parsing section is used when the event format is a text message. If your events are in JSON format, comment out the parsing and variables sections and uncomment builtInMapper: "CJsonDecoder" in the Mapping section of kafka_lam.conf.
The parsing section is used for parsing the received event and tokenizing it.
The Kafka LAM receives data in two formats:
Text: The data is received in text format, which can be parsed and tokenized in the Parsing section and then mapped to Moogsoft Onprem fields in the Variables and Mapping sections.
JSON: The data is received in JSON format, which can be mapped directly using CJsonDecoder. The parsing and variables sections are not required for JSON format.
Parsing
The following two methods are available for parsing:
Text Message: Parsing starts when the LAM receives NEW_MSG and ends when it receives a new line. The extracted string is then split according to the defined delimiters.
To enable this method, set type to start_and_end and then configure the start and end fields.
Regular Expression: The parser searches for strings that match the regular expression you define in the pattern field and delimits them according to your configuration of delimiters.
To enable this method, set type to regexp.
Only use one parsing method at a time, commenting out the ones you are not using. See Tokenize Source Event Data for more information.
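To make the start_and_end method more concrete, here is a minimal Python sketch of the idea. It is not the LAM's implementation. It assumes an empty start marker, a newline as the end marker, and the delimiters "," and "\r". It also assumes that ignoreQuotes means delimiters inside double quotes do not split a token, and that stripQuotes removes the enclosing quotes. The function names are hypothetical.

```python
def tokenize(message, delimiters=(",", "\r"), ignore_quotes=True, strip_quotes=True):
    """Split one message into tokens on the given delimiter characters."""
    tokens, current, in_quotes = [], [], False
    for ch in message:
        if ch == '"':
            if ignore_quotes:
                # Assumed meaning: delimiters inside quotes are ignored.
                in_quotes = not in_quotes
            if not strip_quotes:
                current.append(ch)
        elif ch in delimiters and not in_quotes:
            tokens.append("".join(current))
            current = []
        else:
            current.append(ch)
    tokens.append("".join(current))
    return tokens


def parse_stream(data, end="\n"):
    """Cut the incoming text at each end marker, then tokenize each message."""
    return [tokenize(message) for message in data.split(end) if message]


if __name__ == "__main__":
    sample = 'sig-1,1554076800,ext-7,"host a, rack 2",3\n'
    print(parse_stream(sample))
```

In this sketch the quoted value "host a, rack 2" stays a single token because the comma inside the quotes is not treated as a delimiter.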
Tokenization
The parsed events are tokenized using either delimiters or the regexp_subgroups. See the Kafka Reference for more information.
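As an illustration of tokenizing with regular expression subgroups rather than delimiters, the Python sketch below treats each capture group of a match as one token. This is an assumption about the general technique, not a description of the LAM's internals, and the pattern and the function name tokenize_by_subgroups are hypothetical.

```python
import re


def tokenize_by_subgroups(text, pattern):
    """Return one token list per match, with one token per capture group."""
    regex = re.compile(pattern, re.MULTILINE)
    return [list(match.groups()) for match in regex.finditer(text)]


if __name__ == "__main__":
    # Hypothetical event lines: <host> <severity> <description>
    sample = "host1 5 disk full\nhost2 3 cpu high\n"
    for tokens in tokenize_by_subgroups(sample, r"^(\S+) (\d+) (.*)$"):
        print(tokens)
```

Subgroups suit events whose fields are separated inconsistently, because the pattern decides where each token starts and ends.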
Variables
Note
The variables section is used when the received event message type is TextMessage. A JSON event can be mapped directly to the Moogsoft Onprem fields in the Mapping section.
A received event is a positioned collection of tokens. The variables section enables you to name these positions. The naming of the positions helps you to identify the tokens.
Positions start at 1 and increase. In the example below, the token in position number 6 is a Manager name, so the user names the token as "Manager". The naming helps in mapping the tokens to the Moogsoft Onprem fields in the mapping section.
variables:
[
    { name: "signature", position: 1 },
    { name: "source_id", position: 4 },
    { name: "external_id", position: 3 },
    { name: "Manager", position: 6 },
    { name: "AlertGroup", position: 7 },
    { name: "Class", position: 8 },
    { name: "Agent", position: 9 },
    { name: "severity", position: 5 },
    { name: "description", position: 10 },
    { name: "agent_time", position: 2 }
],
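The effect of the variables section can be sketched in Python as a lookup from 1-based positions to names. This is only an illustration of the concept. The function name_tokens and the sample token values are hypothetical, and the sketch skips any position beyond the end of the token list, which may differ from how the LAM behaves.

```python
VARIABLES = [
    {"name": "signature", "position": 1},
    {"name": "agent_time", "position": 2},
    {"name": "external_id", "position": 3},
    {"name": "source_id", "position": 4},
    {"name": "severity", "position": 5},
    {"name": "Manager", "position": 6},
]


def name_tokens(tokens, variables):
    """Map each configured name to the token at its 1-based position."""
    return {
        variable["name"]: tokens[variable["position"] - 1]
        for variable in variables
        if 1 <= variable["position"] <= len(tokens)
    }


if __name__ == "__main__":
    tokens = ["sig-1", "1554076800", "ext-7", "host-a", "3", "Kafka"]
    print(name_tokens(tokens, VARIABLES))
```

The named values are what the mapping rules later refer to with a leading $ character, for example $signature.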
Map LAM properties
For events received in JSON format, you can directly map the event fields of Kafka. In the case of an event received in text format, the event is first tokenized in the Variable section, and the tokenized event is then mapped here. The parameters of the received events are displayed in Moogsoft Onprem accordingly.
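Kafka Event Property | Kafka LAM Event Property |
---|---|
Agent | $LamInstanceName |
Agent Location | $agent_location |
Agent Time | $agent_time |
Class | $class |
Description | $description |
External ID | $external_id |
Manager | Kafka |
Severity | $severity |
Signature | $signature |
Source ID | $source_id |
Type | $type |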
The table above shows how the Kafka alarm fields map to the Moogsoft Onprem fields.
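The following Python sketch shows one way to think about mapping rules applied to a JSON event. It is a simplified model, not the CJsonDecoder implementation. It assumes that a rule value starting with $ refers to a field of the decoded event, that any other value is a literal, and that fields not referenced by a rule are collected under the catchAll name. The function apply_rules and the sample event are hypothetical, and severity conversion is omitted.

```python
import json

RULES = [
    {"name": "signature", "rule": "$signature"},
    {"name": "manager", "rule": "Kafka"},
    {"name": "description", "rule": "$description"},
]


def apply_rules(event, rules, catch_all="overflow"):
    """Build the mapped event from the rules; unmapped fields go to catch_all."""
    mapped, used = {}, set()
    for rule in rules:
        value = rule["rule"]
        if value.startswith("$"):
            key = value[1:]
            mapped[rule["name"]] = event.get(key)  # None if the field is absent
            used.add(key)
        else:
            mapped[rule["name"]] = value  # literal value such as "Kafka"
    mapped[catch_all] = {k: v for k, v in event.items() if k not in used}
    return mapped


if __name__ == "__main__":
    raw = '{"signature": "sig-1", "description": "disk full", "rack": 2}'
    print(apply_rules(json.loads(raw), RULES))
```

For a text event, the same idea applies, except that the $ references resolve against the names defined in the variables section instead of JSON fields.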
Example
An example Kafka LAM configuration is as follows:
{
    config:
    {
        monitor:
        {
            name: "Kafka Lam Monitor",
            class: "CKafkaMonitor",
            kafka_listener: "localhost:9092",
            topic_name: [ "topic1", "topic2" ],
            groupid: "consumer-group",
            ssl_connection: false
        },
        parsing:
        {
            type: "start_and_end",
            start_and_end:
            {
                start: [],
                end: ["\n"],
                delimiters:
                {
                    ignoreQuotes: true,
                    stripQuotes: true,
                    ignores: "",
                    delimiter: [",","\r"]
                }
            }
        },
        #parsing:
        #{
        #    type: "regexp",
        #    regexp:
        #    {
        #        pattern: "(?mU)^(.*)$",
        #        capture_group: 1,
        #        tokeniser_type: "delimiters",
        #        delimiters:
        #        {
        #            ignoreQuotes: true,
        #            stripQuotes: false,
        #            ignores: "",
        #            delimiter: ["\r"]
        #        }
        #    }
        #},
        variables:
        [
            { name: "signature", position: 1 },
            { name: "source_id", position: 4 },
            { name: "external_id", position: 3 },
            { name: "Manager", position: 6 },
            { name: "AlertGroup", position: 7 },
            { name: "Class", position: 8 },
            { name: "Agent", position: 9 },
            { name: "severity", position: 5 },
            { name: "description", position: 10 },
            { name: "agent_time", position: 2 }
        ],
        mapping:
        {
            builtInMapper: "CJsonDecoder",
            catchAll: "overflow",
            rules:
            [
                { name: "signature", rule: "$signature" },
                { name: "source_id", rule: "$source_id" },
                { name: "external_id", rule: "$external_id" },
                { name: "manager", rule: "Kafka" },
                { name: "source", rule: "$source" },
                { name: "class", rule: "$class" },
                { name: "agent", rule: "$LamInstanceName" },
                { name: "agent_location", rule: "$agent_location" },
                { name: "type", rule: "$type" },
                { name: "severity", rule: "$severity", conversion: "sevConverter" },
                { name: "description", rule: "$description" },
                { name: "agent_time", rule: "$agent_time" }
            ]
        }
    },
    log_config:
    {
        configuration_file: "$MOOGSOFT_HOME/config/logging/custom.log.json"
    }
}
Configure for High Availability
Configure the Kafka LAM for high availability if required. See High Availability Overview for details.
Configure LAMbot processing
The Kafka LAMbot processes and filters events before sending them to the Message Bus. You can customize or bypass this processing if required. You can also load JavaScript files into the LAMbot and execute them.
See LAMbot Configuration for more information. An example Kafka LAM filter configuration is shown below.
filter:
{
    presend: "KafkaLam.js",
}
Start and stop the LAM
Restart the Kafka LAM to activate any changes you make to the configuration file or LAMbot.
The LAM service name is kafkalamd.
See Control Moogsoft Onprem Processes for the commands to start, stop and restart the LAM.