Skip to content

Commit

Permalink
Merge branch '2358-iss-redis' into 'dev'
Browse files Browse the repository at this point in the history
Resolve "redis 采集到的指标和 prometheus 对齐"

See merge request cloudcare-tools/datakit!3184
  • Loading branch information
谭彪 committed Aug 28, 2024
2 parents adf73c1 + 2d93709 commit 0fe5dec
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 132 deletions.
2 changes: 1 addition & 1 deletion internal/export/doc/en/inputs/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Already tested version:

- Redis version v5.0+

When collecting data under the master-slave architecture, please configure the host information of the slave node for data collection, and you can get the metric information related to the master-slave.
When collecting data under the master-slave architecture, please configure the host information of the slave node or master node for data collection, and you can get the different metric information related to the master-slave.

Create Monitor User (**optional**)

Expand Down
2 changes: 1 addition & 1 deletion internal/export/doc/zh/inputs/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Redis 指标采集器,采集以下数据:

### 前置条件 {#reqirement}

- 在采集主从架构下数据时,请配置从节点的主机信息进行数据采集,可以得到主从相关的指标信息
- 在采集主从架构下数据时,配置从节点或主节点的主机信息进行数据采集,可以得到不同的主从相关的指标信息
- 创建监控用户(**可选**):redis 6.0+ 进入 `redis-cli` 命令行,创建用户并且授权:

```sql
Expand Down
2 changes: 2 additions & 0 deletions internal/plugins/inputs/redis/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ func (*Input) SampleMeasurement() []inputs.Measurement {
&commandMeasurement{},
&dbMeasurement{},
&infoMeasurement{},
&replicaMeasurement{},
&latencyMeasurement{},
&slowlogMeasurement{},
}
Expand Down Expand Up @@ -569,6 +570,7 @@ func (ipt *Input) Resume() error {
func defaultInput() *Input {
getClientFieldMap()
getInfoFieldMap()
getReplicaFieldMap()
getClusterFieldMap()

return &Input{
Expand Down
26 changes: 19 additions & 7 deletions internal/plugins/inputs/redis/metric_redis_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,27 @@ func (m *infoMeasurement) parseInfoData(info string, latencyMs float64, nextTS t

val = strings.TrimSuffix(val, "%")

tagsToAdd := map[string]string{
"redis_version": "unknown",
"role": "unknown",
"redis_build_id": "unknown",
"redis_mode": "unknown",
"os": "unknown",
"maxmemory_policy": "unknown",
"run_id": "unknown",
"process_id": "unknown",
}

if defaultValue, exists := tagsToAdd[key]; exists {
if val == "" {
val = defaultValue
}
kvs = kvs.AddTag(key, val)
m.tags[key] = val
}

float, err := strconv.ParseFloat(val, 64)
if err != nil {
if key == "redis_version" {
if val == "" {
val = "unknown"
}
kvs = kvs.AddTag("redis_version", val)
m.tags["redis_version"] = val
}
continue
}

Expand Down
56 changes: 28 additions & 28 deletions internal/plugins/inputs/redis/metric_redis_info_test.go

Large diffs are not rendered by default.

201 changes: 111 additions & 90 deletions internal/plugins/inputs/redis/metric_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@ func (m *replicaMeasurement) Info() *inputs.MeasurementInfo {
Name: redisReplica,
Type: "metric",
Fields: map[string]interface{}{
"repl_delay": &inputs.FieldInfo{DataType: inputs.Int, Type: inputs.Gauge, Desc: "Replica delay"},
"master_link_down_since_seconds": &inputs.FieldInfo{DataType: inputs.Int, Type: inputs.Gauge, Desc: "Number of seconds since the link is down"},
"master_repl_offset": &inputs.FieldInfo{DataType: inputs.Int, Type: inputs.Gauge, Desc: "The server's current replication offset."},
// "repl_delay": &inputs.FieldInfo{DataType: inputs.Int, Type: inputs.Gauge, Desc: "Replica delay"},
"master_link_down_since_seconds": &inputs.FieldInfo{DataType: inputs.Int, Type: inputs.Count, Desc: "Number of seconds since the link is down when the link between master and replica is down, only collected for slave redis."},
"master_link_status": &inputs.FieldInfo{DataType: inputs.Int, Type: inputs.Gauge, Desc: "Status of the link (up/down), `1` for up, `0` for down, only collected for slave redis."},
"slave_offset": &inputs.FieldInfo{DataType: inputs.Int, Type: inputs.Gauge, Desc: "Slave offset, only collected for master redis."},
"slave_lag": &inputs.FieldInfo{DataType: inputs.Int, Type: inputs.Gauge, Desc: "Slave lag, only collected for master redis."},
},
Tags: map[string]interface{}{
"host": &inputs.TagInfo{Desc: "Hostname"},
"server": &inputs.TagInfo{Desc: "Server addr"},
"service_name": &inputs.TagInfo{Desc: "Service name"},
"slave_id": &inputs.TagInfo{Desc: "Slave ID"},
"host": &inputs.TagInfo{Desc: "Hostname."},
"server": &inputs.TagInfo{Desc: "Server addr."},
"service_name": &inputs.TagInfo{Desc: "Service name."},
"slave_id": &inputs.TagInfo{Desc: "Slave ID, only collected for master redis."},
"slave_addr": &inputs.TagInfo{Desc: "Slave addr, only collected for master redis."},
"slave_state": &inputs.TagInfo{Desc: "Slave state, only collected for master redis."},
"master_addr": &inputs.TagInfo{Desc: "Master addr, only collected for slave redis."},
},
}
}
Expand Down Expand Up @@ -69,19 +76,39 @@ var slaveMatch = regexp.MustCompile(`^slave\d+`)
// repl_backlog_size:1048576
// repl_backlog_first_byte_offset:1
// repl_backlog_histlen:966.
//
// role:slave
// master_host:127.0.0.1
// master_port:6380
// master_link_status:down
// master_last_io_seconds_ago:-1
// master_sync_in_progress:0
// slave_repl_offset:1
// master_link_down_since_seconds:1724739099
// slave_priority:100
// slave_read_only:1
// connected_slaves:0
// master_replid:45a37268f2359e5367c5767ce93ff303ad3f1918
// master_replid2:0000000000000000000000000000000000000000
// master_repl_offset:0
// second_repl_offset:-1
// repl_backlog_active:0
// repl_backlog_size:1048576
// repl_backlog_first_byte_offset:0
// repl_backlog_histlen:0

func (ipt *Input) parseReplicaData(list string) ([]*point.Point, error) {
collectCache := []*point.Point{}
opts := point.DefaultMetricOptions()
opts = append(opts, point.WithTime(time.Now()))
var masterIP, masterPort string

masterDownSeconds := map[string]float64{}
masterData := map[string]float64{}

// master
rdr := strings.NewReader(list)
scanner := bufio.NewScanner(rdr)
for scanner.Scan() {
var kvs point.KVs
line := scanner.Text()

if len(line) == 0 || line[0] == '#' {
continue
}
Expand All @@ -93,93 +120,48 @@ func (ipt *Input) parseReplicaData(list string) ([]*point.Point, error) {

key, value := record[0], record[1]

if key == "master_repl_offset" {
masterData["master_repl_offset"], _ = strconv.ParseFloat(value, 64)
}

if key == "master_link_down_since_seconds" {
masterDownSeconds["master_link_down_since_seconds"], _ = strconv.ParseFloat(value, 64)
}
}

// slaves
rdr = strings.NewReader(list)
scanner = bufio.NewScanner(rdr)
for scanner.Scan() {
var kvs point.KVs
slaveData := map[string]float64{}
var slaveID, ip, port string

line := scanner.Text()
if len(line) == 0 || line[0] == '#' {
continue
if key == "master_host" {
masterIP = value
}

record := strings.Split(line, ":")
if len(record) != 2 {
continue
if key == "master_port" {
masterPort = value
}

key, value := record[0], record[1]

if slaveMatch.MatchString(key) {
slaveID = strings.TrimPrefix(key, "slave")
kv := strings.SplitN(value, ",", 5)
if len(kv) != 5 {
continue
}

split := strings.Split(kv[0], "=")
if len(split) != 2 {
l.Warnf("Failed to parse slave ip, got %s", kv[0])
continue
// key in the replicaMeasurement
if _, has := replicaFieldMap[key]; has {
// slave redis, collect master data
if key == "master_link_status" {
switch value {
case "up":
kvs = kvs.Add(key, 1, false, false)
case "down":
kvs = kvs.Add(key, 0, false, false)
default:
l.Warnf("parseReplicaData: unexpected value for master_link_status, got %s", value)
continue
}
} else {
float, err := strconv.ParseFloat(value, 64)
if err != nil {
l.Warnf("parseMasterData: %s, expect to be int, got %s", err, value)
continue
}
kvs = kvs.Add(key, float, false, false)
}
ip = split[1]

split = strings.Split(kv[1], "=")
if len(split) != 2 {
l.Warnf("Failed to parse slave port, got %s", kv[1])
continue
}
port = split[1]

split = strings.Split(kv[3], "=")
if len(split) != 2 {
l.Warnf("Failed to parse slave offset, got %s", kv[3])
continue
}

temp, err := strconv.ParseFloat(split[1], 64)
if err != nil {
l.Warnf("ParseFloat: %s, slaveOffset expect to be int, got %s", err, split[1])
continue
}
slaveData["slave_offset"] = temp
}

var masterOffset, slaveOffset float64
var ok bool
if masterOffset, ok = masterData["master_repl_offset"]; !ok {
continue
}
if slaveOffset, ok = slaveData["slave_offset"]; !ok {
continue
}

delay := masterOffset - slaveOffset
addr := fmt.Sprintf("%s:%s", ip, port)
if addr != ":" {
// special key for slave data
if slaveMatch.MatchString(key) {
// master redis, collect slave data
slaveID, ip, port, state, offset, lag := parseConnectedSlaveString(key, value)
kvs = kvs.AddTag("slave_id", slaveID)
kvs = kvs.AddTag("slave_addr", fmt.Sprintf("%s:%s", ip, port))
kvs = kvs.AddTag("slave_state", state)
kvs = kvs.Add("slave_offset", offset, false, false)
kvs = kvs.Add("slave_lag", lag, false, false)
}

kvs = kvs.AddTag("slave_id", slaveID)

if delay >= 0 {
kvs = kvs.Add("repl_delay", delay, false, false)
}

for k, v := range masterDownSeconds {
kvs = kvs.Add(k, v, false, false)
if masterIP != "" && masterPort != "" {
kvs = kvs.AddTag("master_addr", fmt.Sprintf("%s:%s", masterIP, masterPort))
}

if kvs.FieldCount() > 0 {
Expand All @@ -189,6 +171,45 @@ func (ipt *Input) parseReplicaData(list string) ([]*point.Point, error) {
collectCache = append(collectCache, point.NewPointV2(redisReplica, kvs, opts...))
}
}

return collectCache, nil
}

var replicaFieldMap = map[string]struct{}{}

func getReplicaFieldMap() {
m := replicaMeasurement{}
for k := range m.Info().Fields {
replicaFieldMap[k] = struct{}{}
}
}

/*
slave0:ip=10.254.11.1,port=6379,state=online,offset=1751844676,lag=0
slave1:ip=10.254.11.2,port=6379,state=online,offset=1751844222,lag=0
*/
func parseConnectedSlaveString(slaveName string, keyValues string) (id string, ip string, port string, state string, offset float64, lag float64) {
slaveID := strings.TrimPrefix(slaveName, "slave")
kv := strings.SplitN(keyValues, ",", 5)
if len(kv) != 5 {
return
}
for _, v := range kv {
k := strings.Split(v, "=")
if len(k) != 2 {
continue
}
switch k[0] {
case "ip":
ip = k[1]
case "port":
port = k[1]
case "state":
state = k[1]
case "offset":
offset, _ = strconv.ParseFloat(k[1], 64)
case "lag":
lag, _ = strconv.ParseFloat(k[1], 64)
}
}
return slaveID, ip, port, state, offset, lag
}
15 changes: 10 additions & 5 deletions internal/plugins/inputs/redis/metric_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func TestInput_parseReplicData(t *testing.T) {
list: mockReplicaData01,
},
want: []string{
"redis_replica,foo=bar,host=HOST,slave_addr=127.0.0.1:6379,slave_id=1 repl_delay=10",
"redis_replica,foo=bar,host=HOST,slave_addr=127.0.0.1:6380,slave_id=0 repl_delay=10",
"redis_replica,foo=bar,host=HOST master_repl_offset=4056",
"redis_replica,foo=bar,host=HOST,slave_addr=127.0.0.1:6379,slave_id=1,slave_state=online slave_lag=0,slave_offset=4046",
"redis_replica,foo=bar,host=HOST,slave_addr=127.0.0.1:6380,slave_id=0,slave_state=online slave_lag=0,slave_offset=4046",
},
wantErr: false,
},
Expand All @@ -62,9 +63,11 @@ func TestInput_parseReplicData(t *testing.T) {
list: mockReplicaData01,
},
want: []string{
"redis_replica,election=TRUE,foo=bar,slave_addr=127.0.0.1:6379,slave_id=1 repl_delay=10",
"redis_replica,election=TRUE,foo=bar,slave_addr=127.0.0.1:6380,slave_id=0 repl_delay=10",
"redis_replica,election=TRUE,foo=bar master_repl_offset=4056",
"redis_replica,election=TRUE,foo=bar,slave_addr=127.0.0.1:6379,slave_id=1,slave_state=online slave_lag=0,slave_offset=4046",
"redis_replica,election=TRUE,foo=bar,slave_addr=127.0.0.1:6380,slave_id=0,slave_state=online slave_lag=0,slave_offset=4046",
},

wantErr: false,
},
{
Expand All @@ -78,7 +81,9 @@ func TestInput_parseReplicData(t *testing.T) {
args: args{
list: mockReplicaData02,
},
want: []string{},
want: []string{
"redis_replica,foo=bar,host=HOST master_repl_offset=0",
},
wantErr: false,
},
}
Expand Down

0 comments on commit 0fe5dec

Please sign in to comment.