Skip to content

Kafka: use cases

Anton Zelenin edited this page Nov 4, 2021 · 5 revisions

Running counters

If the value of your metrics is an increasing counter you can specify running_counter as a target type. The agent will count the difference between values.

The state is stored in memory and is not persistent between restarts of the pipeline

Example

Input data

{"time":"1584545458","packets_sent":"100","host":"host134"}
{"time":"1584545707","packets_sent":"163","host":"host134"}
{"time":"1584545459","packets_sent":"215","host":"host134"}

Pipeline configuration

[
    {
        "source": "kafka_source",
        "pipeline_id": "test",
        "values": {"packets_sent": "running_counter"},
        "measurement_names": {},
        "dimensions": {"required": ["host"]},
        "timestamp": {"type": "unix", "name": "time"}
    }
]

Result

[
  {
      "timestamp": 1584545707,
      "value": 63,
      "properties": {
          "what": "packets_sent",
          "host": "host134",
          "target_type": "counter"
      }  
  },
  {
      "timestamp": 1584545459,
      "value": 52,
      "properties": {
          "what": "packets_sent",
          "host": "host134",
          "target_type": "counter"
      }  
  }
]

Dynamic metric name

When the metric name is stored in the data, you need to choose not static what option when configuring the pipeline.

Example:

Input data

{"metric":"packets_out","time":"1584545458","metric_value":"1456","host":"host134"}
{"metric":"packets_in","time":"1584545707","metric_value":"0","host":"host134"}
{"metric":"optical_power","time":"1584545459","metric_value":"-54","host":"host787"}

Pipeline configuration

[
    {
        "source": "kafka_source",
        "pipeline_id": "test",
        "values": {"metric_value": "counter"},
        "measurement_names": {"metric_value": "metric"},
        "static_what": false,
        "dimensions": {"required": ["host"]},
        "timestamp": {"type": "unix", "name": "time"}
    }
]

For the measurement name you need to specify the name of the property where it's stored (metric in this case) And also you need to specify "static_what": false option

Result

[
  {
      "timestamp": 1584545458,
      "value": 1456,
      "properties": {
          "what": "packets_out",
          "host": "host134",
          "target_type": "counter"
      }  
  },
  {
      "timestamp": 1584545707,
      "value": 0,
      "properties": {
          "what": "packets_in",
          "host": "host134",
          "target_type": "counter"
      }  
  },
  {
      "timestamp": 1584545459,
      "value": -54,
      "properties": {
          "what": "optical_power",
          "host": "host787",
          "target_type": "counter"
      }  
  }
]

If not all your metrics in the topic have the same target type, you can also specify a property where the target type is stored instead of a static target type:

Input data

{"metric":"packets_out","time":"1584545458","metric_value":"1456","host":"host134", "agg_type": "counter"}
{"metric":"packets_in","time":"1584545707","metric_value":"0","host":"host134", "agg_type": "counter"}
{"metric":"optical_power","time":"1584545459","metric_value":"-54","host":"host787", "agg_type": "gauge"}

Pipeline configuration

[
    {
        "source": "kafka_source",
        "pipeline_id": "test",
        "values": {"metric_value": "agg_type"},
        "measurement_names": {"metric_value": "metric"},
        "static_what": false,
        "dimensions": {"required": ["host"]},
        "timestamp": {"type": "unix", "name": "time"}
    }
]

Result

[
  {
      "timestamp": 1584545458,
      "value": 1456,
      "properties": {
          "what": "packets_out",
          "host": "host134",
          "target_type": "counter"
      }  
  },
  {
      "timestamp": 1584545707,
      "value": 0,
      "properties": {
          "what": "packets_in",
          "host": "host134",
          "target_type": "counter"
      }  
  },
  {
      "timestamp": 1584545459,
      "value": -54,
      "properties": {
          "what": "optical_power",
          "host": "host787",
          "target_type": "gauge"
      }  
  }
]

Metrics in JSON array

Input data

{
  "time":"1584545458",
  "host":"host134",
  "metrics": [
    {"name": "packets_in", "value": 12342212},
    {"name": "packets_out", "value": 1223333},
    {"name": "metric_avg", "value": 1223333}
  ]
}

Pipeline configuration

[
    {
        "source": "kafka_source",
        "pipeline_id": "test",
        "values": {"value": "gauge"},
        "measurement_names": {"value": "name"},
        "static_what": false,
        "values_array_path": "metrics",
        "values_array_filter_metrics": ["packets_in", "packets_out"],
        "dimensions": {"required": ["host"]},
        "timestamp": {"type": "unix", "name": "time"}
    }
]

Result

[
  {
      "timestamp": 1584545458,
      "value": 12342212,
      "properties": {
          "what": "packets_out",
          "host": "host134",
          "target_type": "counter"
      }  
  },
  {
      "timestamp": 1584545458,
      "value": 1223333,
      "properties": {
          "what": "packets_in",
          "host": "host134",
          "target_type": "counter"
      }  
  }
]
Clone this wiki locally