# Moogsoft Docs

## Configure the Kafka LAM

Apache Kafka is used for building real-time data pipelines and streaming applications. Kafka runs as a cluster of one or more servers. The Kafka cluster stores streams of records in categories called topics; each record consists of a key, a value, and a timestamp.

See Apache Kafka for UI configuration instructions.

1. The LAM reads its configuration from the  kafka_lam.conf  file.
2. The LAM connects to the Kafka broker and requests any data available for a topic.
3. The LAM starts multiple threads, based on the number of topics it is listening to.
4. If an event is available for a topic, the LAM consumes it.
5. The events are parsed and converted into normalized Moogsoft AIOps events.
6. The normalized events are then published to the MooMS bus.
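The steps above can be sketched as a minimal, self-contained Python illustration. This is not the LAM's actual implementation: the in-memory queues stand in for the Kafka broker, and the `bus` list stands in for the MooMS bus.

```python
import queue
import threading

# Stand-in for the Kafka broker: one in-memory queue per topic (step 2).
topics = {
    "topic1": queue.Queue(),
    "topic2": queue.Queue(),
}
bus = []                    # stand-in for the MooMS bus
bus_lock = threading.Lock()

def normalize(topic, raw):
    # Step 5: convert a raw record into a normalized event (illustrative fields only).
    return {"signature": raw.get("signature"), "source": topic}

def consume(topic, q):
    # Step 3/4: one thread per topic; drain whatever events are available.
    while True:
        try:
            raw = q.get_nowait()
        except queue.Empty:
            break
        event = normalize(topic, raw)
        with bus_lock:
            bus.append(event)   # step 6: publish to the bus

# Feed some sample records, then run one consumer thread per topic.
topics["topic1"].put({"signature": "LinkDown"})
topics["topic2"].put({"signature": "HighCPU"})
threads = [threading.Thread(target=consume, args=(t, q)) for t, q in topics.items()]
for t in threads:
    t.start()
for t in threads:
    t.join()
```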

# Configuration

The events received from Kafka are processed according to the configurations in the  kafka_lam.conf  file. The processed events are published to Moogsoft AIOps.

The configuration file contains a JSON object. At the first layer of the object, the LAM has a parameter called  config  ; the object that follows  config  contains all the information necessary to control the LAM.

The following sections are available for configuration in the Kafka LAM configuration file:

## Monitor

The Kafka LAM takes event data from the Kafka broker. You can configure the following parameters to establish a connection with Kafka:

### General

| Field | Type | Description | Example |
| --- | --- | --- | --- |
| name and class | String | Reserved fields: do not change. Default values are Kafka Lam Monitor and CKafkaMonitor. | |
| kafka_listener | String | Enter the hostname and port of the Kafka broker. | "localhost:9092" |
| topic_name | String | Enter the name(s) of the topic(s) in the Kafka broker from which the events are fetched. | |
| groupid | String | Enter the name of the consumer group. Kafka requires the group ID to identify the consumer, or group of consumers, that consume data from a topic. Kafka distributes data evenly among the consumers of the same group, which speeds up consumption of a topic's complete data. This is especially helpful when a topic has multiple partitions: each consumer can pick up data from an individual partition, increasing the speed at which the LAM consumes data. | "kafka-consumer-group" |
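To illustrate the even distribution described above, here is a round-robin sketch of how partitions of one topic might be spread across the consumers of a single group. This is a simplification for illustration only; Kafka's real partition assignors are more involved, and the names here are hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Round-robin partitions across the consumers of one group (illustrative)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Six partitions of one topic, three consumers in the same group:
result = assign_partitions(list(range(6)), ["c1", "c2", "c3"])
# Each consumer handles a disjoint subset of partitions in parallel.
```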

### Secure Sockets Layer

| Field | Type | Description |
| --- | --- | --- |
| ssl | Boolean | Set to true to enable SSL communication. When enabled, also configure the parameters below. |

• ssl_truststore_filename : Enter the path of the truststore file. This is the path where the generated truststore file is copied in Moogsoft AIOps, e.g. "/usr/local/kafka_ssl/client.truststore.jks".
• ssl_keystore_filename : Enter the path of the keystore file. This is the path where the generated keystore file is copied in Moogsoft AIOps, e.g. "/usr/local/kafka_ssl/client.keystore.jks".
• ssl_key_password : Enter the password of the client certificate required for client authentication. It is the password entered in ssl.key.password in the Kafka server.properties file.

### Note

If the connection between Kafka and the Kafka LAM fails, the LAM does not disconnect from Kafka; instead, it continuously polls Kafka and fetches all the messages from the topics once the connection is re-established.

Config File:

```
config :
{
    monitor:
    {
        name                    : "Kafka Lam Monitor",
        class                   : "CKafkaMonitor",
        kafka_listener          : "localhost:9092",
        topic_name              :
        [
            "topic1",
            "topic2"
        ],
        groupid                 : "consumer-group",
        ssl_connection          : false,
        ssl_truststore_filename : "client.truststore.jks",
        ssl_keystore_filename   : "client.keystore.jks"
    },
```

## Agent and Process Log

The Agent and Process Log sections of the configuration file allow you to define the following properties:

• name : Identifies events the LAM sends to the Message Bus.
• capture_log : Name and location of the LAM's capture log file. See Moogsoft AIOps Component Logs for more information.
• configuration_file : Name and location of the LAM's process log configuration file. See Moogsoft AIOps Component Logs for more information.

## HA Configuration

Refer to the document LAMs and High Availability.

## Parsing & Tokenisation

The parsing section is used to parse the received event and tokenise it. The Kafka LAM receives data in the following two formats:

• Text : The data is received in text format, which can be parsed and tokenised in the Parsing section and then mapped to Moogsoft AIOps fields in the Variables and Mapping sections.
• JSON : The data is received in JSON format, which can be mapped directly to the Moogsoft AIOps fields using CJsonDecoder. The parsing and variables sections are not required for the JSON format.
```
parsing:
{
    type: "start_and_end",
    start_and_end:
    {
        start:      [],
        end:        ["\n"],
        delimiters:
        {
            ignoreQuotes: true,
            stripQuotes: true,
            ignores:    "",
            delimiter:  [",","\r"]
        }
    }
},
```

```
# parsing:
# {
#     type: "regexp",
#     regexp:
#     {
#         pattern : "(?mU)^(.*)$",
#         capture_group: 1,
#         tokeniser_type: "delimiters",
#         delimiters:
#         {
#             ignoreQuotes: true,
#             stripQuotes: false,
#             ignores:    "",
#             delimiter:  ["\r"]
#         }
#     }
# },

# parsing:
# {
#     type: "regexp",
#     regexp:
#     {
#         pattern : "(?mU)^(.*)\t(.*)\t(.*)$",
#         tokeniser_type: "regexp_subgroups",
#     }
# },
```

### Parsing

The following two methods are available for parsing:

• Text message: to enable this parsing, set type to start_and_end.
• Regular expression: to enable this parsing, set type to regexp.

### Note

Only one parsing method can be used at a time. Comment out the parsing methods that are not in use.

Text Message Parsing

Set type to start_and_end, as shown in the example below:

```
type: "start_and_end",
start_and_end:
{
    start:      ["NEW_MSG"],
    end:        ["\n"],
    ...
```

In the above example, parsing starts when the parser receives NEW_MSG and ends when it receives a newline. The extracted string is then delimited as per the defined delimiters.
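As a rough illustration of start_and_end parsing (not the LAM's actual parser), the following Python sketch extracts each message between a NEW_MSG marker and a newline, then splits it on a comma delimiter:

```python
import re

def parse_start_and_end(stream, start="NEW_MSG", end="\n", delimiter=","):
    # Extract the text between each start marker and the end marker,
    # then tokenise each extracted string on the delimiter.
    pattern = re.escape(start) + r"(.*?)" + re.escape(end)
    return [m.split(delimiter) for m in re.findall(pattern, stream)]

tokens = parse_start_and_end("NEW_MSGcat,sat\nNEW_MSGon,the,mat\n")
# tokens holds one token list per extracted message.
```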

Regular Expression Parsing

In a regular expression, the parser searches for strings matching the expression defined in the pattern field. The extracted string is then delimited as per the defined delimiters. In the above example, the parser searches for the expression "(?mU)^(.*)$".

### Tokenisation

The parsed events are tokenised using either delimiters or regexp_subgroups.

Delimiters

Delimiters define how a line is split into tokens. For example, if you have a line of text data, it needs to be split into a sequence of substrings that are referenced by position from the start. So, if you are processing a comma-separated file, where each value is separated by a comma, it makes sense to define the delimiter as a comma. The system takes all the text between start and end and breaks it into tokens between the commas. The tokens can then be referenced by position number in the string, starting from one, not zero. For example, if the input string is cat,sat,on,the,mat and a comma is used as the separator, then token 1 is cat, token 2 is sat, and so on.

Note that there are a few complications when it comes to tokenisation and parsing. For example, if the delimiter is a comma and a token itself contains a comma, that token ends up being split into two tokens. To avoid this, it is recommended to quote strings. You must then let the system know whether it should strip or ignore quotes, hence the  stripQuotes  and  ignoreQuotes  parameters:

```
ignoreQuotes: true,
stripQuotes: false,
ignores:    "",
delimiter:  [",","\r"]
```

The above example specifies:

• If you have strings that are quoted between delimiters, setting  ignoreQuotes  to true will look for delimiters inside the quotes. For example, <delimiter>hello inside quote goodbye<delimiter> gives a token [hello inside quote goodbye].
• Setting  stripQuotes  to true removes start and end quotes from tokens. For example, "hello world" gives a token [hello world].
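The comma tokenisation and quote handling described above can be sketched with Python's csv module. This is an analogy, not the LAM's tokeniser: a quote-aware reader keeps an embedded delimiter inside one token and strips the quotes, while a quote-unaware reader splits on every delimiter.

```python
import csv
import io

line = 'cat,"sat,still",on,the,mat'

# Quote-aware (like stripQuotes: true): quotes removed, the embedded
# comma stays inside a single token.
stripped = next(csv.reader(io.StringIO(line), delimiter=","))

# Quote-unaware (csv.QUOTE_NONE): no quote processing, so the quoted
# token is split in two and the quote characters are kept.
kept = next(csv.reader(io.StringIO(line), delimiter=",", quoting=csv.QUOTE_NONE))
```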
•  ignores  is a list of characters to ignore. Ignored characters are never included in tokens.
•  delimiter  is the list of valid delimiters used to split strings into tokens.

regexp_subgroups

This tokenising method tokenises the extracted string based on groups in a message. An expression in parentheses in the regular expression denotes a group. For example, the partial expression ((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+\\d{1,2}) is a group which captures the date.

### Note

The parsing section is used when the event format is a text message. If the event is in JSON format, comment out the parsing and variables sections and uncomment builtInMapper: "CJsonDecoder" in the Mapping section. For a text message, comment out builtInMapper: "CJsonDecoder".

## Variables

A received event is a positioned collection of tokens. The parsing section breaks the event into tokens, and Moogsoft AIOps enables you to name these positions. Naming the positions helps you identify the tokens and map them to the Moogsoft AIOps fields in the Mapping section. In the example below, the token at position 6 is a manager name, so it is named "Manager".

```
variables:
[
    #
    # Note that positions start at 1, and go up
    # rather than array index style counting from zero
    #
    { name: "signature",   position: 1 },
    { name: "source_id",   position: 4 },
    { name: "external_id", position: 3 },
    { name: "Manager",     position: 6 },
    { name: "AlertGroup",  position: 7 },
    { name: "Class",       position: 8 },
    { name: "Agent",       position: 9 },
    { name: "severity",    position: 5 },
    { name: "description", position: 10 },
    { name: "agent_time",  position: 2 }
],
```

Here position 1 is assigned to signature, position 4 to source_id, and so on. Positions start at 1 and go up.

### Note

The variables section is used when the received event message type is a text message.
A JSON event can be mapped directly to the Moogsoft AIOps fields in the Mapping section.

## Mapping

For events received in JSON format, you can directly map the event fields of Kafka to the Moogsoft AIOps fields. For an event received in text format, the event is first tokenised in the Variables section, and the tokenised event is then mapped here in the Mapping section. The parameters of the received events are displayed in Moogsoft AIOps according to the mapping done here.

```
mapping :
{
    builtInMapper: "CJsonDecoder",
    catchAll: "overflow",
    rules:
    [
        { name: "signature",      rule: "$signature" },
        { name: "source_id",      rule: "$source_id" },
        { name: "external_id",    rule: "$external_id" },
        { name: "manager",        rule: "Kafka" },
        { name: "source",         rule: "$source" },
        { name: "class",          rule: "$class" },
        { name: "agent",          rule: "$LamInstanceName" },
        { name: "agent_location", rule: "$agent_location" },
        { name: "type",           rule: "$type" },
        { name: "severity",       rule: "$severity", conversion: "sevConverter" },
        { name: "description",    rule: "$description" },
        { name: "agent_time",     rule: "$agent_time", conversion: "stringToInt" }
    ]
},
filter:
{
    presend: "kafkaLam.js"
}
```

The above example specifies the mapping of the Kafka alarm fields to the Moogsoft AIOps fields. The stringToInt conversion is used to convert the time received in string format into an integer.

### Note

The signature field is used by the LAM to identify correlated events.

### Note

The above mapping is an example of a generic mapping and has to be configured according to the received event fields.
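For instance, a hypothetical JSON event such as the following (all field names here are assumptions for illustration, not a Kafka standard) could be decoded by CJsonDecoder and consumed by a mapping like the example above:

```json
{
    "signature": "host1:LinkDown",
    "source_id": "host1",
    "external_id": "evt-001",
    "source": "host1",
    "class": "network",
    "type": "LinkDown",
    "severity": "MAJOR",
    "description": "Interface eth0 down",
    "agent_time": "1510239900",
    "agent_location": "london"
}
```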

## Constants and Conversions

Constants and Conversions allow you to convert formats of the received data.

Severity and sevConverter

severity has a conversion defined as sevConverter in the Conversions section. This looks up the value of severity defined in the severity section of constants and returns the mapped integer corresponding to that severity.

```
severity:
{
    "CLEAR"    : 0,
    "NORMAL"   : 1,
    "WARNING"  : 2,
    "MINOR"    : 3,
    "MAJOR"    : 4,
    "CRITICAL" : 5
},
sevConverter:
{
    lookup : "severity",
    input  : "STRING",
    output : "INTEGER"
},
```

stringToInt

stringToInt is used in a conversion which forces the system to turn a string token into an integer value.

```
stringToInt:
{
    input  : "STRING",
    output : "INTEGER"
},
```

### Example

Example Constants and Conversions:

```
constants:
{
    severity:
    {
        "CLEAR"         : 0,
        "INDETERMINATE" : 1,
        "WARNING"       : 2,
        "MINOR"         : 3,
        "MAJOR"         : 4,
        "CRITICAL"      : 5
    }
},
conversions:
{
    sevConverter:
    {
        lookup: "severity",
        input:  "STRING",
        output: "INTEGER"
    },

    stringToInt:
    {
        input:  "STRING",
        output: "INTEGER"
    }
},
```

See Severity Reference for a description of severity levels in Moogsoft AIOps.
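The effect of sevConverter and stringToInt can be sketched in Python. This is an illustration of the lookup behaviour, not Moogsoft code:

```python
# Mirrors the constants/conversions above: look up a severity string and
# return the mapped integer; coerce a string timestamp to an integer.
severity = {
    "CLEAR": 0, "INDETERMINATE": 1, "WARNING": 2,
    "MINOR": 3, "MAJOR": 4, "CRITICAL": 5,
}

def sev_converter(value):
    return severity[value]      # STRING in, INTEGER out

def string_to_int(value):
    return int(value)           # e.g. "1510239900" becomes 1510239900

level = sev_converter("MAJOR")
agent_time = string_to_int("1510239900")
```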

# Service Operation Reference

| Process Name | Service Name |
| --- | --- |
| kafka_lam | kafkalamd |

Start the LAM service:

```
service kafkalamd start
```

Stop the LAM service:

```
service kafkalamd stop
```

Check the LAM service status:

```
service kafkalamd status
```

If the LAM fails to connect to Kafka, Moogsoft AIOps creates an alert and writes the details to the process log. Refer to the Logging details for LAMs and integrations for more information.

# Command Line Reference

To see the available optional attributes of kafka_lam, run the following command:

```
kafka_lam --help
```

kafka_lam is a command line executable with the following optional attributes:

| Option | Description |
| --- | --- |
| --config | Points to the pathname of the configuration file for the LAM, where the entire configuration for the LAM is specified. |
| --help | Displays all the command line options. |
| --version | Displays the component's version number. |
| --loglevel | Specifies the level of debugging. By default, the user gets everything. In common with all executables in Moogsoft AIOps, having it set at that level can result in a lot of output (many messages per event message processed). In all production implementations, it is recommended that the log level is set to WARN, so that only warning, error, and fatal messages are recorded. |

# Performance Information

Minimum requirements:

| Component | Value |
| --- | --- |
| CPU | 2 cores |
| RAM | 4 GB |
| Operating System | CentOS Linux release 6.7 |

# Version Information

| Integration Version | Tool Version |
| --- | --- |
| 1.0 - 1.5 | Kafka 0.9.0.0, 1.1 |