From 214fa1c79575685719d2c3106eb8dd59f4c107fc Mon Sep 17 00:00:00 2001 From: Sachin Holla <51310506+sachinholla@users.noreply.github.com> Date: Sat, 17 Jun 2023 05:12:25 +0530 Subject: [PATCH] TranslClient: Use new translib subscription APIs (#122) * Allow data clients to send full gnmipb.Notification Added gnmipb.Notification as a member of the Value messgae object defined by sonic_internal.proto. Data clients now can fill a complete Notification object instead of timestamp, path, TypedValue separately. * Use origin to identify translib yang paths Use TranslClient to handle the subscribe requests when origin is "openconfig". Fallback to the existing target based identification logic if origin is not given. * Use new translib subscribe APIs in TranslClient StreamRun() enhancements: - Create a new SubscribeSession in the beginning and pass it to all further translib API calls - Call translib.IsSubscribeSupported() to reasolve preferences of the requested paths. This also splits the target_defined request into on_change and sample paths. - Use one translib.Subscribe() call passing all on_change enabled paths, if there are any. This will handle initial updates and subsequent on_change notifications. - Use one translib.Stream() call for each sample subscribe path, if present. This will be called in a loop at every sample interval. - Maintain the ygot objects received for each translib.Stream() call in a cache. Diff the current set of objects with the objects from the previous iteration to resolve deleted paths and modified values (when suppress_redundant is enabled). PollRun() and OnceRun() enhancements: - Call translib.Stream() with each subscribed path to generate notification data. - In poll mode, this is repeated when a poll message is received * Increase gnmi_server gotest timeout to 20m --- Makefile | 2 +- gnmi_server/client_subscribe.go | 34 +- gnmi_server/transl_sub_test.go | 937 ++++++++++++++++++++++++ go.mod | 1 + proto/sonic_internal.pb.go | 301 ++++++-- proto/sonic_internal.proto | 3 + sonic_data_client/db_client.go | 13 + sonic_data_client/transl_data_client.go | 520 ++++++------- sonic_data_client/transl_subscriber.go | 386 ++++++++++ 9 files changed, 1829 insertions(+), 368 deletions(-) create mode 100644 gnmi_server/transl_sub_test.go create mode 100644 sonic_data_client/transl_subscriber.go diff --git a/Makefile b/Makefile index 24efbe55..ab9578b3 100644 --- a/Makefile +++ b/Makefile @@ -96,7 +96,7 @@ $(ENVFILE): check_gotest: $(DBCONFG) $(ENVFILE) sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-config.txt -covermode=atomic -v github.com/sonic-net/sonic-gnmi/sonic_db_config - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -race -coverprofile=coverage-gnmi.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -coverpkg ../... + sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -race -timeout 20m -coverprofile=coverage-gnmi.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -coverpkg ../... sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -coverprofile=coverage-dialcout.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/dialout/dialout_client sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-data.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/sonic_data_client sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-dbus.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/sonic_service_client diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index bea9ca50..5d27177a 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -10,6 +10,7 @@ import ( log "github.com/golang/glog" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" sdc "github.com/sonic-net/sonic-gnmi/sonic_data_client" gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) @@ -128,23 +129,23 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { return grpc.Errorf(codes.InvalidArgument, "first message must be SubscriptionList: %q", query) } - var target string prefix := c.subscribe.GetPrefix() - if prefix == nil { - return grpc.Errorf(codes.Unimplemented, "No target specified in prefix") - } else { - target = prefix.GetTarget() - // TODO: add data client support for fetching non-db data - if target == "" { - return grpc.Errorf(codes.Unimplemented, "Empty target data not supported yet") - } - } + origin := prefix.GetOrigin() + target := prefix.GetTarget() paths, err := c.populateDbPathSubscrition(c.subscribe) if err != nil { return grpc.Errorf(codes.NotFound, "Invalid subscription path: %v %q", err, query) } + if o, err := ParseOrigin(paths); err != nil { + return err // origin conflict within paths + } else if len(origin) == 0 { + origin = o // Use origin from paths if not given in prefix + } else if len(o) != 0 && o != origin { + return status.Error(codes.InvalidArgument, "Origin conflict between prefix and paths") + } + if connectionKey, valid = connectionManager.Add(c.addr, query.String()); !valid { return grpc.Errorf(codes.Unavailable, "Server connections are at capacity.") } @@ -155,7 +156,18 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { mode := c.subscribe.GetMode() - if target == "OTHERS" { + log.V(3).Infof("mode=%v, origin=%q, target=%q", mode, origin, target) + + if origin == "openconfig" { + dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{}) + } else if len(origin) != 0 { + return grpc.Errorf(codes.Unimplemented, "Unsupported origin: %s", origin) + } else if target == "" { + // This and subsequent conditions handle target based path identification + // when origin == "". As per the spec it should have been treated as "openconfig". + // But we take a deviation and stick to legacy logic for backward compatibility + return grpc.Errorf(codes.Unimplemented, "Empty target data not supported") + } else if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { dc, err = sdc.NewEventClient(paths, prefix, c.logLevel) diff --git a/gnmi_server/transl_sub_test.go b/gnmi_server/transl_sub_test.go new file mode 100644 index 00000000..4998ae22 --- /dev/null +++ b/gnmi_server/transl_sub_test.go @@ -0,0 +1,937 @@ +package gnmi + +import ( + "crypto/tls" + "fmt" + "path/filepath" + "reflect" + "strings" + "testing" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" + "github.com/golang/protobuf/proto" + "github.com/openconfig/gnmi/client" + gnmipath "github.com/openconfig/gnmi/path" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + extnpb "github.com/openconfig/gnmi/proto/gnmi_ext" + "github.com/openconfig/ygot/ygot" + spb "github.com/sonic-net/sonic-gnmi/proto" + dbconfig "github.com/sonic-net/sonic-gnmi/sonic_db_config" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" +) + +// This file contains subscription test cases for translib managed paths + +const ( + ONCE = gnmipb.SubscriptionList_ONCE + POLL = gnmipb.SubscriptionList_POLL + STREAM = gnmipb.SubscriptionList_STREAM + ON_CHANGE = gnmipb.SubscriptionMode_ON_CHANGE + SAMPLE = gnmipb.SubscriptionMode_SAMPLE + TARGET_DEFINED = gnmipb.SubscriptionMode_TARGET_DEFINED +) + +func TestTranslSubscribe(t *testing.T) { + s := createServer(t, 8081) + go runServer(t, s) + defer s.s.Stop() + + prepareDbTranslib(t) + + t.Run("origin=openconfig", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.OK) + sub.Verify(client.Sync{}) + }) + + t.Run("origin=invalid", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("invalid:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + t.Run("origin=empty,target=empty", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + t.Run("origin in path", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("openconfig:/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.OK) + sub.Verify(client.Sync{}) + }) + + t.Run("origin conflict", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("xxx:/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("origin conflict in paths", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("openconfig:/openconfig-acl:acl/acl-sets")}, + {Path: strToPath("closeconfig:/openconfig-interfaces/interfaces")}, + }} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + acl1Path := "/openconfig-acl:acl/acl-sets/acl-set[name=ONE][type=ACL_IPV4]" + acl2Path := "/openconfig-acl:acl/acl-sets/acl-set[name=TWO][type=ACL_IPV4]" + + acl1CreatePb := newPbUpdate("/openconfig-acl:acl/acl-sets/acl-set", + `{"acl-set": [{"name": "ONE", "type": "ACL_IPV4", "config": {"name": "ONE", "type": "ACL_IPV4"}}]}`) + acl2CreatePb := newPbUpdate("/openconfig-acl:acl/acl-sets/acl-set", + `{"acl-set": [{"name": "TWO", "type": "ACL_IPV4", "config": {"name": "TWO", "type": "ACL_IPV4", "description": "foo"}}]}`) + acl2DescUpdatePb := newPbUpdate(acl2Path+"/config/description", `{"description": "new"}`) + + acl1DeletePb := strToPath(acl1Path) + acl2DeletePb := strToPath(acl2Path) + acl2DescDeletePb := strToPath(acl2Path + "/config/description") + aclAllDeletePb := strToPath("/openconfig-acl:acl/acl-sets") + + t.Run("ONCE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + doSet(t, acl1CreatePb) + + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-acl:acl/acl-sets/acl-set")}, + }} + + sub := doSubscribe(t, req, codes.OK) + sub.Verify( + Updated(acl1Path+"/name", "ONE"), + Updated(acl1Path+"/type", "ACL_IPV4"), + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + client.Sync{}, + ) + }) + + t.Run("POLL", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start POLL subscription for ACL config container") + req := &gnmipb.SubscriptionList{ + Mode: POLL, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-acl:acl/acl-sets/acl-set[name=*][type=*]/config")}, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACl1") + time.Sleep(2 * time.Second) + doSet(t, acl1CreatePb) + + t.Logf("Verify poll updates include ACL1 data") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + client.Sync{}, + ) + + t.Logf("Create ACL2") + time.Sleep(2 * time.Second) + doSet(t, acl2CreatePb) + + t.Logf("Verify poll updates include both ACL1 and ACL2 data") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + client.Sync{}, + ) + + t.Logf("Delete ACL2") + time.Sleep(2 * time.Second) + doSet(t, acl2DeletePb) + + t.Logf("Verify poll updates now include ACL1 data only") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + client.Sync{}, + ) + }) + + t.Run("ONCHANGE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start ON_CHANGE subscription for ACL config container") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/acl-set[name=*][type=*]/config"), Mode: ON_CHANGE}, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify no initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify update notifications for ACL2 data") + sub.Verify( + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + ) + + t.Logf("Create ACL1 and delete description of ACL2") + doSet(t, acl1CreatePb, acl2DescDeletePb) + + t.Logf("Verify delete notification for ACL2 description and updates for ACL1 data") + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Deleted(acl2Path+"/config/description"), + ) + + t.Logf("Delete ACL1 and set description for ACL2") + doSet(t, acl2DescUpdatePb, acl1DeletePb) + + t.Logf("Verify delete for ACL1 and update for ACL2 description") + sub.Verify( + Deleted(acl1Path+"/config"), + Updated(acl2Path+"/config/description", "new"), + ) + }) + + t.Run("ONCHANGE_unsupported", func(t *testing.T) { + t.Logf("Try ON_CHANGE for the top interface list") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-interfaces:interfaces/interface[name=*]"), Mode: ON_CHANGE}, + }} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + sampleInterval := 25 * time.Second + + t.Run("SAMPLE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL1") + doSet(t, acl1CreatePb) + + t.Logf("Start SAMPLE subscription for ACL state container.. interval=%v", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set[name=*][type=*]/state"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates include ACL1 data only") + sub.Verify( + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + client.Sync{}, + ) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify updates include both ACL data, for 3 intervals") + for i := 1; i <= 3; i++ { + t.Logf("interval %d", i) + sub.VerifyT(sampleInterval - 3*time.Second) // check no notifications before the interval + sub.Verify( + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + Updated(acl2Path+"/state/description", "foo"), + ) + } + + t.Logf("Delete ACL1 and description of ACL2") + doSet(t, acl1DeletePb, acl2DescDeletePb) + + t.Logf("Verify next iteration includes deletes and updates (for remaining ACL2 data)") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Deleted(acl1Path+"/state"), + Deleted(acl2Path+"/state/description"), + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + ) + + t.Logf("Verify next iteration has updates only") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + ) + }) + + t.Run("SAMPLE_suppress_redundant", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL1 and ACL2") + doSet(t, acl1CreatePb, acl2CreatePb) + + t.Logf("Start SAMPLE subscription for ACL config container.. interval=%v, suppress_redundant=true", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set[name=*][type=*]/config"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + SuppressRedundant: true, + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates") + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + client.Sync{}, + ) + + t.Logf("Verify next iteration has no data (due to suppress_redundant)") + sub.VerifyT(sampleInterval + 3*time.Second) + + t.Logf("Delete ACL1 and update ACL2 description") + doSet(t, acl1DeletePb, acl2DescUpdatePb) + + t.Logf("Verify next iteration includes deletes and updates for modified paths only") + sub.VerifyT( + sampleInterval+3*time.Second, + Deleted(acl1Path+"/config"), + Updated(acl2Path+"/config/description", "new"), + ) + + t.Logf("Delete ACL2 description") + doSet(t, acl2DescDeletePb) + + t.Logf("Verify next iteration includes description delete only") + sub.VerifyT( + sampleInterval+3*time.Second, + Deleted(acl2Path+"/config/description"), + ) + + t.Logf("Verify next iteration has no data") + sub.VerifyT(sampleInterval + 3*time.Second) + }) + + t.Run("SAMPLE_leaf", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Start SAMPLE subscription for ACL description.. interval=%v, updates_only=true", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + UpdatesOnly: true, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set/state/description"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates, due to updates_only") + sub.Verify(client.Sync{}) + + t.Logf("Verify next iteration has the description value") + sub.VerifyT(sampleInterval - 3*time.Second) // check no notifications before the interval + sub.Verify( + Updated(acl2Path+"/state/description", "foo"), + ) + + t.Logf("Update ACL2 description") + doSet(t, acl2DescUpdatePb) + + t.Logf("Verify next iteration has the updated description") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Updated(acl2Path+"/state/description", "new"), + ) + + t.Logf("Delete ACL2") + doSet(t, acl2DeletePb) + + t.Logf("Verify next iteration has delete notification") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Deleted(acl2Path + "/state/description"), + ) + + t.Logf("Verify next iteration has no notifications") + sub.VerifyT(sampleInterval + 3*time.Second) + }) + + t.Run("SAMPLE_invalid_interval", func(t *testing.T) { + t.Logf("Try SAMPLE with 1ms SamplerInterval (too low)") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/openconfig-acl:acl/acl-sets"), + SampleInterval: uint64(time.Millisecond.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("SAMPLE_no_interval", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start SAMPLE subscription for ACL description.. without setting SampleInterval") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set/state/description"), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify updates are received after default interval") + sub.VerifyT( + (translib.MinSubscribeInterval+2)*time.Second, + Updated(acl2Path+"/state/description", "foo"), + ) + }) + + t.Run("TARGETDEFINED", func(t *testing.T) { + t.Logf("Start TARGETDEFINED subscription for interface description, in-pkts and in-octets") + interval := 30 * time.Second + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + { + Path: strToPath("/state/description"), + Mode: TARGET_DEFINED, + }, { + Path: strToPath("/state/counters/in-pkts"), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }, { + Path: strToPath("/state/counters/in-octets"), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates includes all three data") + eth0Path := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]" + sub.Verify( + Updated(eth0Path+"/state/description", ""), + Updated(eth0Path+"/state/counters/in-pkts", uint64(0)), + Updated(eth0Path+"/state/counters/in-octets", uint64(0)), + client.Sync{}, + ) + + next := time.Now().Add(interval) + + t.Logf("Update port description") + updateDb(t, DbDataMap{ + "CONFIG_DB": {"PORT|Ethernet0": {"description": "the one"}}, + "APPL_DB": {"PORT_TABLE:Ethernet0": {"description": "the one"}}, + }) + + t.Logf("Verify update notification for port description") + sub.Verify( + Updated(eth0Path+"/state/description", "the one"), + ) + + t.Logf("Verify periodic updates for stats only") + for i := 1; i <= 2; i++ { + sub.VerifyT(time.Until(next) - 3*time.Second) + sub.Verify( + Updated(eth0Path+"/state/counters/in-pkts", uint64(0)), + Updated(eth0Path+"/state/counters/in-octets", uint64(0)), + ) + next = time.Now().Add(interval) + } + }) + + t.Run("TARGETDEFINED_split", func(t *testing.T) { + interval := 30 * time.Second + eth0State := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]/state" + + t.Logf("Start TARGETDEFINED subscription for interface state container") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath(eth0State), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates includes nodes from both state and counters containers") + sub.GlobCompare = true + sub.Verify( + Updated(eth0State+"/counters/*", nil), + Updated(eth0State+"/*", nil), + client.Sync{}, + ) + + t.Logf("Verify next updates contains only counters data") + sub.VerifyT(interval - 2*time.Second) + sub.Verify( + Updated(eth0State+"/counters/*", nil), + ) + }) + + t.Run("hearbeat", func(t *testing.T) { + saInterval := 30 * time.Second + hbInterval := saInterval + 10*time.Second + + t.Logf("Start an ON_CHANGE and SAMPLE subscription with heartbeat %v", hbInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-interfaces:interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + { + Path: strToPath("/config/enabled"), + Mode: SAMPLE, + SuppressRedundant: true, + SampleInterval: uint64(saInterval.Nanoseconds()), + HeartbeatInterval: uint64(hbInterval.Nanoseconds()), + }, { + Path: strToPath("/state/oper-status"), + Mode: ON_CHANGE, + HeartbeatInterval: uint64(hbInterval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates contains both data") + eth0Path := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]" + sub.Verify( + Updated(eth0Path+"/config/enabled", false), + Updated(eth0Path+"/state/oper-status", "DOWN"), + client.Sync{}, + ) + + t.Logf("Verify updates received only after heartbeat interval") + sub.VerifyT(hbInterval - 2*time.Second) + sub.Verify( + Updated(eth0Path+"/config/enabled", false), + Updated(eth0Path+"/state/oper-status", "DOWN"), + ) + }) + + t.Run("hearbeat_invalid (sample)", func(t *testing.T) { + t.Logf("Try a SAMPLE subscription with 1ms heartbeat") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath("/interfaces/interface/config/mtu"), + Mode: SAMPLE, + SuppressRedundant: true, + HeartbeatInterval: uint64(time.Millisecond.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("hearbeat_invalid (onchange)", func(t *testing.T) { + t.Logf("Try an ON_CHANGE subscription with 1ms heartbeat") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath("/interfaces/interface/config/mtu"), + Mode: ON_CHANGE, + HeartbeatInterval: uint64(time.Millisecond.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("bundle_version_0.0.0", func(t *testing.T) { + t.Logf("Start a subscription with BundleVersion=0.0.0") + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{ + Subscribe: &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/config/mtu"), Mode: ON_CHANGE}, + {Path: strToPath("/state/mtu"), Mode: SAMPLE}, + }}}, + Extension: []*extnpb.Extension{newBundleVersion(t, "0.0.0")}, + } + sub := doSubscribeRaw(t, req, codes.OK) + sub.Verify( + Updated("/openconfig-interfaces:interfaces/interface[name=Ethernet0]/config/mtu", uint64(9100)), + Updated("/openconfig-interfaces:interfaces/interface[name=Ethernet0]/state/mtu", uint64(9100)), + client.Sync{}, + ) + }) + + t.Run("bundle_version_invalid", func(t *testing.T) { + t.Logf("Start POLL subscription with BundleVersion=100.0.0") + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{ + Subscribe: &gnmipb.SubscriptionList{ + Mode: POLL, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/interfaces/interface[name=Ethernet0]/config/mtu")}, + }}}, + Extension: []*extnpb.Extension{newBundleVersion(t, "100.0.0")}, + } + sub := doSubscribeRaw(t, req, codes.InvalidArgument) + sub.Verify() + }) +} + +func strToPath(s string) *gnmipb.Path { + var origin string + if k := strings.IndexByte(s, ':') + 1; k > 0 && k < len(s) && s[k] == '/' { + origin = s[:k-1] + s = s[k:] + } + p, _ := ygot.StringToStructuredPath(s) + p.Origin = origin + return p +} + +func strToCPath(s string) client.Path { + p := strToPath(s) + return gnmipath.ToStrings(p, false) +} + +func Updated(p string, v interface{}) client.Update { + return client.Update{Path: strToCPath(p), Val: v} +} + +func Deleted(p string) client.Delete { + return client.Delete{Path: strToCPath(p)} +} + +type testSubscriber struct { + t *testing.T + client *client.CacheClient + notiQ *queue.Queue + + GlobCompare bool // treat expected paths as glob patterns in Verify() +} + +func doSubscribe(t *testing.T, subReq *gnmipb.SubscriptionList, exStatus codes.Code) *testSubscriber { + t.Helper() + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{Subscribe: subReq}} + return doSubscribeRaw(t, req, exStatus) +} + +func doSubscribeRaw(t *testing.T, req *gnmipb.SubscribeRequest, exStatus codes.Code) *testSubscriber { + t.Helper() + q, err := client.NewQuery(req) + if err != nil { + t.Fatalf("NewQuery failed: %v", err) + } + + sub := &testSubscriber{ + t: t, + client: client.New(), + notiQ: queue.New(100), + } + + t.Cleanup(sub.close) + + q.Addrs = []string{"127.0.0.1:8081"} + q.TLS = &tls.Config{InsecureSkipVerify: true} + q.NotificationHandler = func(n client.Notification) error { + //fmt.Printf(">>>> %#v\n", n) + return sub.notiQ.Put(n) + } + + go func() { + err = sub.client.Subscribe(context.Background(), q) + if _, ok := status.FromError(err); !ok || status.Code(err) != exStatus { + msg := fmt.Sprintf("Subscribe failed: expected=%v, received=%v", exStatus, err) + sub.notiQ.Put(client.NewError(msg)) + } else if err != nil { + sub.notiQ.Dispose() // got the expected error.. stop listening immediately + } + }() + + return sub +} + +func (sub *testSubscriber) close() { + if sub != nil { + sub.client.Close() + sub.notiQ.Dispose() + } +} + +func (sub *testSubscriber) Poll() { + if err := sub.client.Poll(); err != nil { + sub.t.Helper() + sub.t.Fatalf("Poll failed: %v", err) + } +} + +func (sub *testSubscriber) Verify(expect ...client.Notification) { + sub.VerifyT(5*time.Second, expect...) +} + +func (sub *testSubscriber) VerifyT(timeout time.Duration, expect ...client.Notification) { + sub.t.Helper() + extra := make([]client.Notification, 0) + matched := make(map[int]client.Notification) + deadine := time.Now().Add(timeout) + + for { + n := sub.nextNoti(deadine) + if n == nil { + break // timeout + } + if err, ok := n.(client.Error); ok { + sub.t.Fatal(err.Error()) + } + + index := -1 + for i, ex := range expect { + if sub.compareNoti(n, ex) { + index = i + break + } + } + if index != -1 { + matched[index] = n + } else { + extra = append(extra, n) + } + if _, ok := n.(client.Sync); ok { + break + } + if !sub.GlobCompare && (len(matched) == len(expect)) { + break + } + } + + // if len(matched) == len(expect) && len(extra) == 0 { + // return + // } + switch { + case len(extra) != 0: // found extra updates + case sub.GlobCompare && len(matched) == 0 && len(expect) != 0: // no glob matches found + case !sub.GlobCompare && len(matched) != len(expect): // wrong number of matches + default: + return + } + + for _, n := range extra { + sub.t.Errorf("unexpected: %#v", n) + } + for i, n := range expect { + if matched[i] == nil { + sub.t.Errorf("missing: %#v", n) + } + } + sub.t.FailNow() +} + +func (sub *testSubscriber) nextNoti(deadline time.Time) client.Notification { + sub.t.Helper() + timeout := time.Until(deadline) + if timeout <= 0 { + return nil + } + n, err := sub.notiQ.Poll(1, timeout) + if err == queue.ErrTimeout || err == queue.ErrDisposed { + return nil + } else if err != nil { + sub.t.Fatalf("Unexpected error while waiting for a notification: %v", err) + } + + switch noti := n[0].(type) { + case client.Update: + noti.TS = time.Time{} + return noti + case client.Delete: + noti.TS = time.Time{} + return noti + case client.Error: + sub.t.Fatalf("Unexpected error notification: %s", noti.Error()) + case client.Connected: + return sub.nextNoti(deadline) + } + + return n[0].(client.Notification) +} + +func (sub *testSubscriber) compareNoti(n, exp client.Notification) bool { + if !sub.GlobCompare { + return reflect.DeepEqual(n, exp) + } + + var path, expPath string + var val, expVal interface{} + switch exp := exp.(type) { + case client.Update: + if u, ok := n.(client.Update); ok { + path, val = pathToString(u.Path), u.Val + expPath, expVal = pathToString(exp.Path), exp.Val + } else { + return false + } + case client.Delete: + if d, ok := n.(client.Delete); ok { + path = pathToString(d.Path) + expPath = pathToString(exp.Path) + } else { + return false + } + default: + return reflect.DeepEqual(n, exp) + } + + if ok, _ := filepath.Match(expPath, path); !ok { + return false + } + return expVal == nil || reflect.DeepEqual(val, expVal) +} + +func doSet(t *testing.T, data ...interface{}) { + t.Helper() + req := &gnmipb.SetRequest{} + for _, v := range data { + switch v := v.(type) { + case *gnmipb.Path: + req.Delete = append(req.Delete, v) + case *gnmipb.Update: + req.Update = append(req.Update, v) + default: + t.Fatalf("Unsupported set value: %T %v", v, v) + } + } + + cred := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithTransportCredentials(cred)) + if err != nil { + t.Fatalf("Could not create client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + defer conn.Close() + + _, err = gnmipb.NewGNMIClient(conn).Set(ctx, req) + if err != nil { + t.Fatalf("Set failed: %v", err) + } +} + +// DbDataMap is a map[DBNAME]map[KEY]map[FIELD]VALUE +type DbDataMap map[string]map[string]map[string]interface{} + +func updateDb(t *testing.T, data DbDataMap) { + t.Helper() + for dbName, tableData := range data { + n := dbconfig.GetDbId(dbName, dbconfig.GetDbDefaultNamespace()) + redis := getRedisClientN(t, n, dbconfig.GetDbDefaultNamespace()) + defer redis.Close() + for key, fields := range tableData { + if fields == nil { + redis.Del(key) + continue + } + + modFields := make(map[string]interface{}) + delFields := make([]string, 0) + for n, v := range fields { + if v == nil { + delFields = append(delFields, n) + } else { + modFields[n] = v + } + } + + if len(modFields) != 0 { + redis.HMSet(key, modFields) + } + if len(delFields) != 0 { + redis.HDel(key, delFields...) + } + } + } +} + +func newBundleVersion(t *testing.T, version string) *extnpb.Extension { + t.Helper() + v, err := proto.Marshal(&spb.BundleVersion{Version: version}) + if err != nil { + t.Fatalf("Invalid version %s; err=%v", version, err) + } + ext := &extnpb.RegisteredExtension{Id: spb.BUNDLE_VERSION_EXT, Msg: v} + return &extnpb.Extension{Ext: &extnpb.Extension_RegisteredExt{RegisteredExt: ext}} +} diff --git a/go.mod b/go.mod index 8bdccb5b..69c0adb2 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/net v0.0.0-20201110031124-69a78807bb2b google.golang.org/grpc v1.33.2 + google.golang.org/protobuf v1.25.0 gopkg.in/yaml.v2 v2.2.8 ) diff --git a/proto/sonic_internal.pb.go b/proto/sonic_internal.pb.go index 9d9edbd9..239d7286 100644 --- a/proto/sonic_internal.pb.go +++ b/proto/sonic_internal.pb.go @@ -1,17 +1,32 @@ +// sonic_internal.proto describes the message format used internally by SONiC + // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.6.1 // source: sonic_internal.proto package gnmi_sonic -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" -import gnmi "github.com/openconfig/gnmi/proto/gnmi" +import ( + proto "github.com/golang/protobuf/proto" + gnmi "github.com/openconfig/gnmi/proto/gnmi" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 type State int32 @@ -21,111 +36,251 @@ const ( State_RUNNING State = 2 ) -var State_name = map[int32]string{ - 0: "STOPPED", - 1: "INIT", - 2: "RUNNING", -} -var State_value = map[string]int32{ - "STOPPED": 0, - "INIT": 1, - "RUNNING": 2, +// Enum value maps for State. +var ( + State_name = map[int32]string{ + 0: "STOPPED", + 1: "INIT", + 2: "RUNNING", + } + State_value = map[string]int32{ + "STOPPED": 0, + "INIT": 1, + "RUNNING": 2, + } +) + +func (x State) Enum() *State { + p := new(State) + *p = x + return p } func (x State) String() string { - return proto.EnumName(State_name, int32(x)) + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (State) Descriptor() protoreflect.EnumDescriptor { + return file_sonic_internal_proto_enumTypes[0].Descriptor() +} + +func (State) Type() protoreflect.EnumType { + return &file_sonic_internal_proto_enumTypes[0] +} + +func (x State) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use State.Descriptor instead. +func (State) EnumDescriptor() ([]byte, []int) { + return file_sonic_internal_proto_rawDescGZIP(), []int{0} } -func (State) EnumDescriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } // Value is the message that reprents a stream of updates for a given path, used internally. type Value struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + // prefix used with path - Prefix *gnmi.Path `protobuf:"bytes,1,opt,name=prefix" json:"prefix,omitempty"` + Prefix *gnmi.Path `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"` // The device specific, or path corresponding to a value. - Path *gnmi.Path `protobuf:"bytes,2,opt,name=path" json:"path,omitempty"` + Path *gnmi.Path `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` // timestamp for the corresponding value, nanoseconds since epoch. // If timestamp is not set the default will assume to // be the current system time. - Timestamp int64 `protobuf:"varint,3,opt,name=timestamp" json:"timestamp,omitempty"` - Val *gnmi.TypedValue `protobuf:"bytes,4,opt,name=val" json:"val,omitempty"` + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Val *gnmi.TypedValue `protobuf:"bytes,4,opt,name=val,proto3" json:"val,omitempty"` // Indicate target has sent all values associated with the subscription // at least once. - SyncResponse bool `protobuf:"varint,5,opt,name=sync_response,json=syncResponse" json:"sync_response,omitempty"` + SyncResponse bool `protobuf:"varint,5,opt,name=sync_response,json=syncResponse,proto3" json:"sync_response,omitempty"` // fatal error happened. - Fatal string `protobuf:"bytes,6,opt,name=fatal" json:"fatal,omitempty"` + Fatal string `protobuf:"bytes,6,opt,name=fatal,proto3" json:"fatal,omitempty"` + // Notification to be used in place of 1-4 if present + Notification *gnmi.Notification `protobuf:"bytes,7,opt,name=notification,proto3" json:"notification,omitempty"` } -func (m *Value) Reset() { *m = Value{} } -func (m *Value) String() string { return proto.CompactTextString(m) } -func (*Value) ProtoMessage() {} -func (*Value) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } +func (x *Value) Reset() { + *x = Value{} + if protoimpl.UnsafeEnabled { + mi := &file_sonic_internal_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Value) String() string { + return protoimpl.X.MessageStringOf(x) +} -func (m *Value) GetPrefix() *gnmi.Path { - if m != nil { - return m.Prefix +func (*Value) ProtoMessage() {} + +func (x *Value) ProtoReflect() protoreflect.Message { + mi := &file_sonic_internal_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Value.ProtoReflect.Descriptor instead. +func (*Value) Descriptor() ([]byte, []int) { + return file_sonic_internal_proto_rawDescGZIP(), []int{0} +} + +func (x *Value) GetPrefix() *gnmi.Path { + if x != nil { + return x.Prefix } return nil } -func (m *Value) GetPath() *gnmi.Path { - if m != nil { - return m.Path +func (x *Value) GetPath() *gnmi.Path { + if x != nil { + return x.Path } return nil } -func (m *Value) GetTimestamp() int64 { - if m != nil { - return m.Timestamp +func (x *Value) GetTimestamp() int64 { + if x != nil { + return x.Timestamp } return 0 } -func (m *Value) GetVal() *gnmi.TypedValue { - if m != nil { - return m.Val +func (x *Value) GetVal() *gnmi.TypedValue { + if x != nil { + return x.Val } return nil } -func (m *Value) GetSyncResponse() bool { - if m != nil { - return m.SyncResponse +func (x *Value) GetSyncResponse() bool { + if x != nil { + return x.SyncResponse } return false } -func (m *Value) GetFatal() string { - if m != nil { - return m.Fatal +func (x *Value) GetFatal() string { + if x != nil { + return x.Fatal } return "" } -func init() { - proto.RegisterType((*Value)(nil), "gnmi.sonic.Value") - proto.RegisterEnum("gnmi.sonic.State", State_name, State_value) -} - -func init() { proto.RegisterFile("sonic_internal.proto", fileDescriptor1) } - -var fileDescriptor1 = []byte{ - // 269 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x8f, 0x4f, 0x4b, 0xc3, 0x30, - 0x18, 0xc6, 0xcd, 0xfa, 0xc7, 0xed, 0x9d, 0x42, 0x09, 0x3b, 0x04, 0x11, 0x29, 0xf5, 0x52, 0x14, - 0x5a, 0xd1, 0xaf, 0xa0, 0x48, 0x2f, 0xb5, 0x64, 0xd5, 0xeb, 0xc8, 0x6a, 0xda, 0x06, 0xda, 0x24, - 0xb4, 0x99, 0xb8, 0x6f, 0xe8, 0xc7, 0x92, 0xa6, 0x03, 0x0f, 0xde, 0xf2, 0xfc, 0x9e, 0xdf, 0x03, - 0x79, 0x61, 0x33, 0x2a, 0x29, 0xaa, 0x9d, 0x90, 0x86, 0x0f, 0x92, 0x75, 0x89, 0x1e, 0x94, 0x51, - 0x18, 0x1a, 0xd9, 0x8b, 0xc4, 0x56, 0x57, 0x0f, 0x8d, 0x30, 0xed, 0x61, 0x9f, 0x54, 0xaa, 0x4f, - 0x95, 0xe6, 0xb2, 0x52, 0xb2, 0x16, 0x4d, 0x3a, 0x19, 0xa9, 0xb5, 0xe7, 0xa7, 0x5d, 0xd8, 0x1c, - 0xfd, 0x20, 0xf0, 0x3e, 0x58, 0x77, 0xe0, 0x38, 0x02, 0x5f, 0x0f, 0xbc, 0x16, 0xdf, 0x04, 0x85, - 0x28, 0x5e, 0x3f, 0x42, 0x62, 0xb5, 0x82, 0x99, 0x96, 0x9e, 0x1a, 0x7c, 0x03, 0xae, 0x66, 0xa6, - 0x25, 0x8b, 0x7f, 0x86, 0xe5, 0xf8, 0x1a, 0x56, 0x46, 0xf4, 0x7c, 0x34, 0xac, 0xd7, 0xc4, 0x09, - 0x51, 0xec, 0xd0, 0x3f, 0x80, 0x23, 0x70, 0xbe, 0x58, 0x47, 0x5c, 0x3b, 0x0e, 0xe6, 0x71, 0x79, - 0xd4, 0xfc, 0xd3, 0x7e, 0x80, 0x4e, 0x25, 0xbe, 0x85, 0xcb, 0xf1, 0x28, 0xab, 0xdd, 0xc0, 0x47, - 0xad, 0xe4, 0xc8, 0x89, 0x17, 0xa2, 0x78, 0x49, 0x2f, 0x26, 0x48, 0x4f, 0x0c, 0x6f, 0xc0, 0xab, - 0x99, 0x61, 0x1d, 0xf1, 0x43, 0x14, 0xaf, 0xe8, 0x1c, 0xee, 0xee, 0xc1, 0xdb, 0x1a, 0x66, 0x38, - 0x5e, 0xc3, 0xf9, 0xb6, 0x7c, 0x2b, 0x8a, 0x97, 0xe7, 0xe0, 0x0c, 0x2f, 0xc1, 0xcd, 0xf2, 0xac, - 0x0c, 0xd0, 0x84, 0xe9, 0x7b, 0x9e, 0x67, 0xf9, 0x6b, 0xb0, 0xd8, 0xfb, 0xf6, 0xfc, 0xa7, 0xdf, - 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0x1d, 0x16, 0xfb, 0x54, 0x01, 0x00, 0x00, +func (x *Value) GetNotification() *gnmi.Notification { + if x != nil { + return x.Notification + } + return nil +} + +var File_sonic_internal_proto protoreflect.FileDescriptor + +var file_sonic_internal_proto_rawDesc = []byte{ + 0x0a, 0x14, 0x73, 0x6f, 0x6e, 0x69, 0x63, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x73, 0x6f, 0x6e, + 0x69, 0x63, 0x1a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, + 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x80, 0x02, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x22, + 0x0a, 0x06, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, + 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x52, 0x06, 0x70, 0x72, 0x65, 0x66, + 0x69, 0x78, 0x12, 0x1e, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0a, 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x52, 0x04, 0x70, 0x61, + 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x12, 0x22, 0x0a, 0x03, 0x76, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, + 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, + 0x03, 0x76, 0x61, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x73, 0x79, 0x6e, + 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x61, 0x74, + 0x61, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x66, 0x61, 0x74, 0x61, 0x6c, 0x12, + 0x36, 0x0a, 0x0c, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x4e, 0x6f, 0x74, + 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0c, 0x6e, 0x6f, 0x74, 0x69, 0x66, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2a, 0x2b, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x54, 0x4f, 0x50, 0x50, 0x45, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, + 0x04, 0x49, 0x4e, 0x49, 0x54, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, + 0x4e, 0x47, 0x10, 0x02, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_sonic_internal_proto_rawDescOnce sync.Once + file_sonic_internal_proto_rawDescData = file_sonic_internal_proto_rawDesc +) + +func file_sonic_internal_proto_rawDescGZIP() []byte { + file_sonic_internal_proto_rawDescOnce.Do(func() { + file_sonic_internal_proto_rawDescData = protoimpl.X.CompressGZIP(file_sonic_internal_proto_rawDescData) + }) + return file_sonic_internal_proto_rawDescData +} + +var file_sonic_internal_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_sonic_internal_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_sonic_internal_proto_goTypes = []interface{}{ + (State)(0), // 0: gnmi.sonic.State + (*Value)(nil), // 1: gnmi.sonic.Value + (*gnmi.Path)(nil), // 2: gnmi.Path + (*gnmi.TypedValue)(nil), // 3: gnmi.TypedValue + (*gnmi.Notification)(nil), // 4: gnmi.Notification +} +var file_sonic_internal_proto_depIdxs = []int32{ + 2, // 0: gnmi.sonic.Value.prefix:type_name -> gnmi.Path + 2, // 1: gnmi.sonic.Value.path:type_name -> gnmi.Path + 3, // 2: gnmi.sonic.Value.val:type_name -> gnmi.TypedValue + 4, // 3: gnmi.sonic.Value.notification:type_name -> gnmi.Notification + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_sonic_internal_proto_init() } +func file_sonic_internal_proto_init() { + if File_sonic_internal_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_sonic_internal_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Value); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_sonic_internal_proto_rawDesc, + NumEnums: 1, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_sonic_internal_proto_goTypes, + DependencyIndexes: file_sonic_internal_proto_depIdxs, + EnumInfos: file_sonic_internal_proto_enumTypes, + MessageInfos: file_sonic_internal_proto_msgTypes, + }.Build() + File_sonic_internal_proto = out.File + file_sonic_internal_proto_rawDesc = nil + file_sonic_internal_proto_goTypes = nil + file_sonic_internal_proto_depIdxs = nil } diff --git a/proto/sonic_internal.proto b/proto/sonic_internal.proto index bb8b3402..6eef071d 100644 --- a/proto/sonic_internal.proto +++ b/proto/sonic_internal.proto @@ -31,4 +31,7 @@ message Value { // fatal error happened. string fatal = 6; + + // Notification to be used in place of 1-4 if present + gnmi.Notification notification = 7; } \ No newline at end of file diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index b9174e23..09a52d45 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -139,6 +139,13 @@ func (val Value) Compare(other queue.Item) int { return -1 } +func (val Value) GetTimestamp() int64 { + if n := val.GetNotification(); n != nil { + return n.GetTimestamp() + } + return val.Value.GetTimestamp() +} + type DbClient struct { prefix *gnmipb.Path pathG2S map[*gnmipb.Path][]tablePath @@ -365,6 +372,12 @@ func ValToResp(val Value) (*gnmipb.SubscribeResponse, error) { return nil, fmt.Errorf("%s", fatal) } + // In case the client returned a full gnmipb.Notification object + if n := val.GetNotification(); n != nil { + return &gnmipb.SubscribeResponse{ + Response: &gnmipb.SubscribeResponse_Update{Update: n}}, nil + } + return &gnmipb.SubscribeResponse{ Response: &gnmipb.SubscribeResponse_Update{ Update: &gnmipb.Notification{ diff --git a/sonic_data_client/transl_data_client.go b/sonic_data_client/transl_data_client.go index b850d0cb..21e63925 100644 --- a/sonic_data_client/transl_data_client.go +++ b/sonic_data_client/transl_data_client.go @@ -1,23 +1,26 @@ -// Package client provides a generic access layer for data available in system +// Package client provides a generic access layer for data available in system package client import ( - spb "github.com/sonic-net/sonic-gnmi/proto" - transutil "github.com/sonic-net/sonic-gnmi/transl_utils" + "context" + "fmt" + "reflect" + "runtime" + "sync" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" log "github.com/golang/glog" "github.com/golang/protobuf/proto" gnmipb "github.com/openconfig/gnmi/proto/gnmi" gnmi_extpb "github.com/openconfig/gnmi/proto/gnmi_ext" - "github.com/Workiva/go-datastructures/queue" - "sync" - "time" - "fmt" - "reflect" - "github.com/Azure/sonic-mgmt-common/translib" + "github.com/openconfig/ygot/ygot" "github.com/sonic-net/sonic-gnmi/common_utils" - "bytes" - "encoding/json" - "context" + spb "github.com/sonic-net/sonic-gnmi/proto" + transutil "github.com/sonic-net/sonic-gnmi/transl_utils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -33,11 +36,14 @@ type TranslClient struct { channel chan struct{} q *queue.PriorityQueue - synced sync.WaitGroup // Control when to send gNMI sync_response - w *sync.WaitGroup // wait for all sub go routines to finish - mu sync.RWMutex // Mutex for data protection among routines for transl_client - ctx context.Context //Contains Auth info and request info + synced sync.WaitGroup // Control when to send gNMI sync_response + w *sync.WaitGroup // wait for all sub go routines to finish + mu sync.RWMutex // Mutex for data protection among routines for transl_client + ctx context.Context //Contains Auth info and request info extensions []*gnmi_extpb.Extension + + version *translib.Version // Client version; populated by parseVersion() + encoding gnmipb.Encoding } func NewTranslClient(prefix *gnmipb.Path, getpaths []*gnmipb.Path, ctx context.Context, extensions []*gnmi_extpb.Extension, opts ...TranslClientOption) (Client, error) { @@ -127,6 +133,7 @@ func (c *TranslClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, upda } return nil } + func enqueFatalMsgTranslib(c *TranslClient, msg string) { c.q.Put(Value{ &spb.Value{ @@ -135,375 +142,309 @@ func enqueFatalMsgTranslib(c *TranslClient, msg string) { }, }) } -type ticker_info struct{ - t *time.Ticker - sub *gnmipb.Subscription - heartbeat bool + +func enqueueSyncMessage(c *TranslClient) { + m := &spb.Value{ + Timestamp: time.Now().UnixNano(), + SyncResponse: true, + } + c.q.Put(Value{m}) +} + +// recoverSubscribe recovers from possible panics during subscribe handling. +// It pushes a fatal message to the RPC handler's queue, which forces the server to +// close the RPC with an error status. Should always be used as a deferred function. +func recoverSubscribe(c *TranslClient) { + if r := recover(); r != nil { + buff := make([]byte, 1<<12) + buff = buff[:runtime.Stack(buff, false)] + log.Error(string(buff)) + + err := status.Errorf(codes.Internal, "%v", r) + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + } +} + +type ticker_info struct { + t *time.Ticker + sub *gnmipb.Subscription + pathStr string + heartbeat bool +} + +func getTranslNotificationType(mode gnmipb.SubscriptionMode) translib.NotificationType { + switch mode { + case gnmipb.SubscriptionMode_ON_CHANGE: + return translib.OnChange + case gnmipb.SubscriptionMode_SAMPLE: + return translib.Sample + default: + return translib.TargetDefined + } +} + +func tickerCleanup(ticker_map map[int][]*ticker_info, c *TranslClient) { + for _, v := range ticker_map { + for _, ti := range v { + ti.t.Stop() + } + } } func (c *TranslClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) + c.q = q c.channel = stop - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + c.encoding = subscribe.Encoding + + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } ticker_map := make(map[int][]*ticker_info) + + defer tickerCleanup(ticker_map, c) var cases []reflect.SelectCase cases_map := make(map[int]int) var subscribe_mode gnmipb.SubscriptionMode - stringPaths := make([]string, len(subscribe.Subscription)) - for i,sub := range subscribe.Subscription { - stringPaths[i] = c.path2URI[sub.Path] + translPaths := make([]translib.IsSubscribePath, len(subscribe.Subscription)) + sampleCache := make(map[string]*ygotCache) + + for i, sub := range subscribe.Subscription { + translPaths[i].ID = uint32(i) + translPaths[i].Path = c.path2URI[sub.Path] + translPaths[i].Mode = getTranslNotificationType(sub.Mode) + } + + rc, _ := common_utils.GetContext(c.ctx) + ss := translib.NewSubscribeSession() + defer ss.Close() + + req := translib.IsSubscribeRequest{ + Paths: translPaths, + Session: ss, + User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}, + } + if c.version != nil { + req.ClientVersion = *c.version } - req := translib.IsSubscribeRequest{Paths:stringPaths} - subSupport,_ := translib.IsSubscribeSupported(req) + + subSupport, err := translib.IsSubscribeSupported(req) + if err != nil { + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + return + } + var onChangeSubsString []string - var onChangeSubsgNMI []*gnmipb.Path - onChangeMap := make(map[string]*gnmipb.Path) - valueCache := make(map[string]string) - for i,sub := range subscribe.Subscription { - fmt.Println(sub.Mode, sub.SampleInterval) - switch sub.Mode { + for i, pInfo := range subSupport { + sub := subscribe.Subscription[pInfo.ID] + log.V(6).Infof("Start Sub: %v", sub) + pathStr := pInfo.Path + switch sub.Mode { case gnmipb.SubscriptionMode_TARGET_DEFINED: - - if subSupport[i].Err == nil && subSupport[i].IsOnChangeSupported { - if subSupport[i].PreferredType == translib.Sample { - subscribe_mode = gnmipb.SubscriptionMode_SAMPLE - } else if subSupport[i].PreferredType == translib.OnChange { - subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE - } + if pInfo.IsOnChangeSupported && pInfo.PreferredType == translib.OnChange { + subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE } else { subscribe_mode = gnmipb.SubscriptionMode_SAMPLE } - case gnmipb.SubscriptionMode_ON_CHANGE: - if subSupport[i].Err == nil && subSupport[i].IsOnChangeSupported { - if (subSupport[i].MinInterval > 0) { - subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE - }else{ - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid subscribe path %v", stringPaths[i])) - return - } + if pInfo.IsOnChangeSupported { + subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE } else { - enqueFatalMsgTranslib(c, fmt.Sprintf("ON_CHANGE Streaming mode invalid for %v", stringPaths[i])) + enqueFatalMsgTranslib(c, fmt.Sprintf("ON_CHANGE Streaming mode invalid for %v", pathStr)) return } case gnmipb.SubscriptionMode_SAMPLE: - if (subSupport[i].MinInterval > 0) { - subscribe_mode = gnmipb.SubscriptionMode_SAMPLE - }else{ - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid subscribe path %v", stringPaths[i])) - return - } + subscribe_mode = gnmipb.SubscriptionMode_SAMPLE default: - log.V(1).Infof("Bad Subscription Mode for client %v ", c) enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Subscription Mode %d", sub.Mode)) return } - fmt.Println("subscribe_mode:", subscribe_mode) + + if pInfo.MinInterval <= 0 { // should not happen + pInfo.MinInterval = translib.MinSubscribeInterval + } + + if hb := sub.HeartbeatInterval; hb > 0 && hb < uint64(pInfo.MinInterval)*uint64(time.Second) { + enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", + sub.HeartbeatInterval/uint64(time.Second), subSupport[i].MinInterval)) + return + } + + log.V(6).Infof("subscribe_mode %v for path %s", subscribe_mode, pathStr) if subscribe_mode == gnmipb.SubscriptionMode_SAMPLE { interval := int(sub.SampleInterval) + minInterval := pInfo.MinInterval * int(time.Second) if interval == 0 { - interval = subSupport[i].MinInterval * int(time.Second) - } else { - if interval < (subSupport[i].MinInterval*int(time.Second)) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Sample Interval %ds, minimum interval is %ds", interval/int(time.Second), subSupport[i].MinInterval)) - return - } + interval = minInterval + } else if interval < minInterval { + enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid SampleInterval %ds, minimum interval is %ds", interval/int(time.Second), pInfo.MinInterval)) + return } - if !subscribe.UpdatesOnly { - //Send initial data now so we can send sync response. - val, err := transutil.TranslProcessGet(c.path2URI[sub.Path], nil, c.ctx) - if err != nil { - return - } - spbv := &spb.Value{ - Prefix: c.prefix, - Path: sub.Path, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - c.q.Put(Value{spbv}) - valueCache[c.path2URI[sub.Path]] = string(val.GetJsonIetfVal()) + + reqPath, _ := ygot.StringToStructuredPath(pathStr) + yCache := newYgotCache(reqPath) + sampleCache[pathStr] = yCache + ts := translSubscriber{ + client: c, + session: ss, + sampleCache: yCache, + filterMsgs: subscribe.UpdatesOnly, + } + + // Force ignore init updates for subpaths to prevent duplicates. + // But performs duplicate gets though -- needs optimization. + if pInfo.IsSubPath { + ts.filterMsgs = true } - addTimer(c, ticker_map, &cases, cases_map, interval, sub, false) + // do initial sync & build the cache + ts.doSample(pathStr) + addTimer(c, ticker_map, &cases, cases_map, interval, sub, pathStr, false) //Heartbeat intervals are valid for SAMPLE in the case suppress_redundant is specified if sub.SuppressRedundant && sub.HeartbeatInterval > 0 { - if int(sub.HeartbeatInterval) < subSupport[i].MinInterval * int(time.Second) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", int(sub.HeartbeatInterval)/int(time.Second), subSupport[i].MinInterval)) - return - } - addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, true) + addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, pathStr, true) } } else if subscribe_mode == gnmipb.SubscriptionMode_ON_CHANGE { - onChangeSubsString = append(onChangeSubsString, c.path2URI[sub.Path]) - onChangeSubsgNMI = append(onChangeSubsgNMI, sub.Path) - onChangeMap[c.path2URI[sub.Path]] = sub.Path + onChangeSubsString = append(onChangeSubsString, pathStr) if sub.HeartbeatInterval > 0 { - if int(sub.HeartbeatInterval) < subSupport[i].MinInterval * int(time.Second) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", int(sub.HeartbeatInterval)/int(time.Second), subSupport[i].MinInterval)) - return - } - addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, true) + addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, pathStr, true) } - } + log.V(6).Infof("End Sub: %v", sub) } - if len(onChangeSubsString) > 0 { - c.w.Add(1) - c.synced.Add(1) - go TranslSubscribe(onChangeSubsgNMI, onChangeSubsString, onChangeMap, c, subscribe.UpdatesOnly) + if len(onChangeSubsString) > 0 { + ts := translSubscriber{ + client: c, + session: ss, + filterMsgs: subscribe.UpdatesOnly, + } + ts.doOnChange(onChangeSubsString) + } else { + // If at least one ON_CHANGE subscription was present, then + // ts.doOnChange() would have sent the sync message. + // Explicitly send one here if all are SAMPLE subscriptions. + enqueueSyncMessage(c) } - // Wait until all data values corresponding to the path(s) specified - // in the SubscriptionList has been transmitted at least once - c.synced.Wait() - spbs := &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - } - c.q.Put(Value{spbs}) + cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.channel)}) for { chosen, _, ok := reflect.Select(cases) - - if !ok { return } - for _,tick := range ticker_map[cases_map[chosen]] { - fmt.Printf("tick, heartbeat: %t, path: %s", tick.heartbeat, c.path2URI[tick.sub.Path]) - val, err := transutil.TranslProcessGet(c.path2URI[tick.sub.Path], nil, c.ctx) - if err != nil { - return - } - spbv := &spb.Value{ - Prefix: c.prefix, - Path: tick.sub.Path, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - - - if (tick.sub.SuppressRedundant) && (!tick.heartbeat) && (string(val.GetJsonIetfVal()) == valueCache[c.path2URI[tick.sub.Path]]) { - log.V(6).Infof("Redundant Message Suppressed #%v", string(val.GetJsonIetfVal())) - } else { - c.q.Put(Value{spbv}) - valueCache[c.path2URI[tick.sub.Path]] = string(val.GetJsonIetfVal()) - log.V(6).Infof("Added spbv #%v", spbv) + for _, tick := range ticker_map[cases_map[chosen]] { + log.V(6).Infof("tick, heartbeat: %t, path: %s\n", tick.heartbeat, c.path2URI[tick.sub.Path]) + ts := translSubscriber{ + client: c, + session: ss, + sampleCache: sampleCache[tick.pathStr], + filterDups: (!tick.heartbeat && tick.sub.SuppressRedundant), } - - + ts.doSample(tick.pathStr) } } } -func addTimer(c *TranslClient, ticker_map map[int][]*ticker_info, cases *[]reflect.SelectCase, cases_map map[int]int, interval int, sub *gnmipb.Subscription, heartbeat bool) { +func addTimer(c *TranslClient, ticker_map map[int][]*ticker_info, cases *[]reflect.SelectCase, cases_map map[int]int, interval int, sub *gnmipb.Subscription, pathStr string, heartbeat bool) { //Reuse ticker for same sample intervals, otherwise create a new one. if ticker_map[interval] == nil { ticker_map[interval] = make([]*ticker_info, 1, 1) - ticker_map[interval][0] = &ticker_info { - t: time.NewTicker(time.Duration(interval) * time.Nanosecond), - sub: sub, + ticker_map[interval][0] = &ticker_info{ + t: time.NewTicker(time.Duration(interval) * time.Nanosecond), + sub: sub, + pathStr: pathStr, heartbeat: heartbeat, } cases_map[len(*cases)] = interval *cases = append(*cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ticker_map[interval][0].t.C)}) - }else { - ticker_map[interval] = append(ticker_map[interval], &ticker_info { - t: ticker_map[interval][0].t, - sub: sub, + } else { + ticker_map[interval] = append(ticker_map[interval], &ticker_info{ + t: ticker_map[interval][0].t, + sub: sub, + pathStr: pathStr, heartbeat: heartbeat, }) } - - -} - -func TranslSubscribe(gnmiPaths []*gnmipb.Path, stringPaths []string, pathMap map[string]*gnmipb.Path, c *TranslClient, updates_only bool) { - defer c.w.Done() - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx - q := queue.NewPriorityQueue(1, false) - var sync_done bool - req := translib.SubscribeRequest{Paths:stringPaths, Q:q, Stop:c.channel} - if rc.BundleVersion != nil { - nver, err := translib.NewVersion(*rc.BundleVersion) - if err != nil { - log.V(2).Infof("Subscribe operation failed with error =%v", err.Error()) - enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) - return - } - req.ClientVersion = nver - } - translib.Subscribe(req) - for { - items, err := q.Get(1) - if err != nil { - log.V(1).Infof("%v", err) - return - } - switch v := items[0].(type) { - case *translib.SubscribeResponse: - - if v.IsTerminated { - //DB Connection or other backend error - enqueFatalMsgTranslib(c, "DB Connection Error") - close(c.channel) - return - } - - var jv []byte - dst := new(bytes.Buffer) - json.Compact(dst, v.Payload) - jv = dst.Bytes() - - /* Fill the values into GNMI data structures . */ - val := &gnmipb.TypedValue{ - Value: &gnmipb.TypedValue_JsonIetfVal{ - JsonIetfVal: jv, - }} - - spbv := &spb.Value{ - Prefix: c.prefix, - Path: pathMap[v.Path], - Timestamp: v.Timestamp, - SyncResponse: false, - Val: val, - } - - //Don't send initial update with full object if user wants updates only. - if updates_only && !sync_done { - log.V(1).Infof("Msg suppressed due to updates_only") - } else { - c.q.Put(Value{spbv}) - } - - log.V(6).Infof("Added spbv #%v", spbv) - - if v.SyncComplete && !sync_done { - fmt.Println("SENDING SYNC") - c.synced.Done() - sync_done = true - } - default: - log.V(1).Infof("Unknown data type %v for %v in queue", items[0], v) - } - } } - - func (c *TranslClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) c.q = q c.channel = poll - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + c.encoding = subscribe.Encoding + + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } + synced := false for { _, more := <-c.channel if !more { log.V(1).Infof("%v poll channel closed, exiting pollDb routine", c) + enqueFatalMsgTranslib(c, "") return } + t1 := time.Now() - for gnmiPath, URIPath := range c.path2URI { + for _, gnmiPath := range c.path2URI { if synced || !subscribe.UpdatesOnly { - val, err := transutil.TranslProcessGet(URIPath, nil, c.ctx) - if err != nil { - return - } - - spbv := &spb.Value{ - Prefix: c.prefix, - Path: gnmiPath, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - - c.q.Put(Value{spbv}) - log.V(6).Infof("Added spbv #%v", spbv) + ts := translSubscriber{client: c} + ts.doSample(gnmiPath) } } - c.q.Put(Value{ - &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - }, - }) + enqueueSyncMessage(c) synced = true log.V(4).Infof("Sync done, poll time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) } } + func (c *TranslClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) + c.q = q c.channel = once + c.encoding = subscribe.Encoding - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } + _, more := <-c.channel if !more { log.V(1).Infof("%v once channel closed, exiting onceDb routine", c) + enqueFatalMsgTranslib(c, "") return } - t1 := time.Now() - for gnmiPath, URIPath := range c.path2URI { - val, err := transutil.TranslProcessGet(URIPath, nil, c.ctx) - if err != nil { - return - } - - if !subscribe.UpdatesOnly && val != nil { - spbv := &spb.Value{ - Prefix: c.prefix, - Path: gnmiPath, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - c.q.Put(Value{spbv}) - log.V(6).Infof("Added spbv #%v", spbv) - } + t1 := time.Now() + for _, gnmiPath := range c.path2URI { + ts := translSubscriber{client: c} + ts.doSample(gnmiPath) } - c.q.Put(Value{ - &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - }, - }) + enqueueSyncMessage(c) log.V(4).Infof("Sync done, once time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) - + } func (c *TranslClient) Capabilities() []gnmipb.ModelData { @@ -527,22 +468,35 @@ func (c *TranslClient) SentOne(val *Value) { func (c *TranslClient) FailedSend() { } - func getBundleVersion(extensions []*gnmi_extpb.Extension) *string { - for _,e := range extensions { + for _, e := range extensions { switch v := e.Ext.(type) { - case *gnmi_extpb.Extension_RegisteredExt: - if v.RegisteredExt.Id == spb.BUNDLE_VERSION_EXT { - var bv spb.BundleVersion - proto.Unmarshal(v.RegisteredExt.Msg, &bv) - return &bv.Version - } - + case *gnmi_extpb.Extension_RegisteredExt: + if v.RegisteredExt.Id == spb.BUNDLE_VERSION_EXT { + var bv spb.BundleVersion + proto.Unmarshal(v.RegisteredExt.Msg, &bv) + return &bv.Version + } + } } return nil } +func (c *TranslClient) parseVersion() error { + bv := getBundleVersion(c.extensions) + if bv == nil { + return nil + } + v, err := translib.NewVersion(*bv) + if err != nil { + c.version = &v + return nil + } + log.V(4).Infof("Failed to parse version \"%s\"; err=%v", *bv, err) + return fmt.Errorf("Invalid bundle version: %v", *bv) +} + type TranslClientOption interface { IsTranslClientOption() } diff --git a/sonic_data_client/transl_subscriber.go b/sonic_data_client/transl_subscriber.go new file mode 100644 index 00000000..2b11658b --- /dev/null +++ b/sonic_data_client/transl_subscriber.go @@ -0,0 +1,386 @@ +//////////////////////////////////////////////////////////////////////////////// +// // +// Copyright 2021 Broadcom. The term Broadcom refers to Broadcom Inc. and/or // +// its subsidiaries. // +// // +// Licensed under the Apache License, Version 2.0 (the "License"); // +// you may not use this file except in compliance with the License. // +// You may obtain a copy of the License at // +// // +// http://www.apache.org/licenses/LICENSE-2.0 // +// // +// Unless required by applicable law or agreed to in writing, software // +// distributed under the License is distributed on an "AS IS" BASIS, // +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // +// See the License for the specific language governing permissions and // +// limitations under the License. // +// // +//////////////////////////////////////////////////////////////////////////////// + +package client + +import ( + "fmt" + "sync" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/ygot/ygot" + spb "github.com/sonic-net/sonic-gnmi/proto" + "github.com/sonic-net/sonic-gnmi/transl_utils" +) + +// translSubscriber is an extension of TranslClient to service Subscribe RPC. +type translSubscriber struct { + client *TranslClient + session *translib.SubscribeSession + sampleCache *ygotCache // session cache for SAMPLE; optional + filterMsgs bool // Filter out messages till sync done (updates_only) + filterDups bool // Filter out duplicate updates (suppress_redundant) + stopOnSync bool // Stop upon sync message from translib + synced sync.WaitGroup // To signal receipt of sync message from translib + rcvdPaths map[string]bool // Paths from received SubscribeResponse + msgBuilder notificationBuilder +} + +// notificationBuilder creates gnmipb.Notification from a translib.SubscribeResponse +// instance. Input can be nil, indicating the end of current sample iteration. +type notificationBuilder func( + resp *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) + +// doSample invokes translib.Stream API to service SAMPLE, POLL and ONCE subscriptions. +// Timer for SAMPLE subscription should be managed outside. +func (ts *translSubscriber) doSample(path string) { + if ts.sampleCache != nil { + ts.msgBuilder = ts.sampleCache.msgBuilder // SAMPLE + ts.rcvdPaths = make(map[string]bool) + } else { + ts.msgBuilder = defaultMsgBuilder // ONCE, POLL or heartbeat for ON_CHANGE + } + + c := ts.client + req := translib.SubscribeRequest{ + Paths: []string{path}, + Q: queue.NewPriorityQueue(1, false), + Session: ts.session, + } + if c.version != nil { + req.ClientVersion = *c.version + } + + c.w.Add(1) + ts.synced.Add(1) + ts.stopOnSync = true + go ts.processResponses(req.Q) + + err := translib.Stream(req) + if err != nil { + req.Q.Dispose() + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error = %v", err)) + } + + ts.synced.Wait() +} + +// doOnChange handles the ON_CHANGE subscriptions through translib.Subscribe API. +// Returns only after initial updates and sync message are sent to the RPC queue. +func (ts *translSubscriber) doOnChange(stringPaths []string) { + c := ts.client + q := queue.NewPriorityQueue(1, false) + + req := translib.SubscribeRequest{ + Paths: stringPaths, + Q: q, + Stop: c.channel, + Session: ts.session, + } + if c.version != nil { + req.ClientVersion = *c.version + } + + c.w.Add(1) + ts.synced.Add(1) + ts.msgBuilder = defaultMsgBuilder + go ts.processResponses(q) + + err := translib.Subscribe(req) + if err != nil { + q.Dispose() + enqueFatalMsgTranslib(c, "Subscribe operation failed with error: "+err.Error()) + } + + ts.synced.Wait() +} + +// processResponses waits for SubscribeResponse messages from translib over a +// queue, formats them as spb.Value and pushes to the RPC queue. +func (ts *translSubscriber) processResponses(q *queue.PriorityQueue) { + c := ts.client + var syncDone bool + defer c.w.Done() + defer func() { + if !syncDone { + ts.synced.Done() + } + }() + defer recoverSubscribe(c) + + for { + items, err := q.Get(1) + if err == queue.ErrDisposed { + log.V(3).Info("PriorityQueue was disposed!") + return + } + if err != nil { + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + return + } + switch v := items[0].(type) { + case *translib.SubscribeResponse: + + if v.IsTerminated { + //DB Connection or other backend error + enqueFatalMsgTranslib(c, "DB Connection Error") + close(c.channel) + return + } + + if v.SyncComplete { + if ts.stopOnSync { + ts.notify(nil) + log.V(6).Infof("Stopping on sync signal from translib") + return + } + + log.V(6).Infof("SENDING SYNC") + enqueueSyncMessage(c) + syncDone = true + ts.synced.Done() + ts.filterMsgs = false + break + } + + if err := ts.notify(v); err != nil { + log.Warning(err) + enqueFatalMsgTranslib(c, "Internal error") + return + } + default: + log.V(1).Infof("Unknown data type %T in queue", v) + } + } +} + +func (ts *translSubscriber) notify(v *translib.SubscribeResponse) error { + msg, err := ts.msgBuilder(v, ts) + if err != nil { + return err + } + + if msg == nil || (len(msg.Update) == 0 && len(msg.Delete) == 0) { + log.V(6).Infof("Ignore nil message") + return nil + } + + spbv := &spb.Value{Notification: msg} + ts.client.q.Put(Value{spbv}) + log.V(6).Infof("Added spbv %#v", spbv) + return nil +} + +func (ts *translSubscriber) toPrefix(path string) *gnmipb.Path { + p, _ := ygot.StringToStructuredPath(path) + p.Target = ts.client.prefix.GetTarget() + p.Origin = ts.client.prefix.GetOrigin() + return p +} + +func defaultMsgBuilder(v *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) { + if v == nil { + return nil, nil + } + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + + p := ts.toPrefix(v.Path) + n := gnmipb.Notification{ + Prefix: p, + Timestamp: v.Timestamp, + } + + // Move last elem of v.Path to updates & deletes if v.Delete contains + // an empty path (to indicate the v.Path itself was deleted). + var extraPrefix *gnmipb.PathElem + if strSliceContains(v.Delete, "") { + extraPrefix = removeLastPathElem(p) + } + + if v.Update != nil { + var err error + n.Update, err = ts.ygotToScalarValues(extraPrefix, v.Update) + if err != nil { + return nil, err + } + } + + for _, del := range v.Delete { + p, err := ygot.StringToStructuredPath(del) + if err != nil { + return nil, err + } + insertFirstPathElem(p, extraPrefix) + n.Delete = append(n.Delete, p) + } + + return &n, nil +} + +// ygotToScalarValues returns scalar encoded values for a ygot object. +// If prefixElem is provided, it will be prefixed to each value's path. +func (ts *translSubscriber) ygotToScalarValues(prefixElem *gnmipb.PathElem, obj ygot.ValidatedGoStruct) ([]*gnmipb.Update, error) { + tmp, err := ygot.TogNMINotifications(obj, 0, + ygot.GNMINotificationsConfig{ + UsePathElem: true, + PathElemPrefix: nil, + Encoding: ts.client.encoding, + }) + if err != nil { + return nil, err + } + + updates := tmp[0].Update + if prefixElem != nil { + for _, u := range updates { + insertFirstPathElem(u.Path, prefixElem) + } + } + + return updates, nil +} + +// insertFirstPathElem inserts newElem at the beginning of path p. +func insertFirstPathElem(p *gnmipb.Path, newElem *gnmipb.PathElem) { + if newElem != nil { + ne := make([]*gnmipb.PathElem, 0, len(p.Elem)+1) + ne = append(ne, newElem) + p.Elem = append(ne, p.Elem...) + } +} + +// removeLastPathElem removes the last PathElem from the path p. +// Returns the removed element. +func removeLastPathElem(p *gnmipb.Path) *gnmipb.PathElem { + k := len(p.Elem) - 1 + if k < 0 { + return nil + } + if p.Element != nil { + p.Element = p.Element[:k] + } + last := p.Elem[k] + p.Elem = p.Elem[:k] + return last +} + +func strSliceContains(ss []string, v string) bool { + for _, s := range ss { + if s == v { + return true + } + } + return false +} + +// ygotCache holds path to ygot struct mappings +type ygotCache struct { + values map[string]ygot.GoStruct + pattern *gnmipb.Path // Prefix pattern for the cache keys +} + +// newYgotCache creates a new ygotCache instance +func newYgotCache(pattern *gnmipb.Path) *ygotCache { + return &ygotCache{ + values: make(map[string]ygot.GoStruct), + pattern: pattern, + } +} + +// msgBuilder is a notificationBuilder implementation to create a gnmipb.Notification +// message by comparing the SubscribeResponse.Update ygot struct to the cached value. +// Includes only modified or deleted leaf paths if translSubscriber.filterDups is set. +// Returns nil message if translSubscriber.filterMsgs is set or on error. +// Updates the cache with the new ygot struct (SubscribeResponse.Update). +// Special case: if SubscribeResponse is nil, calls deleteMsgBuilder to delete +// non-existing paths from the cache. +func (c *ygotCache) msgBuilder(v *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) { + if v == nil { + return c.deleteMsgBuilder(ts) + } + + old := c.values[v.Path] + c.values[v.Path] = v.Update + ts.rcvdPaths[v.Path] = true + log.V(2).Infof("%s updated; old=%p, new=%p, filterDups=%v", v.Path, old, v.Update, ts.filterDups) + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + + res, err := transl_utils.Diff(old, v.Update, + transl_utils.DiffOptions{ + RecordAll: !ts.filterDups, + }) + if err != nil { + return nil, err + } + + return &gnmipb.Notification{ + Timestamp: v.Timestamp, + Prefix: ts.toPrefix(v.Path), + Update: res.Update, + Delete: res.Delete, + }, nil +} + +// deleteMsgBuilder deletes the cache entries whose path does not appear in +// the translSubscriber.rcvdPaths map. Creates a gnmipb.Notification message +// for the deleted paths. Returns nil message if there are no such delete paths +// or translSubscriber.filterMsgs is set. +func (c *ygotCache) deleteMsgBuilder(ts *translSubscriber) (*gnmipb.Notification, error) { + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + var deletePaths []*gnmipb.Path + for path := range c.values { + if !ts.rcvdPaths[path] { + log.V(3).Infof("%s deleted", path) + deletePaths = append(deletePaths, c.toDeletePath(path)) + delete(c.values, path) + } + } + if len(deletePaths) == 0 { + return nil, nil + } + return &gnmipb.Notification{ + Timestamp: time.Now().UnixNano(), + Prefix: ts.toPrefix("/"), + Delete: deletePaths, + }, nil +} + +func (c *ygotCache) toDeletePath(path string) *gnmipb.Path { + p, _ := ygot.StringToStructuredPath(path) + // p will be parent container path when subscribing to a leaf. + // Append the leaf suffix to p if p is shorter than subscribe path. + if n := len(p.Elem); n < len(c.pattern.Elem) { + suffix := c.pattern.Elem[n:] + p.Elem = append(p.Elem, suffix...) + } + return p +}