Skip to content

Commit

Permalink
dynamic Router and Cluster from registry center
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangwenfeng committed Feb 17, 2024
1 parent be3c95e commit e146718
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package zookeeper

import (
"strings"
"sync"
"time"
)
Expand All @@ -33,7 +32,6 @@ import (
common2 "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
)

Expand Down Expand Up @@ -128,16 +126,15 @@ func (zkl *serviceListener) waitEventAndHandlePeriod(children []string, e <-chan

// whenever it is called, the children node changed and refresh the api configuration.
func (zkl *serviceListener) handleEvent() {
// get all children of provider, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService/providers
children, err := zkl.client.GetChildren(zkl.path)
if err != nil {
// disable the API
bkConf, methods, _, _ := registry.ParseDubboString(zkl.url.String())
// disable the service all methods
bkConf, _, _, _ := registry.ParseDubboString(zkl.url.String())
apiPattern := registry.GetAPIPattern(bkConf)
for i := range methods {
path := strings.Join([]string{apiPattern, methods[i]}, constant.PathSlash)
if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: path}); err != nil {
logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), path)
}
// delete all config of an interface, such as /dubbo-app/org.apache.dubbo.samples.api.DemoService
if err := zkl.adapterListener.OnDeleteRouter(config.Resource{Path: apiPattern}); err != nil {
logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), apiPattern)
}
return
}
Expand Down
50 changes: 35 additions & 15 deletions pixiu/pkg/adapter/dubboregistry/registrycenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ package dubboregistry

import (
"os"
)

import (
"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
"github.com/dubbo-go-pixiu/pixiu-api/pkg/router"
"strconv"
"strings"
)

import (
Expand All @@ -35,6 +32,9 @@ import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/server"

"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
"github.com/dubbo-go-pixiu/pixiu-api/pkg/router"
)

func init() {
Expand Down Expand Up @@ -65,19 +65,19 @@ func (p Plugin) Kind() string {
func (p *Plugin) CreateAdapter(a *model.Adapter) (adapter.Adapter, error) {
adapter := &Adapter{id: a.ID,
registries: make(map[string]registry.Registry),
cfg: AdaptorConfig{Registries: make(map[string]model.Registry)}}
cfg: &AdaptorConfig{Registries: make(map[string]model.Registry)}}
return adapter, nil
}

// Adapter to monitor dubbo services on registry center
type Adapter struct {
id string
cfg AdaptorConfig
cfg *AdaptorConfig
registries map[string]registry.Registry
}

// Start starts the adaptor
func (a Adapter) Start() {
func (a *Adapter) Start() {
for _, reg := range a.registries {
if err := reg.Subscribe(); err != nil {
logger.Errorf("Subscribe fail, error is {%s}", err.Error())
Expand Down Expand Up @@ -113,21 +113,41 @@ func (a *Adapter) Apply() error {
}

// Config returns the config of the adaptor
func (a Adapter) Config() interface{} {
func (a *Adapter) Config() interface{} {
return a.cfg
}

func (a *Adapter) OnAddAPI(r router.API) error {
acm := server.GetApiConfigManager()
return acm.AddAPI(a.id, r)
ipPort := strings.Split(r.IntegrationRequest.URL, ":")
port, err := strconv.Atoi(ipPort[1])
if err != nil {
return err
}
cluster := strings.Join([]string{r.ApplicationName, r.Interface, r.Version, r.Group}, constant.PathSlash)
server.GetClusterManager().SetEndpoint(cluster, &model.Endpoint{
ID: r.IntegrationRequest.URL,
Address: model.SocketAddress{
Address: ipPort[0],
Port: port,
}},
)
prefix := strings.Join([]string{"/" + r.ApplicationName, r.Interface}, constant.PathSlash)
match := model.RouterMatch{Prefix: prefix, Methods: []string{string(r.HTTPVerb)}}
route := model.RouteAction{Cluster: cluster}
added := &model.Router{ID: r.URLPattern, Match: match, Route: route}
server.GetRouterManager().AddRouter(added)
return server.GetApiConfigManager().AddAPI(a.id, r)
}

func (a *Adapter) OnRemoveAPI(r router.API) error {
acm := server.GetApiConfigManager()
return acm.RemoveAPI(a.id, r)
cluster := strings.Join([]string{r.ApplicationName, r.Interface, r.Version, r.Group}, constant.PathSlash)
server.GetClusterManager().DeleteEndpoint(cluster, r.IntegrationRequest.URL)
return server.GetApiConfigManager().RemoveAPI(a.id, r)
}

func (a *Adapter) OnDeleteRouter(r config.Resource) error {
acm := server.GetApiConfigManager()
return acm.DeleteRouter(a.id, r)
empty := &model.ClusterConfig{Name: r.Path, LbStr: model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{}}
server.GetClusterManager().UpdateCluster(empty)
server.GetRouterManager().DeleteRouter(&model.Router{Match: model.RouterMatch{Prefix: r.Path}})
return server.GetApiConfigManager().DeleteRouter(a.id, r)
}
13 changes: 13 additions & 0 deletions pixiu/pkg/model/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package model
import (
stdHttp "net/http"
"regexp"
"strings"
)

import (
Expand Down Expand Up @@ -141,3 +142,15 @@ func (hm *HeaderMatcher) SetValueRegex(regex string) error {
hm.Regex = false
return err
}

func (r *Router) String() string {
var builder strings.Builder
builder.WriteString("[" + strings.Join(r.Match.Methods, ",") + "] ")
if r.Match.Prefix != "" {
builder.WriteString("prefix " + r.Match.Prefix)
} else {
builder.WriteString("path " + r.Match.Path)
}
builder.WriteString(" to cluster " + r.Route.Cluster)
return builder.String()
}
6 changes: 3 additions & 3 deletions pixiu/pkg/server/api_config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func (acm *ApiConfigManager) AddApiConfigListener(adapterID string, l ApiConfigL

func (acm *ApiConfigManager) AddAPI(adapterID string, r router.API) error {
l, existed := acm.als[adapterID]
if !existed {
return errors.Errorf("no listener found")
if existed {
return l.OnAddAPI(r)
}
return l.OnAddAPI(r)
return nil
}

func (acm *ApiConfigManager) RemoveAPI(adapterID string, r router.API) error {
Expand Down
3 changes: 3 additions & 0 deletions pixiu/pkg/server/router_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package server

import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)

Expand All @@ -42,12 +43,14 @@ func (rm *RouterManager) AddRouterListener(l RouterListener) {
}

func (rm *RouterManager) AddRouter(r *model.Router) {
logger.Infof("add router: %v", r)
for _, l := range rm.rls {
l.OnAddRouter(r)
}
}

func (rm *RouterManager) DeleteRouter(r *model.Router) {
logger.Infof("del router: %v", r)
for _, l := range rm.rls {
l.OnDeleteRouter(r)
}
Expand Down

0 comments on commit e146718

Please sign in to comment.