Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom Go-based dataplane for xDS control plane APIs #58

Merged
merged 112 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 105 commits
Commits
Show all changes
112 commits
Select commit Hold shift + click to select a range
90d1147
rename existing dataplane command as v0
praveingk Sep 25, 2023
03ed295
Replace with new dataplane command
praveingk Sep 25, 2023
4c60a5e
xdsClient for the dataplane
praveingk Sep 25, 2023
b9c40d5
Add dataplane server for sni and mtls connections
praveingk Sep 25, 2023
9418319
Add build commands for the new dataplane
praveingk Sep 25, 2023
6c01a95
Change timeout values
praveingk Sep 26, 2023
4605ae9
Remove unneccessary timeouts
praveingk Sep 26, 2023
31e0a31
fix linter issues in the dataplane
praveingk Sep 26, 2023
f713c53
Uncomment mbg build
praveingk Sep 26, 2023
2b0a9c2
Fix linter issues in dataplane binary
praveingk Sep 26, 2023
3e7ba46
Remove hardcoded dataplane server address
praveingk Sep 26, 2023
b9c58bc
Fix dataplaneserveraddress comment
praveingk Sep 27, 2023
c1ed0a9
Fix import ordering
praveingk Sep 27, 2023
55c563c
Remove peer from error
praveingk Sep 27, 2023
8f13285
Fix log
praveingk Sep 27, 2023
b7188cf
Move credentials to variable
praveingk Sep 27, 2023
c6360cc
Fix import ordering and comments
praveingk Sep 27, 2023
e22060f
Fix clusters/listener naming
praveingk Sep 27, 2023
21fc4e6
Rename listener function
praveingk Sep 27, 2023
ecd69b8
Apply suggestions from code review
praveingk Sep 27, 2023
c6678a2
Merge branch 'new-dataplane' of github.com:praveingk/clusterlink into…
praveingk Sep 27, 2023
c3077d8
Fix linter errors from reviews
praveingk Sep 27, 2023
a6db1dd
Apply logger suggestions from code review
praveingk Sep 27, 2023
9c6aedb
Rename xdsclient
praveingk Sep 27, 2023
15f4fd5
Fix log error
praveingk Sep 27, 2023
f3ed9a0
Rename listenerConn to appConn
praveingk Sep 27, 2023
6658b7a
Merge branch 'new-dataplane' of github.com:praveingk/clusterlink into…
praveingk Sep 27, 2023
d4297d1
Add go dataplane inside cl-dataplane binary
praveingk Sep 27, 2023
a6bd553
Remove separate go dataplane command
praveingk Sep 27, 2023
0add492
Remove go dataplane from makefile
praveingk Sep 27, 2023
08c9167
Edit comment on dataplane types
praveingk Sep 27, 2023
283854b
Fix typos and redundant logs
praveingk Sep 27, 2023
0a0e49b
Fix logs in dataplane
praveingk Sep 27, 2023
f9c487b
fix linter errors
praveingk Sep 27, 2023
c1c84cb
Remove ununsed authClient
praveingk Sep 27, 2023
5fc64a9
Rename buffersize
praveingk Sep 27, 2023
83e7c87
Add newForwarder function
praveingk Sep 27, 2023
9b8bfe6
Move body.close
praveingk Sep 27, 2023
73e6e2d
Close connections at the end of forwarder
praveingk Sep 27, 2023
81db0ed
Fix log formatting
praveingk Sep 28, 2023
de29b39
Merge branch 'main' of github.com:praveingk/clusterlink into new-data…
praveingk Sep 28, 2023
c293b5b
Remove additional builds during merge
praveingk Sep 28, 2023
58d81e1
Fix imports to clusterlink-net
praveingk Sep 28, 2023
204fbb6
revert closing of connections to individual functions
praveingk Sep 28, 2023
f1d916e
Move credentials to xdsclient
praveingk Sep 28, 2023
5ff3bdf
Add fetcher and client cleanup
praveingk Sep 29, 2023
fcebdf4
Update fetcher.go
elevran Sep 29, 2023
8375733
address linter errors
elevran Sep 29, 2023
97c5dff
Fix logger and context
praveingk Sep 29, 2023
43b3106
Remove redundant log
praveingk Sep 29, 2023
4878894
Remove redundant log
praveingk Sep 29, 2023
5293160
Merge branch 'new-dataplane' of github.com:praveingk/clusterlink into…
praveingk Oct 2, 2023
0e4afa1
Merge branch 'main' into newdp
praveingk Oct 2, 2023
5582e80
Add cl-go-dataplane binary for go dataplane
praveingk Oct 4, 2023
e6d4734
Merge branch 'newdp' of github.com:praveingk/clusterlink into newdp
praveingk Oct 4, 2023
c49330c
Remove unused type from dataplane cli
praveingk Oct 5, 2023
5ea2555
Address review comments
praveingk Oct 5, 2023
ff96834
Remove err handling
praveingk Oct 5, 2023
1c9c5f5
Remove unused close
praveingk Oct 5, 2023
67b3ae3
Add graceful closing of connections
praveingk Oct 6, 2023
34fc998
Merge branch 'main' into newdp
elevran Oct 8, 2023
d8802d7
Merge branch 'main' into newdp
elevran Oct 8, 2023
317fb8c
Fix ingress authorization path to be consistent with envoy
praveingk Oct 9, 2023
e02d48c
Merge branch 'newdp' of github.com:praveingk/clusterlink into newdp
praveingk Oct 9, 2023
7ce2100
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
02ac18d
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
46146bb
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
b2d21e2
Update pkg/dataplane/server/listener.go
praveingk Oct 9, 2023
87629c3
Update pkg/dataplane/server/listener.go
praveingk Oct 9, 2023
4b3ecbe
Update pkg/dataplane/server/listener.go
praveingk Oct 9, 2023
ef82121
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
9138afe
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
60dfb7f
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
b98d98b
Update pkg/dataplane/client/fetcher.go
praveingk Oct 9, 2023
b177f5b
Update pkg/dataplane/client/fetcher.go
praveingk Oct 9, 2023
0a148ad
Update pkg/dataplane/client/fetcher.go
praveingk Oct 9, 2023
a798413
Correct log messsages
praveingk Oct 9, 2023
5ba28af
Merge branch 'newdp' of github.com:praveingk/clusterlink into newdp
praveingk Oct 9, 2023
62abd9c
Revert removed newline
praveingk Oct 9, 2023
7b363c0
Fix ingress authorization path
praveingk Oct 9, 2023
3527813
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
fb71e2c
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
787fe18
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
10b2220
Update pkg/dataplane/server/listener.go
praveingk Oct 9, 2023
0adaaf7
Update pkg/dataplane/server/listener.go
praveingk Oct 9, 2023
6797759
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
fa4fc63
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
c7afc1b
Update pkg/dataplane/server/server.go
praveingk Oct 9, 2023
d5065d5
Update pkg/dataplane/client/fetcher.go
praveingk Oct 9, 2023
6df22c6
Update pkg/dataplane/client/fetcher.go
praveingk Oct 9, 2023
d0acdeb
Update pkg/dataplane/client/fetcher.go
praveingk Oct 9, 2023
79257ad
Revert removed newline
praveingk Oct 9, 2023
574e758
Fix ingress authorization path
praveingk Oct 9, 2023
49519a9
Fix error handling and sleep
praveingk Oct 9, 2023
5a1fb9d
Merge branch 'newdp' of github.com:praveingk/clusterlink into newdp
praveingk Oct 9, 2023
04eaf18
Apply suggestions from code review
praveingk Oct 9, 2023
1814188
Use atomic bool
praveingk Oct 9, 2023
6b8f0c2
Merge branch 'newdp' of github.com:praveingk/clusterlink into newdp
praveingk Oct 9, 2023
6e90d1f
Merge branch 'main' into newdp
praveingk Oct 9, 2023
e61f3c7
Add lock while editing errors map
praveingk Oct 10, 2023
a1510e4
Use hostname from cluster
praveingk Oct 10, 2023
1a14c3d
Reduce read deadline to 100 ms
praveingk Oct 10, 2023
c63110c
Add dataplane type to choose between envoy and go-based dataplane
praveingk Oct 10, 2023
055ee7a
Merge branch 'newdp' of github.com:praveingk/clusterlink into newdp
praveingk Oct 10, 2023
7ac823f
Enable e2e tests with go based dataplane on k8s and docker
praveingk Oct 10, 2023
f420a22
Reduce read deadline to minimize teardown time
praveingk Oct 11, 2023
b56b087
Add dataplane-type and simplify template
praveingk Oct 11, 2023
48f9ac1
Add e2e test with go dataplane
praveingk Oct 11, 2023
0805f39
Fix lint errors due to POST
praveingk Oct 11, 2023
243cdbf
Cleanup condition on dataplane type
praveingk Oct 12, 2023
f0af8f8
Change TYPE to DATAPLANE_TYPE
praveingk Oct 12, 2023
0faaeb2
Add verification for dataplane type
praveingk Oct 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ build:
$(GO) build -o ./bin/dataplane ./cmd/dataplane/main.go
$(GO) build -o ./bin/cl-controlplane ./cmd/cl-controlplane
$(GO) build -o ./bin/cl-dataplane ./cmd/cl-dataplane
$(GO) build -o ./bin/cl-go-dataplane ./cmd/cl-go-dataplane
$(GO) build -o ./bin/cl-adm ./cmd/cl-adm


docker-build: build
docker build --progress=plain --rm --tag mbg .
docker build --progress=plain --rm --tag cl-controlplane -f ./cmd/cl-controlplane/Dockerfile .
docker build --progress=plain --rm --tag cl-dataplane -f ./cmd/cl-dataplane/Dockerfile .
docker build --progress=plain --rm --tag cl-go-dataplane -f ./cmd/cl-go-dataplane/Dockerfile .
docker build --progress=plain --rm --tag gwctl -f ./cmd/gwctl/Dockerfile .

build-image:
Expand Down
8 changes: 6 additions & 2 deletions cmd/cl-adm/cmd/create/create_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ type PeerOptions struct {
Name string
// Dataplanes is the number of dataplanes to create.
Dataplanes uint16
// DataplaneType is the type of dataplane to create (envoy or go-based)
DataplaneType string
}

// AddFlags adds flags to fs and binds them to options.
func (o *PeerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Name, "name", "", "Peer name.")
fs.Uint16Var(&o.Dataplanes, "dataplanes", 1, "Number of dataplanes.")
fs.StringVar(&o.DataplaneType, "dataplaneType", "envoy", "Type of dataplane")
praveingk marked this conversation as resolved.
Show resolved Hide resolved
}

// RequiredFlags are the names of flags that must be explicitly specified.
Expand Down Expand Up @@ -136,8 +139,9 @@ func (o *PeerOptions) Run() error {

// deployment configuration
args, err := templates.Config{
Peer: o.Name,
Dataplanes: o.Dataplanes,
Peer: o.Name,
Dataplanes: o.Dataplanes,
DataplaneType: o.DataplaneType,
}.TemplateArgs()
if err != nil {
return err
Expand Down
8 changes: 6 additions & 2 deletions cmd/cl-adm/templates/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Config struct {

// Dataplanes is the number of dataplane servers to run.
Dataplanes uint16

// DataplaneType is the type of dataplane to create (envoy or go-based)
DataplaneType string
}

// TemplateArgs returns arguments for instantiating a text/template
Expand Down Expand Up @@ -64,8 +67,9 @@ func (c Config) TemplateArgs() (map[string]interface{}, error) {
}

return map[string]interface{}{
"peer": c.Peer,
"dataplanes": c.Dataplanes,
"peer": c.Peer,
"dataplanes": c.Dataplanes,
"dataplaneType": c.DataplaneType,

"fabricCA": base64.StdEncoding.EncodeToString(fabricCA),
"peerCA": base64.StdEncoding.EncodeToString(peerCA),
Expand Down
14 changes: 14 additions & 0 deletions cmd/cl-adm/templates/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ cl-controlplane \
--log-level info \
--log-file {{.persistencyDirectoryMountPath}}/log.log \

{{ if (eq .dataplaneType "envoy") }}
docker run -itd \
--name {{.peer}}-dataplane \
-v $FABRIC_DIR/{{.fabricCAPath}}:{{.dataplaneCAMountPath}} \
Expand All @@ -38,6 +39,19 @@ cl-dataplane \
--controlplane-host {{.peer}}-controlplane \
--log-level info \
--log-file {{.persistencyDirectoryMountPath}}/log.log
{{ else }}
docker run -itd \
--name {{.peer}}-dataplane \
-v $FABRIC_DIR/{{.fabricCAPath}}:{{.dataplaneCAMountPath}} \
-v $FABRIC_DIR/{{.dataplaneCertPath}}:{{.dataplaneCertMountPath}} \
-v $FABRIC_DIR/{{.dataplaneKeyPath}}:{{.dataplaneKeyMountPath}} \
-v $FABRIC_DIR/{{.dataplanePersistencyDirectory}}:{{.persistencyDirectoryMountPath}} \
cl-go-dataplane \
cl-go-dataplane \
praveingk marked this conversation as resolved.
Show resolved Hide resolved
--controlplane-host {{.peer}}-controlplane \
--log-level info \
--log-file {{.persistencyDirectoryMountPath}}/log.log
{{ end }}

docker run -itd \
--name {{.peer}}-gwctl \
Expand Down
4 changes: 4 additions & 0 deletions cmd/cl-adm/templates/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ spec:
secretName: cl-dataplane
containers:
- name: dataplane
{{ if (eq .dataplaneType "envoy") }}
image: cl-dataplane
{{ else }}
image: cl-go-dataplane
{{ end }}
imagePullPolicy: IfNotPresent
args: ["--controlplane-host", "cl-controlplane", "--log-level", "info"]
ports:
Expand Down
13 changes: 13 additions & 0 deletions cmd/cl-go-dataplane/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM alpine:3.14

# Copy binary
RUN mkdir -p /usr/local/bin
COPY ./bin/cl-go-dataplane /usr/local/bin/cl-go-dataplane

# Create directory for private keys
RUN mkdir -p /etc/ssl/private

# Create directory for certificates
RUN mkdir -p /etc/ssl/certs

ENTRYPOINT ["/usr/local/bin/cl-go-dataplane"]
156 changes: 156 additions & 0 deletions cmd/cl-go-dataplane/app/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package app

import (
"fmt"
"net"
"os"
"strconv"

"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

cpapi "github.com/clusterlink-net/clusterlink/pkg/controlplane/api"
"github.com/clusterlink-net/clusterlink/pkg/dataplane/api"
dpclient "github.com/clusterlink-net/clusterlink/pkg/dataplane/client"
dpserver "github.com/clusterlink-net/clusterlink/pkg/dataplane/server"
"github.com/clusterlink-net/clusterlink/pkg/util"
)

const (
// logLevel is the default log level.
logLevel = "warn"

// CAFile is the path to the certificate authority file.
CAFile = "/etc/ssl/certs/clink_ca.pem"
// CertificateFile is the path to the certificate file.
CertificateFile = "/etc/ssl/certs/clink-dataplane.pem"
// KeyFile is the path to the private-key file.
KeyFile = "/etc/ssl/private/clink-dataplane.pem"

// dataplaneServerAddress is the address of the dataplane HTTP server for accepting ingress dataplane connections.
dataplaneServerAddress = "127.0.0.1:8443"
)

// Options contains everything necessary to create and run a dataplane.
type Options struct {
// ControlplaneHost is the IP/hostname of the controlplane.
ControlplaneHost string
// LogFile is the path to file where logs will be written.
LogFile string
// LogLevel is the log level.
LogLevel string
}

// AddFlags adds flags to fs and binds them to options.
func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.ControlplaneHost, "controlplane-host", "",
"The controlplane IP/hostname.")
fs.StringVar(&o.LogFile, "log-file", "",
"Path to a file where logs will be written. If not specified, logs will be printed to stderr.")
fs.StringVar(&o.LogLevel, "log-level", logLevel,
"The log level. One of fatal, error, warn, info, debug.")
}

// RequiredFlags are the names of flags that must be explicitly specified.
func (o *Options) RequiredFlags() []string {
return []string{"controlplane-host"}
}

// Run the go dataplane.
func (o *Options) runGoDataplane(peerName, dataplaneID string, parsedCertData *util.ParsedCertData) error {
controlplaneTarget := net.JoinHostPort(o.ControlplaneHost, strconv.Itoa(cpapi.ListenPort))

praveingk marked this conversation as resolved.
Show resolved Hide resolved
log.Infof("Starting go dataplane, Name: %s, ID: %s", peerName, dataplaneID)

dataplane := dpserver.NewDataplane(dataplaneID, controlplaneTarget, peerName, parsedCertData)
go func() {
err := dataplane.StartDataplaneServer(dataplaneServerAddress)
log.Errorf("Failed to start dataplane server: %v.", err)
}()

go func() {
err := dataplane.StartSNIServer(dataplaneServerAddress)
log.Error("Failed to start dataplane server", err)
}()

// Start xDS client, if it fails to start we keep retrying to connect to the controlplane host
tlsConfig := parsedCertData.ClientConfig(cpapi.GRPCServerName(peerName))
xdsClient := dpclient.NewXDSClient(dataplane, controlplaneTarget, tlsConfig)
err := xdsClient.Run()
return fmt.Errorf("xDS Client stopped: %v", err)
}

// Run the dataplane.
func (o *Options) Run() error {
// set log file
praveingk marked this conversation as resolved.
Show resolved Hide resolved
if o.LogFile != "" {
f, err := os.OpenFile(o.LogFile, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return fmt.Errorf("unable to open log file: %v", err)
}

defer func() {
if err := f.Close(); err != nil {
log.Errorf("Cannot close log file: %v", err)
}
}()

log.SetOutput(f)
}

// set log level
logLevel, err := log.ParseLevel(o.LogLevel)
if err != nil {
return fmt.Errorf("unable to set log level: %v", err)
}
log.SetLevel(logLevel)

// parse TLS files
parsedCertData, err := util.ParseTLSFiles(CAFile, CertificateFile, KeyFile)
if err != nil {
return err
}

dnsNames := parsedCertData.DNSNames()
if len(dnsNames) != 1 {
return fmt.Errorf("expected peer certificate to contain a single DNS name, but got %d", len(dnsNames))
}

peerName, err := api.StripServerPrefix(dnsNames[0])
if err != nil {
return err
}

// generate random dataplane ID
dataplaneID := uuid.New().String()
elevran marked this conversation as resolved.
Show resolved Hide resolved
log.Infof("Dataplane ID: %s.", dataplaneID)

return o.runGoDataplane(peerName, dataplaneID, parsedCertData)
}

// NewCLGoDataplaneCommand creates a *cobra.Command object with default parameters.
func NewCLGoDataplaneCommand() *cobra.Command {
opts := &Options{}

cmd := &cobra.Command{
Use: "cl-go-dataplane",
Long: `cl-go-dataplane: dataplane agent for allowing network connectivity of remote clients and services`,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
return opts.Run()
},
}

opts.AddFlags(cmd.Flags())

for _, flag := range opts.RequiredFlags() {
if err := cmd.MarkFlagRequired(flag); err != nil {
fmt.Printf("Error marking required flag '%s': %v\n", flag, err)
os.Exit(1)
}
}

return cmd
}
15 changes: 15 additions & 0 deletions cmd/cl-go-dataplane/cl-go-dataplane.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// The cl-dataplane binary runs an instance of a clink dataplane.
package main

import (
"os"

"github.com/clusterlink-net/clusterlink/cmd/cl-go-dataplane/app"
)

func main() {
command := app.NewCLGoDataplaneCommand()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
1 change: 0 additions & 1 deletion cmd/dataplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func main() {

praveingk marked this conversation as resolved.
Show resolved Hide resolved
// Set Dataplane
dp := dp.NewDataplane(&store.Store{ID: id, CertAuthority: ca, Cert: cert, Key: key, Dataplane: dataplane}, controlplane)

dp.StartServer(port)
log.Infof("Dataplane main process is finished")
}
94 changes: 94 additions & 0 deletions pkg/dataplane/client/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package client

import (
"context"
"fmt"

cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
client "github.com/envoyproxy/go-control-plane/pkg/client/sotw/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/clusterlink-net/clusterlink/pkg/dataplane/server"
)

type fetcher struct {
client client.ADSClient
resourceType string
dataplane *server.Dataplane
logger *logrus.Entry
}

func (f *fetcher) handleClusters(resources []*anypb.Any) error {
for _, r := range resources {
c := &cluster.Cluster{}
err := anypb.UnmarshalTo(r, c, proto.UnmarshalOptions{})
if err != nil {
return err
}

f.logger.Debugf("Cluster: %s.", c.Name)
f.dataplane.AddCluster(c)
}
return nil
}

func (f *fetcher) handleListeners(resources []*anypb.Any) error {
for _, r := range resources {
l := &listener.Listener{}
err := anypb.UnmarshalTo(r, l, proto.UnmarshalOptions{})
if err != nil {
return err
}
f.logger.Debugf("Listener: %s.", l.Name)
f.dataplane.AddListener(l)
}
return nil
}

func (f *fetcher) Run() error {
for {
resp, err := f.client.Fetch()
if err != nil {
f.logger.Errorf("Failed to fetch %s: %v.", f.resourceType, err)
return err
}
switch f.resourceType {
case resource.ClusterType:
err := f.handleClusters(resp.Resources)
if err != nil {
f.logger.Errorf("Failed to handle clusters: %v.", err)
}
case resource.ListenerType:
err := f.handleListeners(resp.Resources)
if err != nil {
f.logger.Errorf("Failed to handle listeners: %v.", err)
}
default:
return fmt.Errorf("unknown resource type")
}

err = f.client.Ack()
if err != nil {
f.logger.Errorf("Failed to ack: %v.", err)
}
}
}

func newFetcher(ctx context.Context, conn *grpc.ClientConn, resourceType string, dataplane *server.Dataplane) (*fetcher, error) {
client := client.NewADSClient(ctx, &core.Node{Id: dataplane.ID}, resourceType)
err := client.InitConnect(conn)
if err != nil {
return nil, err
}
return &fetcher{client: client,
resourceType: resourceType,
dataplane: dataplane,
logger: logrus.WithField("component", "fetcher.xds.client"),
}, nil
}
Loading