From cc9adbb551ec56f48c9e7d10e27154e7b99869d9 Mon Sep 17 00:00:00 2001 From: wzshiming Date: Thu, 3 Dec 2020 19:01:20 +0800 Subject: [PATCH] Fix --- Dockerfile | 2 +- cmd/pipe-xds/main.go | 2 -- config/config.go | 51 +++++++++++++++++++++++++++---------- go.mod | 2 +- go.sum | 4 +-- internal/cmd/proxy/proxy.go | 15 ++++++----- internal/cmd/wait/wait.go | 6 ++--- 7 files changed, 52 insertions(+), 30 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8e865d6..d41580a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,7 @@ ENV CGO_ENABLED=0 RUN go install ./cmd/pipe-xds FROM alpine -COPY --from=pipeproxy/pipe:v0.4.14 /usr/local/bin/pipe /usr/local/bin/ +COPY --from=pipeproxy/pipe:v0.4.15 /usr/local/bin/pipe /usr/local/bin/ COPY --from=builder /go/bin/pipe-xds /usr/local/bin/ RUN apk add -U --no-cache curl iptables ip6tables WORKDIR /etc/istio/proxy diff --git a/cmd/pipe-xds/main.go b/cmd/pipe-xds/main.go index 354ebd7..780148b 100644 --- a/cmd/pipe-xds/main.go +++ b/cmd/pipe-xds/main.go @@ -11,7 +11,6 @@ import ( cleaniptables "istio.io/istio/tools/istio-clean-iptables/pkg/cmd" iptables "istio.io/istio/tools/istio-iptables/pkg/cmd" "istio.io/pkg/log" - "istio.io/pkg/version" ) var ( @@ -36,7 +35,6 @@ func init() { cmd.AddFlags(rootCmd) rootCmd.AddCommand(proxy.GetCommand()) - rootCmd.AddCommand(version.CobraCommand()) rootCmd.AddCommand(iptables.GetCommand()) rootCmd.AddCommand(cleaniptables.GetCommand()) rootCmd.AddCommand(wait.GetCommand()) diff --git a/config/config.go b/config/config.go index 89b516b..9f67276 100644 --- a/config/config.go +++ b/config/config.go @@ -11,6 +11,7 @@ import ( "path/filepath" "reflect" "sort" + "sync" "syscall" "time" @@ -39,6 +40,7 @@ type ConfigCtx struct { xds map[string]proto.Message updateCh chan struct{} interval time.Duration + mux sync.Mutex } func NewConfigCtx(ctx context.Context, cli *adsc.ADSC, basePath string, interval time.Duration) *ConfigCtx { @@ -74,36 +76,48 @@ func (c *ConfigCtx) SetNodeID(nodeid string) { } func (c *ConfigCtx) RegisterCDS(name string, dialer bind.StreamDialer, msg proto.Message) { + c.mux.Lock() + defer c.mux.Unlock() c.cds[name] = dialer c.xds[name] = msg c.update() } func (c *ConfigCtx) RegisterEDS(name string, dialer bind.StreamDialer, msg proto.Message) { + c.mux.Lock() + defer c.mux.Unlock() c.eds[name] = dialer c.xds[name] = msg c.update() } func (c *ConfigCtx) RegisterLDS(name string, service bind.Service, msg proto.Message) { + c.mux.Lock() + defer c.mux.Unlock() c.lds[name] = service c.xds[name] = msg c.update() } func (c *ConfigCtx) RegisterRDS(name string, handler bind.HTTPHandler, msg proto.Message) { + c.mux.Lock() + defer c.mux.Unlock() c.rds[name] = handler c.xds[name] = msg c.update() } func (c *ConfigCtx) RegisterSDS(name string, tls bind.TLS, msg proto.Message) { + c.mux.Lock() + defer c.mux.Unlock() c.sds[name] = tls c.xds[name] = msg c.update() } func (c *ConfigCtx) save() { + c.mux.Lock() + defer c.mux.Unlock() componentSortd := []sortd{} serviceSortd := []sortd{} @@ -226,6 +240,9 @@ func (c *ConfigCtx) startPipe(ctx context.Context) error { c.save() cmd.Process.Signal(syscall.SIGHUP) continue loop + case <-ctx.Done(): + cmd.Process.Signal(syscall.SIGQUIT) + break loop } } case <-ctx.Done(): @@ -275,25 +292,31 @@ var comm = []byte{'\n', '#', ' '} var defaultServices = []bind.DefServiceConfig{ { Name: "_health", - Def: bind.StreamServiceConfig{ - Listener: bind.ListenerStreamListenConfigConfig{ - Network: bind.ListenerStreamListenConfigListenerNetworkEnumEnumTCP, - Address: ":15021", - }, - Handler: bind.HTTP1StreamHandlerConfig{ - Handler: BuildHealthWithHTTPHandler(), + Def: bind.TagsServiceConfig{ + Tag: "health", + Service: bind.StreamServiceConfig{ + Listener: bind.ListenerStreamListenConfigConfig{ + Network: bind.ListenerStreamListenConfigListenerNetworkEnumEnumTCP, + Address: ":15021", + }, + Handler: bind.HTTP1StreamHandlerConfig{ + Handler: BuildHealthWithHTTPHandler(), + }, }, }, }, { Name: "_metric", - Def: bind.StreamServiceConfig{ - Listener: bind.ListenerStreamListenConfigConfig{ - Network: bind.ListenerStreamListenConfigListenerNetworkEnumEnumTCP, - Address: ":15090", - }, - Handler: bind.HTTP1StreamHandlerConfig{ - Handler: BuildPrometheusWithHTTPHandler(), + Def: bind.TagsServiceConfig{ + Tag: "metric", + Service: bind.StreamServiceConfig{ + Listener: bind.ListenerStreamListenConfigConfig{ + Network: bind.ListenerStreamListenConfigListenerNetworkEnumEnumTCP, + Address: ":15090", + }, + Handler: bind.HTTP1StreamHandlerConfig{ + Handler: BuildPrometheusWithHTTPHandler(), + }, }, }, }, diff --git a/go.mod b/go.mod index 462ba2a..2a987df 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ replace ( require ( github.com/envoyproxy/go-control-plane v0.9.8-0.20201019204000-12785f608982 github.com/golang/protobuf v1.4.3 - github.com/pipeproxy/pipe v0.4.14 + github.com/pipeproxy/pipe v0.4.15 github.com/spf13/cobra v1.1.1 github.com/wzshiming/gotype v0.6.3 github.com/wzshiming/notify v0.0.5 diff --git a/go.sum b/go.sum index d8b1416..0208134 100644 --- a/go.sum +++ b/go.sum @@ -799,8 +799,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v3 v3.3.2 h1:QTUOCbMNDbK4PYtkuHyOBd28C0UhPBw3T4OH4WpFDik= github.com/pierrec/lz4/v3 v3.3.2/go.mod h1:280XNCGS8jAcG++AHdd6SeWnzyJ1w9oow2vbORyey8Q= -github.com/pipeproxy/pipe v0.4.14 h1:0WngAXAiLi8OrefkfzXMAE44nfWpgtfxnkCwj7VRLOA= -github.com/pipeproxy/pipe v0.4.14/go.mod h1:fVvEupWbJ2ihcyWk/iSLYtBz/9wlZnshG8dWTCnlQ8k= +github.com/pipeproxy/pipe v0.4.15 h1:/pp61UdVIe5GgVHaY1jJT6edrnrjE5NmBT1feNFkMno= +github.com/pipeproxy/pipe v0.4.15/go.mod h1:fVvEupWbJ2ihcyWk/iSLYtBz/9wlZnshG8dWTCnlQ8k= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/internal/cmd/proxy/proxy.go b/internal/cmd/proxy/proxy.go index 87c2a72..d4299a1 100644 --- a/internal/cmd/proxy/proxy.go +++ b/internal/cmd/proxy/proxy.go @@ -73,7 +73,7 @@ var ( proxyCmd = &cobra.Command{ Use: "proxy", - Short: "Envoy proxy agent", + Short: "Pipe proxy agent", FParseErrWhitelist: cobra.FParseErrWhitelist{ // Allow unknown flags for backward-compatibility. UnknownFlags: true, @@ -110,8 +110,8 @@ var ( metadataJSON := string(metadataBytes) log.Println("xds server", url) log.Println("metadata", metadataJSON) - nodeId := fmt.Sprintf("%s~%s~%s.%s~%s", proxyType, podIP, podName, podNamespace, DNSDomain) + log.Println("node id", nodeId) notify.OnSlice([]os.Signal{syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM}, cancel) @@ -209,7 +209,7 @@ var ( } go func() { - for { + for ctx.Err() == nil { err = ads.Run(ctx) if err != nil { log.Println(err) @@ -223,6 +223,7 @@ var ( if err != nil { log.Println(err) } + time.Sleep(time.Second) } return nil }, @@ -230,7 +231,7 @@ var ( ) func init() { - proxyCmd.PersistentFlags().StringVar(&DNSDomain, "domain", "", + proxyCmd.PersistentFlags().StringVar(&DNSDomain, "domain", fmt.Sprintf("%s.svc.cluster.local", os.Getenv("POD_NAMESPACE")), "DNS domain suffix. If not provided uses ${POD_NAMESPACE}.svc.cluster.local") proxyCmd.PersistentFlags().StringVar(&meshConfigFile, "meshConfig", "./etc/istio/config/mesh", "File name for Istio mesh configuration. If not specified, a default mesh will be used. This may be overridden by "+ @@ -239,15 +240,15 @@ func init() { "HTTP Port on which to serve Security Token Service (STS). If zero, STS service will not be provided.") proxyCmd.PersistentFlags().StringVar(&tokenManagerPlugin, "tokenManagerPlugin", tokenmanager.GoogleTokenExchange, "Token provider specific plugin name.") + // Flags for proxy configuration proxyCmd.PersistentFlags().StringVar(&serviceCluster, "serviceCluster", constants.ServiceClusterName, "Service cluster") - // Log levels are provided by the library https://github.com/gabime/spdlog, used by Envoy. proxyCmd.PersistentFlags().StringVar(&proxyLogLevel, "proxyLogLevel", "warning", - fmt.Sprintf("The log level used to start the Envoy proxy (choose from {%s, %s, %s, %s, %s, %s, %s})", + fmt.Sprintf("The log level used to start the Pipe proxy (choose from {%s, %s, %s, %s, %s, %s, %s})", "trace", "debug", "info", "warning", "error", "critical", "off")) proxyCmd.PersistentFlags().IntVar(&concurrency, "concurrency", 0, "number of worker threads to run") proxyCmd.PersistentFlags().StringVar(&proxyComponentLogLevel, "proxyComponentLogLevel", "misc:error", - "The component log level used to start the Envoy proxy") + "The component log level used to start the Pipe proxy") proxyCmd.PersistentFlags().StringVar(&templateFile, "templateFile", "", "Go template bootstrap config") proxyCmd.PersistentFlags().StringVar(&outlierLogPath, "outlierLogPath", "", diff --git a/internal/cmd/wait/wait.go b/internal/cmd/wait/wait.go index ef41f72..b93d1d5 100644 --- a/internal/cmd/wait/wait.go +++ b/internal/cmd/wait/wait.go @@ -19,12 +19,12 @@ var ( waitCmd = &cobra.Command{ Use: "wait", - Short: "Waits until the Envoy proxy is ready", + Short: "Waits until the Pipe proxy is ready", RunE: func(c *cobra.Command, args []string) error { client := &http.Client{ Timeout: time.Duration(requestTimeoutMillis) * time.Millisecond, } - log.Infof("Waiting for Envoy proxy to be ready (timeout: %d seconds)...", timeoutSeconds) + log.Infof("Waiting for Pipe proxy to be ready (timeout: %d seconds)...", timeoutSeconds) var err error timeoutAt := time.Now().Add(time.Duration(timeoutSeconds) * time.Second) @@ -37,7 +37,7 @@ var ( log.Debugf("Not ready yet: %v", err) time.Sleep(time.Duration(periodMillis) * time.Millisecond) } - return fmt.Errorf("timeout waiting for Envoy proxy to become ready. Last error: %v", err) + return fmt.Errorf("timeout waiting for Pipe proxy to become ready. Last error: %v", err) }, } )