Skip to content

Commit

Permalink
Lustre listener
Browse files Browse the repository at this point in the history
  • Loading branch information
dimm0 committed Apr 20, 2021
1 parent 2a8d21f commit 3e60639
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 32 deletions.
62 changes: 37 additions & 25 deletions lustre_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package main
import (
"bufio"
"fmt"
"github.com/patrickmn/go-cache"
"io/ioutil"
"log"
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/patrickmn/go-cache"
"github.com/spf13/viper"
)

type LustreEvent struct {
Expand All @@ -22,7 +24,7 @@ type LustreEvent struct {
var c = cache.New(5*time.Minute, 10*time.Minute)

func listenLog() {
eventsChan := make(chan string, 100000)
eventsChan := make(chan string, viper.GetInt("listen_queue"))

go func() {
for range time.NewTicker(5 * time.Second).C {
Expand Down Expand Up @@ -81,53 +83,63 @@ func listenLog() {
}(mdt, user)
}

for i := 0; i < 2; i++ {
for j := 0; j < viper.GetInt("listen_workers"); j++ {
go func() {
for evtStr := range eventsChan {
evtTokens := strings.Split(evtStr, " ")
switch evtTokens[1] {
case "01CREAT":
group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
fstype, group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
if err != nil {
logger.Errorf("Error getting fid of created file: %s", err.Error())
}
LLFilesCreatedCounter.WithLabelValues(group, user, *fsListenParam).Inc()
LLFilesCreatedCounter.WithLabelValues(fstype, group, user, *fsListenParam).Inc()
case "02MKDIR":
group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
fstype, group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
if err != nil {
logger.Errorf("Error getting fid of created folder: %s", err.Error())
}
LLFoldersCreatedCounter.WithLabelValues(group, user, *fsListenParam).Inc()
LLFoldersCreatedCounter.WithLabelValues(fstype, group, user, *fsListenParam).Inc()
case "07RMDIR":
group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
fstype, group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
if err != nil {
logger.Errorf("Error getting fid of deleted folder: %s", err.Error())
}
LLFoldersRemovedCounter.WithLabelValues(group, user, *fsListenParam).Inc()
LLFoldersRemovedCounter.WithLabelValues(fstype, group, user, *fsListenParam).Inc()
case "06UNLNK":
group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
fstype, group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
if err != nil {
logger.Errorf("Error getting fid of deleted file: %s", err.Error())
}
LLFilesRemovedCounter.WithLabelValues(group, user, *fsListenParam).Inc()
LLFilesRemovedCounter.WithLabelValues(fstype, group, user, *fsListenParam).Inc()
case "10OPEN":
	// The FID of an opened file is read from token index 6 (the other
	// event kinds in this switch read it from index 5), so the record must
	// have at least 7 tokens before evtTokens[6] may be touched. The
	// previous guard compared against 5 and then indexed evtTokens[6] both
	// in the condition and in the log call, which panics the worker
	// goroutine on a short record.
	if len(evtTokens) < 7 {
		// Too short to carry token 6 at all: log only what is safe to index.
		logger.Errorf("Got unusual open event: |%s| %d", evtTokens, len(evtTokens))
	} else if len(evtTokens[6]) < 5 {
		// Token 6 exists but is too short for the [3 : len-1] slice below
		// to yield a non-empty FID.
		logger.Errorf("Got unusual open event: |%s| %d %d %s", evtTokens, len(evtTokens), len(evtTokens[6]), evtTokens[6])
	} else {
		// Drop the 3-byte prefix and the 1-byte suffix around the FID.
		fstype, group, user, err := getOwner(evtTokens[6][3 : len(evtTokens[6])-1])
		if err != nil {
			logger.Errorf("Error getting fid of opened file: %s", err.Error())
		}
		// As in the other cases, the event is still counted when the owner
		// lookup fails, using whatever placeholder labels getOwner returned.
		LLFilesOpenedCounter.WithLabelValues(fstype, group, user, *fsListenParam).Inc()
	}
case "14SATTR":
group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
fstype, group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
if err != nil {
logger.Errorf("Error getting fid of changed attr %s: %s", evtTokens[5][3 : len(evtTokens[5])-1], err.Error())
logger.Errorf("Error getting fid of changed attr %s: %s", evtTokens[5][3:len(evtTokens[5])-1], err.Error())
}
LLAttrChangedCounter.WithLabelValues(group, user, *fsListenParam).Inc()
LLAttrChangedCounter.WithLabelValues(fstype, group, user, *fsListenParam).Inc()
case "15XATTR":
group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
fstype, group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
if err != nil {
logger.Errorf("Error getting fid of changed attr %s: %s", evtTokens[5][3 : len(evtTokens[5])-1], err.Error())
logger.Errorf("Error getting fid of changed attr %s: %s", evtTokens[5][3:len(evtTokens[5])-1], err.Error())
}
LLXAttrChangedCounter.WithLabelValues(group, user, *fsListenParam).Inc()
LLXAttrChangedCounter.WithLabelValues(fstype, group, user, *fsListenParam).Inc()
case "17MTIME":
group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
fstype, group, user, err := getOwner(evtTokens[5][3 : len(evtTokens[5])-1])
if err != nil {
logger.Errorf("Error getting fid of mtime %s: %s", evtTokens[5][3:len(evtTokens[5])-1], err.Error())
}
LLMtimeChangedCounter.WithLabelValues(group, user, *fsListenParam).Inc()
LLMtimeChangedCounter.WithLabelValues(fstype, group, user, *fsListenParam).Inc()
default:
logger.Errorf("Got unknown event: %s", evtTokens)
}
Expand All @@ -137,7 +149,7 @@ func listenLog() {

}

func getOwner(fid string) (string, string, error) {
func getOwner(fid string) (fstype string, project string, user string, err error) { // works for combined /projects and /scratch

var rel string

Expand All @@ -150,22 +162,22 @@ func getOwner(fid string) (string, string, error) {

pathB, err := exec.Command("lfs", "fid2path", fs.GetMountPath(), fid).Output()
if err != nil {
return "unknown", "unknown", err
return "", "unknown", "unknown", err
}

path := string(pathB)
rel, err = filepath.Rel(fs.GetMountPath(), path)
if err != nil {
return "", "", err
return "", "unknown", "unknown", err
}
c.Set(fid, rel, cache.DefaultExpiration)
}

if len(strings.Split(rel, "/")) < 2 {
return "", "", fmt.Errorf("Path %s does not contain group and user", rel)
if len(strings.Split(rel, "/")) < 3 {
return "", "", "", fmt.Errorf("Path %s does not contain group and user", rel)
}

splitPath := strings.Split(rel, "/")
return splitPath[0], splitPath[1], nil
return splitPath[0], splitPath[1], splitPath[2], nil

}
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func readWorkerConfig() {
viper.SetDefault("monitor_mount_sec", 2)
viper.SetDefault("elastic_index", "idx")

viper.SetDefault("listen_workers", 4)
viper.SetDefault("listen_queue", 100000)

viper.SetDefault("copy", "true")
viper.SetDefault("scan", "true")

Expand Down Expand Up @@ -893,6 +896,7 @@ func main() {
listenLog()
prometheus.MustRegister(LLFilesCreatedCounter)
prometheus.MustRegister(LLFilesRemovedCounter)
prometheus.MustRegister(LLFilesOpenedCounter)
prometheus.MustRegister(LLFoldersCreatedCounter)
prometheus.MustRegister(LLFoldersRemovedCounter)
prometheus.MustRegister(LLAttrChangedCounter)
Expand Down
19 changes: 12 additions & 7 deletions prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,37 +68,42 @@ var (
Name: "ll_files_created_number",
Help: "Number of files created",
},
[]string{"group", "user", "datasource"})
[]string{"fstype", "group", "user", "datasource"})
LLFilesRemovedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ll_files_removed_number",
Help: "Number of files removed",
},
[]string{"group", "user", "datasource"})
[]string{"fstype", "group", "user", "datasource"})
LLFilesOpenedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ll_files_opened_number",
Help: "Number of files opened",
},
[]string{"fstype", "group", "user", "datasource"})
LLFoldersCreatedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ll_folders_created_number",
Help: "Number of folders created",
},
[]string{"group", "user", "datasource"})
[]string{"fstype", "group", "user", "datasource"})
LLFoldersRemovedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ll_folders_removed_number",
Help: "Number of folders removed",
},
[]string{"group", "user", "datasource"})
[]string{"fstype", "group", "user", "datasource"})
LLAttrChangedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ll_attr_changed_number",
Help: "Number of files with attributes changed",
},
[]string{"group", "user", "datasource"})
[]string{"fstype", "group", "user", "datasource"})
LLXAttrChangedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ll_xattr_changed_number",
Help: "Number of files with x attributes changed",
},
[]string{"group", "user", "datasource"})
[]string{"fstype", "group", "user", "datasource"})
LLMtimeChangedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ll_mtime_changed_number",
Help: "Number of files with mtime changed",
},
[]string{"group", "user", "datasource"})
[]string{"fstype", "group", "user", "datasource"})
LLCacheHitsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "ll_cache_hits_number",
Help: "Number of fid cache hits",
Expand Down

0 comments on commit 3e60639

Please sign in to comment.