Skip to content
This repository has been archived by the owner on Oct 24, 2021. It is now read-only.

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Dec 3, 2020
1 parent 4351a20 commit cc9adbb
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ENV CGO_ENABLED=0
RUN go install ./cmd/pipe-xds

FROM alpine
COPY --from=pipeproxy/pipe:v0.4.14 /usr/local/bin/pipe /usr/local/bin/
COPY --from=pipeproxy/pipe:v0.4.15 /usr/local/bin/pipe /usr/local/bin/
COPY --from=builder /go/bin/pipe-xds /usr/local/bin/
RUN apk add -U --no-cache curl iptables ip6tables
WORKDIR /etc/istio/proxy
Expand Down
2 changes: 0 additions & 2 deletions cmd/pipe-xds/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
cleaniptables "istio.io/istio/tools/istio-clean-iptables/pkg/cmd"
iptables "istio.io/istio/tools/istio-iptables/pkg/cmd"
"istio.io/pkg/log"
"istio.io/pkg/version"
)

var (
Expand All @@ -36,7 +35,6 @@ func init() {
cmd.AddFlags(rootCmd)

rootCmd.AddCommand(proxy.GetCommand())
rootCmd.AddCommand(version.CobraCommand())
rootCmd.AddCommand(iptables.GetCommand())
rootCmd.AddCommand(cleaniptables.GetCommand())
rootCmd.AddCommand(wait.GetCommand())
Expand Down
51 changes: 37 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"reflect"
"sort"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -39,6 +40,7 @@ type ConfigCtx struct {
xds map[string]proto.Message
updateCh chan struct{}
interval time.Duration
mux sync.Mutex
}

func NewConfigCtx(ctx context.Context, cli *adsc.ADSC, basePath string, interval time.Duration) *ConfigCtx {
Expand Down Expand Up @@ -74,36 +76,48 @@ func (c *ConfigCtx) SetNodeID(nodeid string) {
}

func (c *ConfigCtx) RegisterCDS(name string, dialer bind.StreamDialer, msg proto.Message) {
c.mux.Lock()
defer c.mux.Unlock()
c.cds[name] = dialer
c.xds[name] = msg
c.update()
}

func (c *ConfigCtx) RegisterEDS(name string, dialer bind.StreamDialer, msg proto.Message) {
c.mux.Lock()
defer c.mux.Unlock()
c.eds[name] = dialer
c.xds[name] = msg
c.update()
}

func (c *ConfigCtx) RegisterLDS(name string, service bind.Service, msg proto.Message) {
c.mux.Lock()
defer c.mux.Unlock()
c.lds[name] = service
c.xds[name] = msg
c.update()
}

func (c *ConfigCtx) RegisterRDS(name string, handler bind.HTTPHandler, msg proto.Message) {
c.mux.Lock()
defer c.mux.Unlock()
c.rds[name] = handler
c.xds[name] = msg
c.update()
}

func (c *ConfigCtx) RegisterSDS(name string, tls bind.TLS, msg proto.Message) {
c.mux.Lock()
defer c.mux.Unlock()
c.sds[name] = tls
c.xds[name] = msg
c.update()
}

func (c *ConfigCtx) save() {
c.mux.Lock()
defer c.mux.Unlock()
componentSortd := []sortd{}
serviceSortd := []sortd{}

Expand Down Expand Up @@ -226,6 +240,9 @@ func (c *ConfigCtx) startPipe(ctx context.Context) error {
c.save()
cmd.Process.Signal(syscall.SIGHUP)
continue loop
case <-ctx.Done():
cmd.Process.Signal(syscall.SIGQUIT)
break loop
}
}
case <-ctx.Done():
Expand Down Expand Up @@ -275,25 +292,31 @@ var comm = []byte{'\n', '#', ' '}
var defaultServices = []bind.DefServiceConfig{
{
Name: "_health",
Def: bind.StreamServiceConfig{
Listener: bind.ListenerStreamListenConfigConfig{
Network: bind.ListenerStreamListenConfigListenerNetworkEnumEnumTCP,
Address: ":15021",
},
Handler: bind.HTTP1StreamHandlerConfig{
Handler: BuildHealthWithHTTPHandler(),
Def: bind.TagsServiceConfig{
Tag: "health",
Service: bind.StreamServiceConfig{
Listener: bind.ListenerStreamListenConfigConfig{
Network: bind.ListenerStreamListenConfigListenerNetworkEnumEnumTCP,
Address: ":15021",
},
Handler: bind.HTTP1StreamHandlerConfig{
Handler: BuildHealthWithHTTPHandler(),
},
},
},
},
{
Name: "_metric",
Def: bind.StreamServiceConfig{
Listener: bind.ListenerStreamListenConfigConfig{
Network: bind.ListenerStreamListenConfigListenerNetworkEnumEnumTCP,
Address: ":15090",
},
Handler: bind.HTTP1StreamHandlerConfig{
Handler: BuildPrometheusWithHTTPHandler(),
Def: bind.TagsServiceConfig{
Tag: "metric",
Service: bind.StreamServiceConfig{
Listener: bind.ListenerStreamListenConfigConfig{
Network: bind.ListenerStreamListenConfigListenerNetworkEnumEnumTCP,
Address: ":15090",
},
Handler: bind.HTTP1StreamHandlerConfig{
Handler: BuildPrometheusWithHTTPHandler(),
},
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ replace (
require (
github.com/envoyproxy/go-control-plane v0.9.8-0.20201019204000-12785f608982
github.com/golang/protobuf v1.4.3
github.com/pipeproxy/pipe v0.4.14
github.com/pipeproxy/pipe v0.4.15
github.com/spf13/cobra v1.1.1
github.com/wzshiming/gotype v0.6.3
github.com/wzshiming/notify v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -799,8 +799,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v3 v3.3.2 h1:QTUOCbMNDbK4PYtkuHyOBd28C0UhPBw3T4OH4WpFDik=
github.com/pierrec/lz4/v3 v3.3.2/go.mod h1:280XNCGS8jAcG++AHdd6SeWnzyJ1w9oow2vbORyey8Q=
github.com/pipeproxy/pipe v0.4.14 h1:0WngAXAiLi8OrefkfzXMAE44nfWpgtfxnkCwj7VRLOA=
github.com/pipeproxy/pipe v0.4.14/go.mod h1:fVvEupWbJ2ihcyWk/iSLYtBz/9wlZnshG8dWTCnlQ8k=
github.com/pipeproxy/pipe v0.4.15 h1:/pp61UdVIe5GgVHaY1jJT6edrnrjE5NmBT1feNFkMno=
github.com/pipeproxy/pipe v0.4.15/go.mod h1:fVvEupWbJ2ihcyWk/iSLYtBz/9wlZnshG8dWTCnlQ8k=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
15 changes: 8 additions & 7 deletions internal/cmd/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var (

proxyCmd = &cobra.Command{
Use: "proxy",
Short: "Envoy proxy agent",
Short: "Pipe proxy agent",
FParseErrWhitelist: cobra.FParseErrWhitelist{
// Allow unknown flags for backward-compatibility.
UnknownFlags: true,
Expand Down Expand Up @@ -110,8 +110,8 @@ var (
metadataJSON := string(metadataBytes)
log.Println("xds server", url)
log.Println("metadata", metadataJSON)

nodeId := fmt.Sprintf("%s~%s~%s.%s~%s", proxyType, podIP, podName, podNamespace, DNSDomain)
log.Println("node id", nodeId)

notify.OnSlice([]os.Signal{syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM}, cancel)

Expand Down Expand Up @@ -209,7 +209,7 @@ var (
}

go func() {
for {
for ctx.Err() == nil {
err = ads.Run(ctx)
if err != nil {
log.Println(err)
Expand All @@ -223,14 +223,15 @@ var (
if err != nil {
log.Println(err)
}
time.Sleep(time.Second)
}
return nil
},
}
)

func init() {
proxyCmd.PersistentFlags().StringVar(&DNSDomain, "domain", "",
proxyCmd.PersistentFlags().StringVar(&DNSDomain, "domain", fmt.Sprintf("%s.svc.cluster.local", os.Getenv("POD_NAMESPACE")),
"DNS domain suffix. If not provided uses ${POD_NAMESPACE}.svc.cluster.local")
proxyCmd.PersistentFlags().StringVar(&meshConfigFile, "meshConfig", "./etc/istio/config/mesh",
"File name for Istio mesh configuration. If not specified, a default mesh will be used. This may be overridden by "+
Expand All @@ -239,15 +240,15 @@ func init() {
"HTTP Port on which to serve Security Token Service (STS). If zero, STS service will not be provided.")
proxyCmd.PersistentFlags().StringVar(&tokenManagerPlugin, "tokenManagerPlugin", tokenmanager.GoogleTokenExchange,
"Token provider specific plugin name.")

// Flags for proxy configuration
proxyCmd.PersistentFlags().StringVar(&serviceCluster, "serviceCluster", constants.ServiceClusterName, "Service cluster")
// Log levels are provided by the library https://github.com/gabime/spdlog, used by Envoy.
proxyCmd.PersistentFlags().StringVar(&proxyLogLevel, "proxyLogLevel", "warning",
fmt.Sprintf("The log level used to start the Envoy proxy (choose from {%s, %s, %s, %s, %s, %s, %s})",
fmt.Sprintf("The log level used to start the Pipe proxy (choose from {%s, %s, %s, %s, %s, %s, %s})",
"trace", "debug", "info", "warning", "error", "critical", "off"))
proxyCmd.PersistentFlags().IntVar(&concurrency, "concurrency", 0, "number of worker threads to run")
proxyCmd.PersistentFlags().StringVar(&proxyComponentLogLevel, "proxyComponentLogLevel", "misc:error",
"The component log level used to start the Envoy proxy")
"The component log level used to start the Pipe proxy")
proxyCmd.PersistentFlags().StringVar(&templateFile, "templateFile", "",
"Go template bootstrap config")
proxyCmd.PersistentFlags().StringVar(&outlierLogPath, "outlierLogPath", "",
Expand Down
6 changes: 3 additions & 3 deletions internal/cmd/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ var (

waitCmd = &cobra.Command{
Use: "wait",
Short: "Waits until the Envoy proxy is ready",
Short: "Waits until the Pipe proxy is ready",
RunE: func(c *cobra.Command, args []string) error {
client := &http.Client{
Timeout: time.Duration(requestTimeoutMillis) * time.Millisecond,
}
log.Infof("Waiting for Envoy proxy to be ready (timeout: %d seconds)...", timeoutSeconds)
log.Infof("Waiting for Pipe proxy to be ready (timeout: %d seconds)...", timeoutSeconds)

var err error
timeoutAt := time.Now().Add(time.Duration(timeoutSeconds) * time.Second)
Expand All @@ -37,7 +37,7 @@ var (
log.Debugf("Not ready yet: %v", err)
time.Sleep(time.Duration(periodMillis) * time.Millisecond)
}
return fmt.Errorf("timeout waiting for Envoy proxy to become ready. Last error: %v", err)
return fmt.Errorf("timeout waiting for Pipe proxy to become ready. Last error: %v", err)
},
}
)
Expand Down

0 comments on commit cc9adbb

Please sign in to comment.