diff --git a/server/controller/recorder/cleaner.go b/server/controller/recorder/cleaner.go index 00758325538..adc2950610d 100644 --- a/server/controller/recorder/cleaner.go +++ b/server/controller/recorder/cleaner.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "golang.org/x/exp/slices" @@ -33,6 +34,7 @@ import ( "github.com/deepflowio/deepflow/server/controller/recorder/constraint" "github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message" "github.com/deepflowio/deepflow/server/controller/tagrecorder" + "github.com/deepflowio/deepflow/server/libs/stats" ) var ( @@ -66,19 +68,11 @@ func (c *Cleaners) Init(ctx context.Context, cfg config.RecorderConfig) { func (c *Cleaners) Start(sContext context.Context) error { log.Info("resource clean started") - orgIDs, err := mysql.GetORGIDs() + err := c.checkORGs() if err != nil { - log.Errorf("failed to get db for org ids: %s", err.Error()) return err } - for _, id := range orgIDs { - if _, err := c.NewCleanerIfNotExists(id); err != nil { - log.Errorf("failed to cleaner for org id %d: %s", id, err.Error()) - return err - } - } - // 定时清理软删除资源数据 // timed clean soft deleted resource data c.timedCleanDeletedData(sContext) @@ -128,14 +122,29 @@ func (c *Cleaners) checkORGs() error { log.Errorf("failed to get db for org ids: %s", err.Error()) return err } + + for _, orgID := range orgIDs { + if _, err := c.NewCleanerIfNotExists(orgID); err != nil { + log.Errorf("failed to cleaner for org id %d: %s", orgID, err.Error()) + return err + } + } + c.removeOrRefresh(orgIDs) + return nil +} + +func (c *Cleaners) removeOrRefresh(orgIDs []int) { c.mux.Lock() defer c.mux.Unlock() - for orgID := range c.orgIDToCleaner { - if !slices.Contains(orgIDs, orgID) { + + for orgID, cleaner := range c.orgIDToCleaner { + if slices.Contains(orgIDs, orgID) { + cleaner.refreshStatsd() + } else { + cleaner.closeStatsd() delete(c.orgIDToCleaner, orgID) } } - return nil } func (c *Cleaners) cleanDeletedData() { @@ -215,6 +224,9 @@ func (c *Cleaners) set(orgID int, cl *Cleaner) { type Cleaner struct { org *common.ORG toolData *toolData + + statsdLock sync.Mutex + domainLcuuidToStatsd map[string]*domainStatsd } func newCleaner(orgID int) (*Cleaner, error) { @@ -223,10 +235,65 @@ func newCleaner(orgID int) (*Cleaner, error) { log.Errorf("failed to create org object: %s", err.Error()) return nil, err } - c := &Cleaner{org: org, toolData: newToolData()} + c := &Cleaner{ + org: org, + toolData: newToolData(), + domainLcuuidToStatsd: make(map[string]*domainStatsd), + } + c.refreshStatsd() return c, nil } +func (c *Cleaner) closeStatsd() { + for _, statsd := range c.domainLcuuidToStatsd { + statsd.close() + } +} + +func (c *Cleaner) refreshStatsd() { + var domains []*mysqlmodel.Domain + if err := c.org.DB.Find(&domains).Error; err != nil { + log.Errorf("failed to get domain: %s", err.Error(), c.org.LogPrefix) + return + } + + c.statsdLock.Lock() + defer c.statsdLock.Unlock() + + domainLcuuidInfo := make(map[string]struct{}) + for _, domain := range domains { + domainLcuuidInfo[domain.Lcuuid] = struct{}{} + if _, ok := c.domainLcuuidToStatsd[domain.Lcuuid]; !ok { + c.domainLcuuidToStatsd[domain.Lcuuid] = newDomainStatsd(c.org, domain.Lcuuid, domain.Name) + c.domainLcuuidToStatsd[domain.Lcuuid].start() + } + } + for domainLcuuid, statsd := range c.domainLcuuidToStatsd { + if _, ok := domainLcuuidInfo[domainLcuuid]; !ok { + statsd.close() + delete(c.domainLcuuidToStatsd, domainLcuuid) + } + } +} + +func (c *Cleaner) getStatsd(domainLcuuid string, tagType string) *cleanerCounter { + c.statsdLock.Lock() + defer c.statsdLock.Unlock() + + if statsd, ok := c.domainLcuuidToStatsd[domainLcuuid]; ok { + return statsd.get(tagType) + } + return nil +} + +func (c *Cleaner) fillStatsd(domainLcuuid string, tagType string, count int) { + if statsd := c.getStatsd(domainLcuuid, tagType); statsd != nil { + statsd.Fill(count) + } else { + log.Error("%s %s statsd not found", domainLcuuid, tagType, c.org.LogPrefix) + } +} + func (c *Cleaner) cleanDeletedData(retentionInterval int) { if err := c.toolData.load(c.org.DB); err != nil { log.Error("failed to load tool data", c.org.LogPrefix) @@ -267,37 +334,77 @@ func (c *Cleaner) cleanDirtyData() { log.Error("failed to load tool data", c.org.LogPrefix) return } - + var domains []*mysqlmodel.Domain + if err := c.org.DB.Find(&domains).Error; err != nil { + log.Errorf("failed to get domain: %s", err.Error(), c.org.LogPrefix) + return + } log.Info("clean dirty data started", c.org.LogPrefix) - c.cleanVMDirty() - c.cleanNetworkDirty() - c.cleanVRouterDirty() - c.cleanPodIngressDirty() - c.cleanPodServiceDirty() - c.cleanPodNodeDirty() - c.cleanPodGroupDirty() - c.cleanPodDirty() - c.cleanVInterfaceDirty() + for _, domain := range domains { + c.cleanHostDirty(domain.Lcuuid) + c.cleanVMDirty(domain.Lcuuid) + c.cleanNetworkDirty(domain.Lcuuid) + c.cleanVRouterDirty(domain.Lcuuid) + c.cleanPodIngressDirty(domain.Lcuuid) + c.cleanPodServiceDirty(domain.Lcuuid) + c.cleanPodNodeDirty(domain.Lcuuid) + c.cleanPodGroupDirty(domain.Lcuuid) + c.cleanPodDirty(domain.Lcuuid) + c.cleanVInterfaceDirty(domain.Lcuuid) + } log.Info("clean dirty data completed", c.org.LogPrefix) } -func (c *Cleaner) cleanVMDirty() { - vmIDs := getIDs[mysqlmodel.VM](c.org.DB) +func (c *Cleaner) cleanHostDirty(domainLcuuid string) { + deviceIDs := getIDs[mysqlmodel.Host](c.org.DB, domainLcuuid) + if len(deviceIDs) != 0 { + vifs, _ := WhereFindPtr[mysqlmodel.VInterface]( + c.org.DB, + "domain = ? AND devicetype = ? AND deviceid NOT IN ?", domainLcuuid, ctrlrcommon.VIF_DEVICE_TYPE_HOST, deviceIDs, + ) + if len(vifs) != 0 { + c.org.DB.Delete(&vifs) + log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_HOST_EN, vifs), c.org.LogPrefix) + + c.fillStatsd(domainLcuuid, tagTypeDeviceIPConn, len(vifs)) + } + } +} + +func (c *Cleaner) cleanVMDirty(domainLcuuid string) { + vmIDs := getIDs[mysqlmodel.VM](c.org.DB, domainLcuuid) if len(vmIDs) != 0 { - var vifs []*mysqlmodel.VInterface - c.org.DB.Where("devicetype = ? AND deviceid NOT IN ?", ctrlrcommon.VIF_DEVICE_TYPE_VM, vmIDs).Find(&vifs) + vifs, _ := WhereFindPtr[mysqlmodel.VInterface]( + c.org.DB, + "domain = ? AND devicetype = ? AND deviceid NOT IN ?", domainLcuuid, ctrlrcommon.VIF_DEVICE_TYPE_VM, vmIDs, + ) if len(vifs) != 0 { c.org.DB.Delete(&vifs) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_VM_EN, vifs), c.org.LogPrefix) + + c.fillStatsd(domainLcuuid, tagTypeDeviceIPConn, len(vifs)) + } + + vmPodNodeConns, _ := WhereFindPtr[mysqlmodel.VMPodNodeConnection]( + c.org.DB, + "domain = ? AND vm_id NOT IN ?", domainLcuuid, vmIDs, + ) + if len(vmPodNodeConns) != 0 { + c.org.DB.Delete(&vmPodNodeConns) + log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VM_POD_NODE_CONNECTION_EN, ctrlrcommon.RESOURCE_TYPE_VM_EN, vmPodNodeConns), c.org.LogPrefix) + + c.fillStatsd(domainLcuuid, tagTypeCHostPodNodeConn, len(vmPodNodeConns)) } } } -func (c *Cleaner) cleanNetworkDirty() { - networkIDs := getIDs[mysqlmodel.Network](c.org.DB) +func (c *Cleaner) cleanNetworkDirty(domainLcuuid string) { + networkIDs := getIDs[mysqlmodel.Network](c.org.DB, domainLcuuid) if len(networkIDs) != 0 { - var subnets []*mysqlmodel.Subnet - c.org.DB.Where("vl2id NOT IN ?", networkIDs).Find(&subnets) + subnets, _ := WhereFindPtr[mysqlmodel.Subnet]( + c.org.DB, + "domain = ? AND vl2id NOT IN ?", domainLcuuid, networkIDs, + ) if len(subnets) != 0 { c.org.DB.Delete(&subnets) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_SUBNET_EN, ctrlrcommon.RESOURCE_TYPE_NETWORK_EN, subnets), c.org.LogPrefix) @@ -305,30 +412,47 @@ func (c *Cleaner) cleanNetworkDirty() { } } -func (c *Cleaner) cleanVRouterDirty() { - vrouterIDs := getIDs[mysqlmodel.VRouter](c.org.DB) +func (c *Cleaner) cleanVRouterDirty(domainLcuuid string) { + vrouterIDs := getIDs[mysqlmodel.VRouter](c.org.DB, domainLcuuid) if len(vrouterIDs) != 0 { - var rts []*mysqlmodel.RoutingTable - c.org.DB.Where("vnet_id NOT IN ?", vrouterIDs).Find(&rts) + rts, _ := WhereFindPtr[mysqlmodel.RoutingTable]( + c.org.DB, + "domain = ? AND vnet_id NOT IN ?", domainLcuuid, vrouterIDs, + ) if len(rts) != 0 { c.org.DB.Delete(&rts) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_ROUTING_TABLE_EN, ctrlrcommon.RESOURCE_TYPE_VROUTER_EN, rts), c.org.LogPrefix) } + + vifs, _ := WhereFindPtr[mysqlmodel.VInterface]( + c.org.DB, + "domain = ? AND devicetype = ? AND deviceid NOT IN ?", domainLcuuid, ctrlrcommon.VIF_DEVICE_TYPE_VROUTER, vrouterIDs, + ) + if len(vifs) != 0 { + c.org.DB.Delete(&vifs) + log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_VROUTER_EN, vifs), c.org.LogPrefix) + + c.fillStatsd(domainLcuuid, tagTypeDeviceIPConn, len(vifs)) + } } } -func (c *Cleaner) cleanPodIngressDirty() { - podIngressIDs := getIDs[mysqlmodel.PodIngress](c.org.DB) +func (c *Cleaner) cleanPodIngressDirty(domainLcuuid string) { + podIngressIDs := getIDs[mysqlmodel.PodIngress](c.org.DB, domainLcuuid) if len(podIngressIDs) != 0 { - var podIngressRules []*mysqlmodel.PodIngressRule - c.org.DB.Where("pod_ingress_id NOT IN ?", podIngressIDs).Find(&podIngressRules) + podIngressRules, _ := WhereFindPtr[mysqlmodel.PodIngressRule]( + c.org.DB, + "domain = ? AND pod_ingress_id NOT IN ?", domainLcuuid, podIngressIDs, + ) if len(podIngressRules) != 0 { c.org.DB.Delete(&podIngressRules) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_RULE_EN, ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_EN, podIngressRules), c.org.LogPrefix) } - var podIngressRuleBkds []*mysqlmodel.PodIngressRuleBackend - c.org.DB.Where("pod_ingress_id NOT IN ?", podIngressIDs).Find(&podIngressRuleBkds) + podIngressRuleBkds, _ := WhereFindPtr[mysqlmodel.PodIngressRuleBackend]( + c.org.DB, + "domain = ? AND pod_ingress_id NOT IN ?", domainLcuuid, podIngressIDs, + ) if len(podIngressRuleBkds) != 0 { c.org.DB.Delete(&podIngressRuleBkds) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_RULE_BACKEND_EN, ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_EN, podIngressRuleBkds), c.org.LogPrefix) @@ -336,44 +460,56 @@ func (c *Cleaner) cleanPodIngressDirty() { } } -func (c *Cleaner) cleanPodServiceDirty() { - podServiceIDs := getIDs[mysqlmodel.PodService](c.org.DB) +func (c *Cleaner) cleanPodServiceDirty(domainLcuuid string) { + podServiceIDs := getIDs[mysqlmodel.PodService](c.org.DB, domainLcuuid) if len(podServiceIDs) != 0 { - var podServicePorts []*mysqlmodel.PodServicePort - c.org.DB.Where("pod_service_id NOT IN ?", podServiceIDs).Find(&podServicePorts) + podServicePorts, _ := WhereFindPtr[mysqlmodel.PodServicePort]( + c.org.DB, + "domain = ? AND pod_service_id NOT IN ?", domainLcuuid, podServiceIDs, + ) if len(podServicePorts) != 0 { c.org.DB.Delete(&podServicePorts) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_PORT_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, podServicePorts), c.org.LogPrefix) } - var podGroupPorts []*mysqlmodel.PodGroupPort - c.org.DB.Where("pod_service_id NOT IN ?", podServiceIDs).Find(&podGroupPorts) + podGroupPorts, _ := WhereFindPtr[mysqlmodel.PodGroupPort]( + c.org.DB, + "domain = ? AND pod_service_id NOT IN ?", domainLcuuid, podServiceIDs, + ) if len(podGroupPorts) != 0 { c.org.DB.Delete(&podGroupPorts) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_GROUP_PORT_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, podGroupPorts), c.org.LogPrefix) } - var vifs []*mysqlmodel.VInterface - c.org.DB.Where("devicetype = ? AND deviceid NOT IN ?", ctrlrcommon.VIF_DEVICE_TYPE_POD_SERVICE, podServiceIDs).Find(&vifs) + vifs, _ := WhereFindPtr[mysqlmodel.VInterface]( + c.org.DB, + "domain = ? AND devicetype = ? AND deviceid NOT IN ?", domainLcuuid, ctrlrcommon.VIF_DEVICE_TYPE_POD_SERVICE, podServiceIDs, + ) if len(vifs) != 0 { c.org.DB.Delete(&vifs) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, vifs), c.org.LogPrefix) + + c.fillStatsd(domainLcuuid, tagTypeDeviceIPConn, len(vifs)) } } } -func (c *Cleaner) cleanPodGroupDirty() { - podGroupIDs := getIDs[mysqlmodel.PodGroup](c.org.DB) +func (c *Cleaner) cleanPodGroupDirty(domainLcuuid string) { + podGroupIDs := getIDs[mysqlmodel.PodGroup](c.org.DB, domainLcuuid) if len(podGroupIDs) != 0 { - var podGroupPorts []*mysqlmodel.PodGroupPort - c.org.DB.Where("pod_group_id NOT IN ?", podGroupIDs).Find(&podGroupPorts) + podGroupPorts, _ := WhereFindPtr[mysqlmodel.PodGroupPort]( + c.org.DB, + "domain = ? AND pod_group_id NOT IN ?", domainLcuuid, podGroupIDs, + ) if len(podGroupPorts) != 0 { c.org.DB.Delete(&podGroupPorts) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_GROUP_PORT_EN, ctrlrcommon.RESOURCE_TYPE_POD_GROUP_EN, podGroupPorts), c.org.LogPrefix) } - var pods []*mysqlmodel.Pod - c.org.DB.Where("pod_group_id NOT IN ?", podGroupIDs).Find(&pods) + pods, _ := WhereFindPtr[mysqlmodel.Pod]( + c.org.DB, + "domain = ? AND pod_group_id NOT IN ?", domainLcuuid, podGroupIDs, + ) if len(pods) != 0 { c.org.DB.Delete(&pods) publishTagrecorder(c.org.DB, pods, ctrlrcommon.RESOURCE_TYPE_POD_EN, c.toolData) @@ -382,25 +518,35 @@ func (c *Cleaner) cleanPodGroupDirty() { } } -func (c *Cleaner) cleanPodNodeDirty() { - podNodeIDs := getIDs[mysqlmodel.PodNode](c.org.DB) +func (c *Cleaner) cleanPodNodeDirty(domainLcuuid string) { + podNodeIDs := getIDs[mysqlmodel.PodNode](c.org.DB, domainLcuuid) if len(podNodeIDs) != 0 { - var vifs []*mysqlmodel.VInterface - c.org.DB.Where("devicetype = ? AND deviceid NOT IN ?", ctrlrcommon.VIF_DEVICE_TYPE_POD_NODE, podNodeIDs).Find(&vifs) + vifs, _ := WhereFindPtr[mysqlmodel.VInterface]( + c.org.DB, + "domain = ? AND devicetype = ? AND deviceid NOT IN ?", domainLcuuid, ctrlrcommon.VIF_DEVICE_TYPE_POD_NODE, podNodeIDs, + ) if len(vifs) != 0 { c.org.DB.Delete(&vifs) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_NODE_EN, vifs), c.org.LogPrefix) + + c.fillStatsd(domainLcuuid, tagTypeDeviceIPConn, len(vifs)) } - var vmPodNodeConns []*mysqlmodel.VMPodNodeConnection - c.org.DB.Where("pod_node_id NOT IN ?", podNodeIDs).Find(&vmPodNodeConns) + vmPodNodeConns, _ := WhereFindPtr[mysqlmodel.VMPodNodeConnection]( + c.org.DB, + "domain = ? AND pod_node_id NOT IN ?", domainLcuuid, podNodeIDs, + ) if len(vmPodNodeConns) != 0 { c.org.DB.Delete(&vmPodNodeConns) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VM_POD_NODE_CONNECTION_EN, ctrlrcommon.RESOURCE_TYPE_POD_NODE_EN, vmPodNodeConns), c.org.LogPrefix) + + c.fillStatsd(domainLcuuid, tagTypeCHostPodNodeConn, len(vmPodNodeConns)) } - var pods []*mysqlmodel.Pod - c.org.DB.Where("pod_node_id != 0 AND pod_node_id NOT IN ?", podNodeIDs).Find(&pods) + pods, _ := WhereFindPtr[mysqlmodel.Pod]( + c.org.DB, + "domain = ? AND pod_node_id != 0 AND pod_node_id NOT IN ?", domainLcuuid, podNodeIDs, + ) if len(pods) != 0 { c.org.DB.Delete(&pods) publishTagrecorder(c.org.DB, pods, ctrlrcommon.RESOURCE_TYPE_POD_EN, c.toolData) @@ -409,29 +555,38 @@ func (c *Cleaner) cleanPodNodeDirty() { } } -func (c *Cleaner) cleanPodDirty() { - podIDs := getIDs[mysqlmodel.Pod](c.org.DB) +func (c *Cleaner) cleanPodDirty(domainLcuuid string) { + podIDs := getIDs[mysqlmodel.Pod](c.org.DB, domainLcuuid) if len(podIDs) != 0 { - var vifs []*mysqlmodel.VInterface - c.org.DB.Where("devicetype = ? AND deviceid NOT IN ?", ctrlrcommon.VIF_DEVICE_TYPE_POD, podIDs).Find(&vifs) + vifs, _ := WhereFindPtr[mysqlmodel.VInterface]( + c.org.DB, + "domain = ? AND devicetype = ? AND deviceid NOT IN ?", domainLcuuid, ctrlrcommon.VIF_DEVICE_TYPE_POD, podIDs, + ) if len(vifs) != 0 { c.org.DB.Delete(&vifs) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_EN, vifs), c.org.LogPrefix) + + c.fillStatsd(domainLcuuid, tagTypeDeviceIPConn, len(vifs)) } } } -func (c *Cleaner) cleanVInterfaceDirty() { - vifIDs := getIDs[mysqlmodel.VInterface](c.org.DB) +func (c *Cleaner) cleanVInterfaceDirty(domainLcuuid string) { + vifIDs := getIDs[mysqlmodel.VInterface](c.org.DB, domainLcuuid) if len(vifIDs) != 0 { - var lanIPs []*mysqlmodel.LANIP - c.org.DB.Where("vifid NOT IN ?", vifIDs).Find(&lanIPs) + lanIPs, _ := WhereFindPtr[mysqlmodel.LANIP]( + c.org.DB, + "domain = ? AND vifid NOT IN ?", domainLcuuid, vifIDs, + ) if len(lanIPs) != 0 { c.org.DB.Delete(&lanIPs) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_LAN_IP_EN, ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, lanIPs), c.org.LogPrefix) } - var wanIPs []*mysqlmodel.WANIP - c.org.DB.Where("vifid NOT IN ?", vifIDs).Find(&wanIPs) + + wanIPs, _ := WhereFindPtr[mysqlmodel.WANIP]( + c.org.DB, + "domain = ? AND vifid NOT IN ?", domainLcuuid, vifIDs, + ) if len(wanIPs) != 0 { c.org.DB.Delete(&wanIPs) log.Error(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_WAN_IP_EN, ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, wanIPs), c.org.LogPrefix) @@ -439,6 +594,104 @@ func (c *Cleaner) cleanVInterfaceDirty() { } } +const ( + tagTypeDeviceIPConn = "device_ip_connection" + tagTypeCHostPodNodeConn = "chost_pod_node_connection" +) + +type domainStatsd struct { + org *common.ORG + lcuuid string + name string + + deviceIPConn *cleanerCounter + chostPodNodeConn *cleanerCounter +} + +func newDomainStatsd(org *common.ORG, domainLcuuid, domainName string) *domainStatsd { + return &domainStatsd{ + org: org, + lcuuid: domainLcuuid, + name: domainName, + + deviceIPConn: newCleanerCounter(), + chostPodNodeConn: newCleanerCounter(), + } +} + +func (d *domainStatsd) close() { + log.Info("close cleaner statsd of domain (lcuuid: %s)", d.lcuuid, d.org.LogPrefix) + d.deviceIPConn.Closed() + d.chostPodNodeConn.Closed() +} + +func (d *domainStatsd) get(tagType string) *cleanerCounter { + switch tagType { + case tagTypeDeviceIPConn: + return d.deviceIPConn + case tagTypeCHostPodNodeConn: + return d.chostPodNodeConn + } + return nil +} + +func (d *domainStatsd) start() { + log.Infof("start cleaner statsd of domain (lcuuid: %s)", d.lcuuid, d.org.LogPrefix) + err := stats.RegisterCountableWithModulePrefix( + "controller_", + "resource_relation_exception", + d.deviceIPConn, + stats.OptionStatTags{"org_id": fmt.Sprintf("%d", d.org.ID), "domain": d.name, "type": tagTypeDeviceIPConn}, + ) + if err != nil { + log.Errorf("failed to register cleaner statsd of domain (lcuuid: %s) device_ip_connection: %s", d.lcuuid, err.Error(), d.org.LogPrefix) + } + + err = stats.RegisterCountableWithModulePrefix( + "controller_", + "resource_relation_exception", + d.chostPodNodeConn, + stats.OptionStatTags{"org_id": fmt.Sprintf("%d", d.org.ID), "domain": d.name, "type": tagTypeCHostPodNodeConn}, + ) + if err != nil { + log.Errorf("failed to register cleaner statsd of domain (lcuuid: %s) chost_pod_node_connection: %s", d.lcuuid, err.Error(), d.org.LogPrefix) + } +} + +type tmpCounter struct { + count uint64 `statsd:"count"` +} + +func (c *tmpCounter) Fill(count int) { + atomic.AddUint64(&c.count, uint64(count)) +} + +type cleanerCounter struct { + *tmpCounter +} + +func newCleanerCounter() *cleanerCounter { + return &cleanerCounter{ + tmpCounter: &tmpCounter{}, + } +} + +func (c *cleanerCounter) GetCounter() interface{} { + counter := &tmpCounter{} + counter, c.tmpCounter = c.tmpCounter, counter + return counter +} + +func (c *cleanerCounter) Closed() bool { + return false +} + +func WhereFindPtr[T any](db *mysql.DB, query interface{}, args ...interface{}) ([]*T, error) { + var result []*T + err := db.Where(query, args...).Find(&result).Error + return result, err +} + func formatLogDeleteABecauseBHasGone[MT constraint.MySQLModel](a, b string, items []*MT) string { var str string for _, item := range items { @@ -464,9 +717,9 @@ func deleteExpired[MT constraint.MySQLSoftDeleteModel](db *mysql.DB, expiredAt t return dbItems } -func getIDs[MT constraint.MySQLModel](db *mysql.DB) (ids []int) { +func getIDs[MT constraint.MySQLModel](db *mysql.DB, domainLcuuid string) (ids []int) { var dbItems []*MT - db.Select("id").Find(&dbItems) + db.Where("domain = ?", domainLcuuid).Select("id").Find(&dbItems) for _, item := range dbItems { ids = append(ids, (*item).GetID()) }