diff --git a/pixiu/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go b/pixiu/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go index 93d874486..66cdb22c1 100644 --- a/pixiu/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go +++ b/pixiu/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go @@ -18,7 +18,6 @@ package zookeeper import ( - "strings" "sync" "time" ) @@ -33,7 +32,6 @@ import ( common2 "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/common" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/registry" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/remoting/zookeeper" - "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger" ) @@ -128,16 +126,15 @@ func (zkl *serviceListener) waitEventAndHandlePeriod(children []string, e <-chan // whenever it is called, the children node changed and refresh the api configuration. func (zkl *serviceListener) handleEvent() { + // get all children of provider, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService/providers children, err := zkl.client.GetChildren(zkl.path) if err != nil { - // disable the API - bkConf, methods, _, _ := registry.ParseDubboString(zkl.url.String()) + // disable the service all methods + bkConf, _, _, _ := registry.ParseDubboString(zkl.url.String()) apiPattern := registry.GetAPIPattern(bkConf) - for i := range methods { - path := strings.Join([]string{apiPattern, methods[i]}, constant.PathSlash) - if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: path}); err != nil { - logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), path) - } + // delete all config of an interface, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService + if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: apiPattern}); err != nil { + logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), apiPattern) } return } diff --git a/pixiu/pkg/adapter/dubboregistry/registrycenter.go b/pixiu/pkg/adapter/dubboregistry/registrycenter.go index 7b6472749..724214c53 100644 --- a/pixiu/pkg/adapter/dubboregistry/registrycenter.go +++ b/pixiu/pkg/adapter/dubboregistry/registrycenter.go @@ -19,11 +19,8 @@ package dubboregistry import ( "os" -) - -import ( - "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" - "github.com/dubbo-go-pixiu/pixiu-api/pkg/router" + "strconv" + "strings" ) import ( @@ -35,6 +32,9 @@ import ( "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/server" + + "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config" + "github.com/dubbo-go-pixiu/pixiu-api/pkg/router" ) func init() { @@ -65,19 +65,19 @@ func (p Plugin) Kind() string { func (p *Plugin) CreateAdapter(a *model.Adapter) (adapter.Adapter, error) { adapter := &Adapter{id: a.ID, registries: make(map[string]registry.Registry), - cfg: AdaptorConfig{Registries: make(map[string]model.Registry)}} + cfg: &AdaptorConfig{Registries: make(map[string]model.Registry)}} return adapter, nil } // Adapter to monitor dubbo services on registry center type Adapter struct { id string - cfg AdaptorConfig + cfg *AdaptorConfig registries map[string]registry.Registry } // Start starts the adaptor -func (a Adapter) Start() { +func (a *Adapter) Start() { for _, reg := range a.registries { if err := reg.Subscribe(); err != nil { logger.Errorf("Subscribe fail, error is {%s}", err.Error()) @@ -113,21 +113,41 @@ func (a *Adapter) Apply() error { } // Config returns the config of the adaptor -func (a Adapter) Config() interface{} { +func (a *Adapter) Config() interface{} { return a.cfg } func (a *Adapter) OnAddAPI(r router.API) error { - acm := server.GetApiConfigManager() - return acm.AddAPI(a.id, r) + ipPort := strings.Split(r.IntegrationRequest.URL, ":") + port, err := strconv.Atoi(ipPort[1]) + if err != nil { + return err + } + cluster := strings.Join([]string{r.ApplicationName, r.Interface, r.Version, r.Group}, constant.PathSlash) + server.GetClusterManager().SetEndpoint(cluster, &model.Endpoint{ + ID: r.IntegrationRequest.URL, + Address: model.SocketAddress{ + Address: ipPort[0], + Port: port, + }}, + ) + prefix := strings.Join([]string{"/" + r.ApplicationName, r.Interface}, constant.PathSlash) + match := model.RouterMatch{Prefix: prefix, Methods: []string{string(r.HTTPVerb)}} + route := model.RouteAction{Cluster: cluster} + added := &model.Router{ID: r.URLPattern, Match: match, Route: route} + server.GetRouterManager().AddRouter(added) + return server.GetApiConfigManager().AddAPI(a.id, r) } func (a *Adapter) OnRemoveAPI(r router.API) error { - acm := server.GetApiConfigManager() - return acm.RemoveAPI(a.id, r) + cluster := strings.Join([]string{r.ApplicationName, r.Interface, r.Version, r.Group}, constant.PathSlash) + server.GetClusterManager().DeleteEndpoint(cluster, r.IntegrationRequest.URL) + return server.GetApiConfigManager().RemoveAPI(a.id, r) } func (a *Adapter) OnDeleteRouter(r config.Resource) error { - acm := server.GetApiConfigManager() - return acm.DeleteRouter(a.id, r) + empty := &model.ClusterConfig{Name: r.Path, LbStr: model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{}} + server.GetClusterManager().UpdateCluster(empty) + server.GetRouterManager().DeleteRouter(&model.Router{Match: model.RouterMatch{Prefix: r.Path}}) + return server.GetApiConfigManager().DeleteRouter(a.id, r) } diff --git a/pixiu/pkg/model/router.go b/pixiu/pkg/model/router.go index ce41669de..a33dfa867 100644 --- a/pixiu/pkg/model/router.go +++ b/pixiu/pkg/model/router.go @@ -20,6 +20,7 @@ package model import ( stdHttp "net/http" "regexp" + "strings" ) import ( @@ -141,3 +142,15 @@ func (hm *HeaderMatcher) SetValueRegex(regex string) error { hm.Regex = false return err } + +func (r *Router) String() string { + var builder strings.Builder + builder.WriteString("[" + strings.Join(r.Match.Methods, ",") + "] ") + if r.Match.Prefix != "" { + builder.WriteString("prefix " + r.Match.Prefix) + } else { + builder.WriteString("path " + r.Match.Path) + } + builder.WriteString(" to cluster " + r.Route.Cluster) + return builder.String() +} diff --git a/pixiu/pkg/server/api_config_manager.go b/pixiu/pkg/server/api_config_manager.go index 616e555fb..b656b6fda 100644 --- a/pixiu/pkg/server/api_config_manager.go +++ b/pixiu/pkg/server/api_config_manager.go @@ -58,10 +58,10 @@ func (acm *ApiConfigManager) AddApiConfigListener(adapterID string, l ApiConfigL func (acm *ApiConfigManager) AddAPI(adapterID string, r router.API) error { l, existed := acm.als[adapterID] - if !existed { - return errors.Errorf("no listener found") + if existed { + return l.OnAddAPI(r) } - return l.OnAddAPI(r) + return nil } func (acm *ApiConfigManager) RemoveAPI(adapterID string, r router.API) error { diff --git a/pixiu/pkg/server/router_manager.go b/pixiu/pkg/server/router_manager.go index 1b1d4f734..65d7444be 100644 --- a/pixiu/pkg/server/router_manager.go +++ b/pixiu/pkg/server/router_manager.go @@ -18,6 +18,7 @@ package server import ( + "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model" ) @@ -42,12 +43,14 @@ func (rm *RouterManager) AddRouterListener(l RouterListener) { } func (rm *RouterManager) AddRouter(r *model.Router) { + logger.Infof("add router: %v", r) for _, l := range rm.rls { l.OnAddRouter(r) } } func (rm *RouterManager) DeleteRouter(r *model.Router) { + logger.Infof("del router: %v", r) for _, l := range rm.rls { l.OnDeleteRouter(r) }