Moogsoft Docs

Configure the Kafka LAM

Apache Kafka is used for building real-time data pipelines and streaming apps. Kafka runs as a cluster of one or more servers. The Kafka cluster, stores stream of records in categories called topics and each record consists of a key, a value, and a time-stamp.

See Apache Kafka for UI configuration instructions.

  1. LAM reads configuration from the kafka_lam.conf file.

  2. LAM connects to Kafka Broker and requests for data present for a topic.

  3. LAM will start multiple threads, based on the number of topics it is listening to.

  4. If an event is available for a topic, it will be consumed by the LAM.

  5. The events are parsed and converted into normalized Moogsoft AIOps events.

  6. The normalized events are then published to MooMS bus.


The events received from Kafka are processed according to the configurations in the kafka_lam.conffile.

The configuration file contains a JSON object. At the first layer of the object, the LAM has a parameter called config, and the object that follows config has all the necessary information to control the LAM.


Kafka LAM takes the event data from the Kafka broker. You can configure the parameters here to establish connection with Kafka:






name and class


Reserved fields: do not change. Default values are Kafka Lam Monitor and CKafkaMonitor.



Enter the hostname along with the port of the Kafka broker.




Enter the name of topic(s) in the Kafka broker from where you are fetching the events.



Enter the name of the consumer group. The group id is required by kafka to identify the consumer or a group of consumers, consuming data from a topic. Kafka distributes the data evenly among the consumers of the same group. This helps in the faster fetching of complete data of the topic by the consumers. This is especially helpful when there are multiple partitions in a topic, then an individual consumer may pick the data from an individual partition of the topic, hence increasing the speed of the LAM in consuming the data.


Secure Sockets Layer






Set to true, to enable SSL Communication:

  • ssl_key_password: Enter the password of the client certificate required in client authentication. It is the password entered in the ssl.key.password of the Kafka file.

  • Note

    In case of connection failure between Kafka and the Kafka LAM, the LAM will not disconnect with Kafka; instead, it will continuously poll Kafka, and fetch all the messages from the topics after re-establishing the connection.

Config File

        name                         : "Kafka Lam Monitor",
        class                        : "CKafkaMonitor",
        kafka_listener               : "localhost:9092",
        topic_name                                       :      [
        groupid                      : "consumer-group",
        ssl_connection               : false
Agent and Process Log

The Agent and Process Log sections of the configuration file allow you to define the following properties:

  • name: Identifies events the LAM sends to the Message Bus.

  • capture_log: Name and location of the LAM's capture log file.

  • configuration_file: Name and location of the LAM's process log configuration file.

Parsing & Tokenisation

The parsing section is used for parsing the received event and tokenising it. The Kafka LAM receives data in following 2 formats:

  • Text: The data is received in text format which can be parsed and tokenised in the Parsing section and then mapped to Moogsoft AIOps fields in the Variables and the Mapping section.

  • JSON: The data is received in a JSON format, which can be mapped directly using CJsonDecoder. The parsing and the variable section are not required for JSON format.

            type: "start_and_end",
                start:      [],
                end:        ["\n"],


                    ignoreQuotes: true,
                    stripQuotes: true,
                    ignores:    "",
                    delimiter:  [",","\r"]

        # parsing:
        # {
        #     type: "regexp",
        #     regexp:
        #     {
        #         pattern : "(?mU)^(.*)$",
        #         capture_group: 1,
        #         tokeniser_type: "delimiters",
        #         delimiters:
        #         {
        #                 ignoreQuotes: true,
        #                 stripQuotes: false,
        #                 ignores:    "",
        #                 delimiter:  ["\r"]
        #         }
        #     }
        # },
        # parsing:
        # {
        #     type: "regexp",
        #     regexp:
        #     {
        #         pattern : "(?mU)^(.*)\t(.*)\t(.*)$",
        #         tokeniser_type: "regexp_subgroups",
        #     }
        # },

The following 2 methods are available for parsing:

  • Text Message: To enable this parsing the type is set to Start_and_End.

  • Regular Expression: To enable this the type is set to regexp.


At a time only one parsing method will be used, you can comment the other parsing methods which are not in use.

Text Message Parsing

The Type should be set start_and_end as shown in the below example:

type: start_and_end:
    start:      [NEW_MSG],
    end:        ["\n"],

The parsing in above example the parsing will start when it gets NEW_MSG and end when it gets new line. The extracted string is then delimited as per the defined delimiters.

Regular Expression Parsing

In a regular expression, the parser searches for strings as per the expression defined in the pattern field. The extracted string is then delimited as per the defined delimiters. In the above example, the parser searches for the expression "(?mU)^(.*)$".


The parsed events is tokenised using the delimiters or the regexp_subgroups.


Delimiters define how a line is split into tokens€, for example, if you have a line of text data, it needs to be split up into a sequence of substrings that are referenced by position from the start. So, if you are processing a comma-separated file, where each value is separated by a comma, it would make sense to have the delimiter defined as a comma. The system would take all the text between start and end and break it up into tokens between the commas. The tokens could then be referenced by position number in the string starting from one, not zero.

For example, if input string is cat,sat,on,the,mat and a comma is used as a separator, then token 1 will be cat, token 2 will be sat and so on.

Please make sure that there are few complications when it comes to tokenisation and parsing. For example, if you say comma is the delimiter, and the token contains a comma, you will end up with that token containing a comma to be split into 2 tokens. To avoid this, it is recommended to quote strings. You must then allow the system to know whether it should strip or ignore quotes, hence the stripQuotes and ignoreQuotes parameters.

 ignoreQuotes: true, 
 stripQuotes: false, 
 ignores: "", 
 delimiter: [",","\r"]

The above example specifies:

  • If you have strings that are quoted between delimiters, ignoreQuotes set to true, will look for delimiters inside the quote. For example, <delimiter>hello inside quote goodbye<delimiter> gives a token [hello inside quote goodbye].

  • Setting stripQuotes to true removes start and end quotes from tokens. For example, hello world gives a token [hello world].

  • ignores is a list of characters to ignore. Ignored characters are never included in tokens.

  • Delimiter is the list of valid delimiters used to split strings into tokens.


This tokenising method tokenises the extracted string based on groups in a message. An expression in the parenthesis in the regular expression denotes a group. For example, the part expression in a regular expression((?(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+\\d{1,2}) is a group which contains the date and time.


Parsing section is used when the event format is a text message. If we have an event with JSON format then comment the parsing and the variables sections and uncomment builtInMapper: "CJsonDecoder" in the Mapping section. For text message comment builtInMapper: "CJsonDecoder".


A received event is a positioned collection of tokens. The parsing section breaks the event into tokens. Moogsoft AIOps enables a user to name these positions. The naming of the positions helps the user to identify the tokens. In the below given example token at position number 6 is a Manager name, so the user names the token as "Manager". The naming helps in mapping the tokens to the Moogsoft AIOps fields in the mapping section.

            # Note that positions start at 1, and go up
            # rather than array index style counting from zero
            { name: "signature",   position: 1 },
            { name: "source_id",   position: 4 },
            { name: "external_id", position: 3 },
            { name: "Manager",     position: 6 },
            { name: "AlertGroup",  position: 7 },
            { name: "Class",       position: 8 },
            { name: "Agent",       position: 9 },
            { name: "severity",    position: 5 },
            { name: "description", position: 10 },
            { name: "agent_time",  position: 2 }

position 1 is assigned to signature; position 4 is assigned to source_id and so on. Positions start at 1 and go up.


The variable section is used when the received event message type is TextMessage. A JSON event can be mapped directly to the Moog field in the Mapping section

For events received in JSON format, you can directly map the event fields of Kafka. In the case of an event received in text format, the event is first tokenised in the Variable section, and the tokenised event is then mapped here in the mapping section. The parameters of the received events are displayed in Moogsoft AIOps according to the mapping done here.

mapping :
            builtInMapper:  "CJsonDecoder",
            catchAll: "overflow",
                { name: "signature", rule:      "$signature" },
                { name: "source_id", rule:      "$source_id" },
                { name: "external_id", rule:    "$external_id" },
                { name: "manager", rule:        "Kafka" },
                { name: "source", rule:         "$source" },
                { name: "class", rule:          "$class" },
                { name: "agent", rule:          "$LamInstanceName" },
                { name: "agent_location", rule: "$agent_location" },
                { name: "type", rule:           "$type" },
                { name: "severity", rule:       "$severity", conversion: "sevConverter" },
                { name: "description", rule:    "$description" },
                { name: "agent_time", rule:     "$agent_time", conversion: "stringToInt" }
            presend: "kafkaLam.js"

The above example specifies the mapping of the Kafka alarm fields with the Moogsoft AIOps fields. The stringToInt is used to convert the time received in the string format into an integer format.


The signature field is used by the LAM to identify correlated events.


The above mapping is an example of a generic mapping and has to be configured according to the received event fields.

Constants and Conversions

Constants and Conversions allow you to convert formats of the received data.




Severity and sevConverter

has a conversion defined as sevConverter in the Conversions section, this looks up the value of severity defined in the severity section of constants and returns back the mapped integer corresponding to the severity.

   "CLEAR"    : 0,                
   "NORMAL"   : 1,
   "WARNING"  : 2,
   "MINOR"    : 3,
   "MAJOR"    : 4,
   "CRITICAL" : 5
    lookup : "severity",
    input  : "STRING",
    output : "INTEGER"


used in a conversion, which forces the system to turn a string token into an integer value.

    input  : "STRING",
    output : "INTEGER"

Example Constants and Conversions

                "CLEAR"         : 0,
                "INDETERMINATE" : 1,
                "WARNING"       : 2,
                "MINOR"         : 3,
                "MAJOR"         : 4,
                "CRITICAL"            : 5
                lookup: "severity",
                input:  "STRING",
                output: "INTEGER"
                input:      "STRING",
                output:     "INTEGER"
Service Operation Reference

Process Name

Service Name



Start the LAM Service:

service kafkalamd start

Stop the LAM Service:

service kafkalamd stop

Check the LAM Service status:

service kafkalamd status

If the LAM fails to connect to Kafka, Moogsoft AIOps creates an alert and writes the details to the process log.

Command Line Reference

To see the available optional attributes of the kafka_lam, run the following command:

kafka_lam --help

The kafka_lam is a command line executable, and has the following optional attributes:




Points to a pathname to find the configuration file for the LAM. This is where the entire configuration for the LAM is specified.


Displays all the command line options.


Displays the component’s version number.


Specifies the level of debugging. By default, user gets everything.

In all production implementations, it is recommended that log level is set to WARN. This ensures only warning, error and fatal messages are recorded.