Configure the Kafka LAM

Apache Kafka builds real-time data pipelines and streaming apps, and runs as a cluster of one or more servers. The Kafka cluster stores streams of records in categories called topics. Each record consists of a key, a value, and a time-stamp.

You can install a basic Kafka integration in the UI. See Apache Kafka for integration steps.

Before you begin

The Kafka integration has been validated with Kafka v0.9 and 1.1. Before you start to set up the LAM, ensure you have met the following requirements:

  • You have the host and ports for all the Kafka brokers you want to connect to.

  • The ports for your Kafka brokers are open and accessible from Moogsoft AIOps .

  • You know the name of the topics for the system to subscribe to.

  • You have the group ID of the consumer group.

Configure the LAM

Edit the configuration file to control the behavior of the Kafka LAM. You can find the file at $MOOGSOFT_HOME/config/kafka_lam.conf.

The configuration file contains a JSON object. At the first layer of the object, the LAM has a parameter called config, and the object that follows config has all the necessary information to control the LAM.

See Kafka Reference and LAM and Integration Reference for a full description of all properties. Some properties in the file are commented out by default; remove the '#' character to enable them.

  1. Configure the connection properties:

    • kafka_listener: The hostname along with the port of the Kafka broker.

    • topic_name: The name of the topic(s) in the Kafka broker you are consuming events from.

    • groupid: The name of the consumer group.

  2. Configure SSL if you want to encrypt communications between Kafka and Moogsoft AIOps :

    • ssl_connection: Set to true to enable SSL communication.

    • ssl_truststore_filename: The path of the truststore certificate.

    • ssl_truststore_password: The password for the truststore certificate.

    • ssl_keystore_filename: The path of the keystore certificate.

    • ssl_keystore_password: The password for the keystore certificate.

    • ssl_key_password: The password for the client certificate required in client authentication. It is the password entered in the ssl.key.password of the Kafka server.properties file.

  3. Configure the kafka_properties section, which allows you to use Kafka consumer properties. Note that these take priority over the aforementioned SSL properties:

    • ssl.endpoint.identification.algorithm: The endpoint identification algorithm used by clients to validate server host name.

    • sasl.mechanism: The SASL mechanism method for the Kafka broker to use.

    • security.protocol: The security protocol to use.

    • sasl.jaas.config: The type of JAAS authentication configuration to use for the Kafka broker.

    • sasl.login.refresh.window.factor: If using OAUTHBEARER, the login refresh thread will sleep until the specified window factor relative to the credential's lifetime has been reached, at which time it will attempt to refresh the credential.

    • sasl.login.refresh.window.jitter: The maximum amount of random jitter relative to the credential's lifetime that is added to the login refresh thread's sleep time.

    • sasl.login.refresh.min.period.seconds: The desired minimum time (in seconds) for the login refresh thread to wait before refreshing a credential.

    • sasl.login.refresh.min.buffer.seconds: The amount of buffer time (in seconds) to maintain before credential expiration when refreshing a credential.

    • sasl.kerberos.service.name: The Kerberos service name.

    See the Apache Kafka documentation for more information on these properties.

  4. Configure parsing and tokenisation. See the "Parsing and Tokenisation" section below for more information.

  5. Optionally configure the LAM identification and capture logging details:

    • name: Maps to $Laminstancename, so that the agent field indicates events Moogsoft AIOps ingests from this LAM.

    • capture_log: Name and location of the LAM's capture log file, which it writes to for debugging purposes.

  6. Optionally configure severity conversions. See Severity Reference for further information and "Conversion Rules" in Data Parsing for details on conversions in general.

  7. Optionally configure the process logging details:

    • configuration_file: Name and location of the LAM's process log configuration file. See Configure Logging for more information.

Parsing and tokenization

Note

The parsing section is used when the event format is a text message; if you have an event with JSON format then comment the parsing and the variables sections and uncomment builtInMapper: "CJsonDecoder" in the Mapping section of kafka_lam.conf.

The parsing section is used for parsing the received event and tokenizing it.

The Kafka LAM receives data in two formats:

  • Text: The data is received in text format which can be parsed and tokenized in the Parsing section and then mapped to Moogsoft AIOps fields in the Variables and the Mapping section.

  • JSON: The data is received in a JSON format, which can be mapped directly using CJsonDecoder. The parsing and the variable section are not required for JSON format.

Parsing

The following two methods are available for parsing:

  • Text Message: The parsing will start when it gets NEW_MSG and end when it gets new line. The extracted string is then delimited as per the defined delimiters.

    To enable this method set type to start_and_end and then configure the start and end fields.

  • Regular Expression: The parser searches a regular expression for the strings you define in the pattern field and delimits them in accordance with your configuration of delimiters.

    To enable this method set type to regexp.

Only use one parsing method at a time, commenting out the ones you are not using. See Data Parsing for more information.

Tokenization

The parsed events are tokenized using either delimiters or the regexp_subgroups. See the Kafka Reference for more information.

Variables

Note

The variable section is used when the received event message type is TextMessage; a JSON event can be mapped directly to the Moog field in the Mapping section.

A received event is a positioned collection of tokens. The variables section enables you to name these positions. The naming of the positions helps you to identify the tokens.

Positions start at 1 and increase. In the example below, the token in position number 6 is a Manager name, so the user names the token as "Manager". The naming helps in mapping the tokens to the Moogsoft AIOps fields in the mapping section.

variables:
        [
            { name: "signature",   position: 1 },
            { name: "source_id",   position: 4 },
            { name: "external_id", position: 3 },
            { name: "Manager",     position: 6 },
            { name: "AlertGroup",  position: 7 },
            { name: "Class",       position: 8 },
            { name: "Agent",       position: 9 },
            { name: "severity",    position: 5 },
            { name: "description", position: 10 },
            { name: "agent_time",  position: 2 }
        ],

Map LAM properties

For events received in JSON format, you can directly map the event fields of Kafka. In the case of an event received in text format, the event is first tokenized in the Variable section, and the tokenized event is then mapped here. The parameters of the received events are displayed in Moogsoft AIOps accordingly.

Kafka Event Property

Kafka LAM Event Property

Agent

$LamInstanceName

Agent Location

$agent_location

Agent Time

$agent_time

Class

$class

Description

$description

External ID

$external_id

Manager

Kafka

Severity

$severity

Signature

$signature

Source ID

$source_id

Type

$type

The above example specifies the mapping of the Kafka alarm fields with the Moogsoft AIOps fields.

Example

An example Kafka LAM configuration is as follows:

monitor:
{
        name: "Kafka Lam Monitor",
        class: "CKafkaMonitor",
        kafka_listener: "localhost:9092",
        topic_name: [
                     "topic1",
                     "topic2"
                    ],                       
        groupid: "consumer-group",
        ssl_connection: false
}

parsing:
        {
            type: "start_and_end",
            start_and_end:
            {
                start: [],
                end: ["\n"],

                delimiters:
                {

                    ignoreQuotes: true,
                    stripQuotes: true,
                    ignores: "",
                    delimiter: [",","\r"]
                }
            }
        },

         #parsing:
         #{
             #type: "regexp",
             #regexp:
             #{
                 #pattern: "(?mU)^(.*)$",
                 #capture_group: 1,
                 #tokeniser_type: "delimiters",
                 #delimiters:
                 #{
                         #ignoreQuotes: true,
                         #stripQuotes: false,
                         #ignores: "",
                         #delimiter: ["\r"]
                 #}
             #}
         #},

        variables:
        [
            { name: "signature", position: 1 },
            { name: "source_id", position: 4 },
            { name: "external_id", position: 3 },
            { name: "Manager", position: 6 },
            { name: "AlertGroup", position: 7 },
            { name: "Class", position: 8 },
            { name: "Agent", position: 9 },
            { name: "severity", position: 5 },
            { name: "description", position: 10 },
            { name: "agent_time", position: 2 }
        ],

        mapping:
        {

            builtInMapper: "CJsonDecoder",
            catchAll: "overflow",
            rules:
            [
                { name: "signature", rule: "$signature" },
                { name: "source_id", rule: "$source_id" },
                { name: "external_id", rule: "$external_id" },
                { name: "manager", rule: "Kafka" },
                { name: "source", rule: "$source" },
                { name: "class", rule: "$class" },
                { name: "agent", rule: "$LamInstanceName" },
                { name: "agent_location", rule: "$agent_location" },
                { name: "type", rule: "$type" },
                { name: "severity", rule: "$severity", conversion: "sevConverter" },
                { name: "description", rule: "$description" },
                { name: "agent_time", rule: "$agent_time" }
            ]
        },

    }
    ,log_config:
    {
       configuration_file: "$MOOGSOFT_HOME/config/logging/custom.log.json"
    }
}

Configure for High Availability

Configure the Kafka LAM for high availability if required. See High Availability Overview for details.

Configure LAMbot processing

The Kafka LAMbot processes and filters events before sending them to the Message Bus. You can customize or bypass this processing if required. You can also load JavaScript files into the LAMbot and execute them.

See LAMbot Configuration for more information. An example Kafka LAM filter configuration is shown below.

filter:
{
    presend: "KafkaLam.js",
}

Start and stop the LAM

Restart the Kafka LAM to activate any changes you make to the configuration file or LAMbot.

The LAM service name is kafkalamd.

See Control Moogsoft AIOps Processes for the commands to start, stop and restart the LAM.