
Overview

Kafka is used for building real-time data pipelines and streaming apps. Some key concepts about Kafka:

  • Kafka runs as a cluster of one or more servers
  • The Kafka cluster stores streams of records in categories called topics
  • Each record consists of a key, a value, and a timestamp

Process Workflow

The process workflow of the Kafka LAM is as follows:

  1. The LAM reads the configuration from the kafka_lam.conf file.
  2. The LAM connects to the Kafka broker and requests the data available for a topic.
  3. The LAM starts multiple threads, based on the number of topics it is listening to.
  4. If a message is available for a topic, it is consumed by the LAM.
  5. The LAM parses the message, converts it into a map message, and publishes it to the MooMS bus.
  6. Events are created and submitted to the MooMS bus.
  7. The events are published to the subject "Events".

Kafka LAM Configuration

The alarms received from Kafka are processed according to the configurations in the kafka_lam.conf file. The processed alarms are published to Moogsoft AIOps.

The configuration file contains a JSON object. At the first layer of the object, the LAM has a parameter called config; the object that follows config contains all the information necessary to control the LAM.

The following sections are available for configuration in the Kafka LAM configuration file.
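
As a rough sketch, the overall layout of the file is as follows (section contents elided here; each section is shown in full below):

config :
{
    monitor     : { ... },   # connection to the Kafka broker
    agent       : { ... },   # agent name and log file
    parsing     : { ... },   # parsing and tokenising of text events
    variables   : [ ... ],   # names for token positions (text events only)
    mapping     : { ... },   # mapping of tokens/JSON fields to Moogsoft AIOps fields
    filter      : { ... },   # optional presend filter script
    constants   : { ... },   # lookup tables used by conversions
    conversions : { ... }    # format conversions referenced by mapping rules
}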

Monitor

The Kafka LAM takes event data from the Kafka broker. The user can configure the parameters in this section to establish a connection with Kafka.

config :
    {
        monitor:
        {
            name                      : "Kafka Lam Monitor",
            class                     : "CKafkaMonitor",
            kafka_listener            : "localhost:9092",

            topic_name                :
            [
                "topic1",
                "topic2"
            ],

            groupid                   : "consumer-group",

            ssl_connection            : false,
            ssl_truststore_filename   : "client.truststore.jks",
            ssl_truststore_password   : "password",
            ssl_keystore_filename     : "client.keystore.jks",
            ssl_keystore_password     : "password",
            ssl_key_password          : "password"
        },
  • name and class: These fields are reserved and should not be changed. The default values are Kafka Lam Monitor and CKafkaMonitor respectively.

  • kafka_listener: Enter the hostname and port of the Kafka broker, e.g. "localhost:9092".

  • topic_name: The name of the topic in the Kafka broker from which the events are fetched. Multiple topics can be entered here, as shown in the above example.

  • groupid: The name of the consumer group, e.g. "kafka-consumer-group". Kafka requires the group ID to identify the consumer, or group of consumers, consuming data from a topic. Kafka distributes a topic's data evenly among the consumers of the same group, which lets the consumers fetch the complete data of the topic faster. This is especially helpful when a topic has multiple partitions: each consumer can pick up data from an individual partition, increasing the speed at which the LAM consumes data.

  • ssl_connection: Set to true to enable SSL communication.

  • ssl_truststore_filename: Enter the path of the truststore file. This is the path to which the generated truststore file is copied, e.g. "/usr/local/kafka_ssl/client.truststore.jks". A sketch of how such files can be generated follows this list.

  • ssl_truststore_password: Enter the password of the truststore.

  • ssl_keystore_filename: Enter the path of the keystore file. This is the path to which the generated keystore file is copied, e.g. "/usr/local/kafka_ssl/client.keystore.jks".

  • ssl_keystore_password: Enter the password of the keystore.

  • ssl_key_password: Enter the password of the client certificate required for client authentication. This is the password entered in ssl.key.password in the Kafka server.properties file.
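
The truststore and keystore are generated outside the LAM. The following is a minimal sketch following the standard Kafka SSL setup with Java's keytool; ca-cert and cert-signed are placeholder file names for your CA certificate and CA-signed client certificate, and the CSR/signing step itself is omitted. Adapt to your own certificate process:

# Import the CA certificate into a new client truststore
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

# Create a client keystore, then import the CA certificate and the signed client certificate
keytool -keystore client.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.keystore.jks -alias localhost -import -file cert-signed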

If there is a connection failure between Kafka and the Kafka LAM, the LAM does not disconnect from Kafka; instead, it continuously polls Kafka and fetches all the messages from the topics once the connection is re-established.

Agent

The agent section allows the user to define two parameters:

agent:
        {
                name    : "Kafka",
                #log    : "/var/log/moogsoft/kafka_lam.log"
        },


The above example specifies:

  • name: This is the agent name; the events sent to MooMS by the Kafka LAM are identified by the agent name in the log. In this example the agent name is Kafka.

  • log: In this instance, the Kafka LAM writes its ingress contents to kafka_lam.log, located at /var/log/moogsoft.

HA Configuration

Refer to the document HA Configuration of LAM.

Parsing & Tokenisation

The parsing section is used for parsing the received event and tokenising it. The Kafka LAM receives data in the following two formats:

  • Text: The data is received in text format, which can be parsed and tokenised in the Parsing section and then mapped to Moogsoft AIOps fields in the Variables and Mapping sections
  • JSON: The data is received in JSON format, which can be mapped directly to the Moogsoft AIOps fields using CJsonDecoder. The Parsing and Variables sections are not required for the JSON format. An example event in both formats is shown below.
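
For illustration only, here is a hypothetical event in both formats. The values are borrowed from the sample message shown later on this page; "Group1" and "Agent1" are made-up placeholders for the AlertGroup and Agent positions used in the Variables section below.

A text event (token positions 1-10 match the Variables section):

0.1.8,07/27/12 19:06:01,dduncan9,xvsdfgdg,WARNING,Sonsitng,Group1,180,Agent1,Yuan Renminbi

The same event as JSON, which CJsonDecoder can map directly:

{"signature":"0.1.8","agent_time":"07/27/12 19:06:01","external_id":"dduncan9","source_id":"xvsdfgdg","severity":"WARNING","manager":"Sonsitng","class":180,"description":"Yuan Renminbi"}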
parsing:
        {
            type: "start_and_end",
            start_and_end:
            {
                start:      [],
                end:        ["\n"],

                delimiters:
                {

                    ignoreQuotes: true,
                    stripQuotes: true,
                    ignores:    "",
                    delimiter:  [",","\r"]
                }
            }
        },

        # parsing:
        # {
        #     type: "regexp",
        #     regexp:
        #     {
        #         pattern : "(?mU)^(.*)$",
        #         capture_group: 1,
        #         tokeniser_type: "delimiters",
        #         delimiters:
        #         {
        #                 ignoreQuotes: true,
        #                 stripQuotes: false,
        #                 ignores:    "",
        #                 delimiter:  ["\r"]
        #         }
        #     }
        # },
 
        # parsing:
        # {
        #     type: "regexp",
        #     regexp:
        #     {
        #         pattern : "(?mU)^(.*)\t(.*)\t(.*)$",
        #         tokeniser_type: "regexp_subgroups",
        #     }
        # },

Parsing

The examples above show the two methods available for parsing:

  • Text Message: To enable this parsing, the type is set to start_and_end
  • Regular Expression: To enable this parsing, the type is set to regexp

Only one parsing method can be used at a time; the methods not in use can be left commented out.


Text Message Parsing

The type should be set to start_and_end, as shown in the below example:

type: "start_and_end",
start_and_end:
{
    start:      ["NEW_MSG"],
    end:        ["\n"],
...

In the above example, parsing starts when NEW_MSG is received and ends when a new line is received. The extracted string is then delimited as per the defined delimiters.

Regular Expression Parsing

With regular expression parsing, the parser searches for strings matching the expression defined in the pattern field. The extracted string is then delimited as per the defined delimiters. In the above example, the parser searches for the expression "(?mU)^(.*)$".

Tokenisation

The parsed event is tokenised using either delimiters or regexp_subgroups.

Delimiters

Delimiters define how a line is split into tokens ("tokenising"). For example, a line of text data needs to be split into a sequence of substrings that are referenced by position from the start. So if you were processing a comma-separated file, where a comma separates each value, it would make sense to define the delimiter as a comma. The system would then take all the text between start and end and break it into tokens at the commas. The tokens can then be referenced by position number in the string, starting from one, not zero.

For example, if the input string is "the,cat,sat,on,the,mat" and comma is used as the separator, token 1 is "the", token 2 is "cat", and so on.

Be aware, there are complications when it comes to tokenisation and parsing. For example, if comma is the delimiter and a token contains a comma, that token will be split into two tokens. To avoid this, it is recommended that you quote strings; the system must then know whether it should strip or ignore quotes, hence the stripQuotes and ignoreQuotes parameters.

 ignoreQuotes: true, 
 stripQuotes: false, 
 ignores: "", 
 delimiter: [",","\r"]

The above example specifies:

  • If you have strings that are quoted between delimiters, setting ignoreQuotes to true will look for delimiters inside the quotes. For example, <delimiter>"hello "inside quote" goodbye"<delimiter> gives a token [hello inside quote goodbye]
  • Setting stripQuotes to true removes start and end quotes from tokens. For example, "hello world" gives a token [hello world]
  • ignores is a list of characters to ignore. Ignored characters are never included in tokens
  • delimiter is the list of valid delimiters used to split strings into tokens

regexp_subgroups

This tokenising method tokenises the extracted string based on groups in a message. An expression in parentheses within the regular expression denotes a group. For example, the partial expression ((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+\\d{1,2}) is a group which captures the month and day of a timestamp.
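
As a minimal sketch (for illustration only, assuming events that begin with such a month-and-day timestamp followed by free text), a parsing section that uses this group as token 1 and the rest of the line as token 2 could look like this:

parsing:
{
    type: "regexp",
    regexp:
    {
        # Group 1 captures the month and day; group 2 captures the rest of the line
        pattern : "(?m)^((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+\\d{1,2})\\s+(.*)$",
        tokeniser_type: "regexp_subgroups"
    }
},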


The Parsing section is used when the event format is a text message. If the event is in JSON format, comment out the Parsing and Variables sections and uncomment builtInMapper: "CJsonDecoder" in the Mapping section. For a text message, comment out builtInMapper: "CJsonDecoder".

Variables

A received event is a positioned collection of tokens. The Parsing section breaks the event into tokens, and Moogsoft AIOps enables a user to name these positions. Naming the positions helps the user identify the tokens. In the example below, the token at position 6 is a manager name, so the user names that token "Manager". The naming helps in mapping the tokens to the Moogsoft AIOps fields in the Mapping section.

variables:
        [
            #
            # Note that positions start at 1, and go up
            # rather than array index style counting from zero
            #
            { name: "signature",   position: 1 },
            { name: "source_id",   position: 4 },
            { name: "external_id", position: 3 },
            { name: "Manager",     position: 6 },
            { name: "AlertGroup",  position: 7 },
            { name: "Class",       position: 8 },
            { name: "Agent",       position: 9 },
            { name: "severity",    position: 5 },
            { name: "description", position: 10 },
            { name: "agent_time",  position: 2 }
        ],

The above example specifies:

Position 1 is assigned to signature, position 4 is assigned to source_id, and so on. Positions start at 1 and go up.

The Variables section is used when the received event message is in text format. A JSON event can be mapped directly to the Moogsoft AIOps fields in the Mapping section.

Mapping

For events received in JSON format, a user can directly map the alarm/event fields of Kafka to the Moogsoft AIOps fields. For an event received in text format, the event is first tokenised in the Variables section, and the tokenised event is then mapped here in the Mapping section. The parameters of the received alarm/event are displayed in Moogsoft AIOps according to the mapping done here.

mapping :
        {
            builtInMapper:  "CJsonDecoder",
            catchAll: "overflow",
            rules:
            [
                { name: "signature", rule:      "$signature" },
                { name: "source_id", rule:      "$source_id" },
                { name: "external_id", rule:    "$external_id" },
                { name: "manager", rule:        "Kafka" },
                { name: "source", rule:         "$source" },
                { name: "class", rule:          "$class" },
                { name: "agent", rule:          "$LamInstanceName" },
                { name: "agent_location", rule: "$agent_location" },
                { name: "type", rule:           "$type" },
                { name: "severity", rule:       "$severity", conversion: "sevConverter" },
                { name: "description", rule:    "$description" },
                { name: "agent_time", rule:     "$agent_time", conversion: "stringToInt" }
            ]
        },
        filter:
        {
            presend: "kafkaLam.js"
        }


The above example specifies the mapping of the Kafka alarm fields to the Moogsoft AIOps fields. The stringToInt conversion is used to convert the time received in string format into integer format.

The signature field is used by the LAM to identify correlated alarms.

The above mapping is a generic example and has to be configured according to the received alarm/event fields.

Constants and Conversions

Constants and Conversions allow the user to convert the format of the received data.

constants:
        {
            severity:
            {
                "CLEAR"         : 0,
                "INDETERMINATE" : 1,
                "WARNING"       : 2,
                "MINOR"         : 3,
                "MAJOR"         : 4,
                "CRITICAL" 		: 5
            }
        },
        conversions:
        {
            sevConverter:
            {
                lookup: "severity",
                input:  "STRING",
                output: "INTEGER"
            },

            stringToInt:
            {
                input:      "STRING",
                output:     "INTEGER"
            },
         
            timeConverter:
            {
                timeFormat: "%D %T",
                input:      "STRING",
                output:     "INTEGER"
            }
        },


The above example specifies:

  • severity and sevConverter: The severity field has a conversion defined as sevConverter in the Conversions section. This looks up the value of severity defined in the severity section of constants and returns the mapped integer corresponding to the severity
  • stringToInt: Used in a conversion which forces the system to turn a string token into an integer value
  • timeConverter: Used in a conversion which forces the system to convert time. If epoch time is to be used, the timeFormat in timeConverter should be commented out; otherwise, the user should provide the timeFormat. A rule using timeConverter is sketched below
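
For illustration only, a mapping rule that applies timeConverter, assuming the event carries agent_time as a human-readable string matching the "%D %T" timeFormat shown above:

{ name: "agent_time", rule: "$agent_time", conversion: "timeConverter" }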

catchAll

Any attribute that is never referenced in a rule is collected and placed as a JSON object in a variable, named overflow here, and passed as part of the event.

mapping :
        {
            builtInMapper:  "CJsonDecoder",
            catchAll: "overflow",
            rules:
            [
                { name: "signature", rule:      "$signature" },
                { name: "source_id", rule:      "$source_id" },
                { name: "external_id", rule:    "$external_id" },
                { name: "manager", rule:        "Kafka" },
                { name: "source", rule:         "$source" },
                { name: "class", rule:          "$class" },
                { name: "agent", rule:          "$LamInstanceName" },
                { name: "agent_location", rule: "$agent_location" },
                { name: "type", rule:           "$type" },
                { name: "severity", rule:       "$severity", conversion: "sevConverter" },
                { name: "description", rule:    "$description" },
                { name: "agent_time", rule:     "$agent_time", conversion: "stringToInt" }
            ]
        },


The Kafka fields test1 and test are sent to the LAM in the message below. Since they are not mapped to fields in the kafka_lam.conf file, they are placed in the overflow JSON object. The fields placed in the overflow variable can be viewed in the Kafka LAM log file.

Example of a message sent through a Kafka topic:

Message:

[{"signature":"0.1.8","source_id":"xvsdfgdg","external_id":"dduncan9","manager":"Sonsitng","source":"Indonesian","class":180,"agent_location":"Liangwa","type":"Violet","severity":"WARNING","description":"Yuan Renminbi","agent_time":"07/27/12 19:06:01","test1":"1","test":"2"}]

Example of an overflow JSON object containing the unmapped fields, created in the Kafka LAM log file:

INFO : [EventFa][20161027 17:04:38.701 +0530] [CMooMsg.java]:1099 +|Encoded size [376] json[{"_MOOTADATA_":{"creation_time":1477568078591},"agent":"JMSLAM","agent_location":"Liangwa","agent_time":0,"class":180,"description":"Yuan Renminbi","external_id":"dduncan9","manager":"Sonsitng","overflow":"{\"test\":\"2\",\"test1\":\"1\"}","severity":2,"signature":"0.1


Quotes

In some instances, the attribute strings are quoted. Moogsoft's JSON parser tolerates unquoted strings, but the JSON standard requires quoting for all strings, so Moogsoft recommends that users quote all strings.
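
For illustration, both of the following forms are typically accepted by the parser, but only the first is standard JSON (the attribute and value are placeholders):

name : "Kafka",   # recommended: string value quoted
name : Kafka,     # tolerated, but not standard JSON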

Comments

A user can comment out lines by prefixing them with a hash (#).

Starting the Kafka LAM

To start the Kafka LAM, enter the following command:

service kafkalamd start

To stop the Kafka LAM, enter the following command:

service kafkalamd stop


To view the status of the Kafka LAM, enter the following command:

service kafkalamd status


Command line attributes

The kafka_lam is a command line executable that can be run as a service daemon. It takes four attributes, which can be viewed by typing:

kafka_lam --help

Option | Description
--config | Points to a pathname to find the configuration file for the LAM. This is where the entire configuration for the LAM is specified.
--help | Displays all the command line options.
--version | Displays the component's version number.
--loglevel | Specifies the level of debugging. By default, the user gets everything; in common with all executables in MOOG, having it set at that level can result in a lot of output (many messages per event message processed). In all production implementations, it is recommended that the log level is set to WARN, which only informs the user of matters of importance.
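
For example, to start the LAM from the command line with an explicit configuration file and a production log level (the configuration path is a placeholder):

kafka_lam --config /path/to/kafka_lam.conf --loglevel WARN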


Version Information

LAM Version | Tool Version | Tested? | Expected to Work
1.0 | Kafka 0.9.0.0 | Yes | Yes
1.1 | Kafka 0.9.0.0 | Yes | Yes
1.2 | Kafka 0.9.0.0 | Yes | Yes
1.3 | Kafka 0.9.0.0 | Yes | Yes

System Information

This LAM was tested on a system with the following configurations:

CPU: 2 core
RAM: 4 GB
Operating System: CentOS Linux release 6.7

The system must meet at least the above requirements to run the LAM.
