diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index eb3a5d4f..cca0966f 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -709,6 +709,7 @@ type subscriptionQuery struct { Query []string SubMode pb.SubscriptionMode SampleInterval uint64 + HeartbeatInterval uint64 } func pathToString(q client.Path) string { @@ -778,7 +779,7 @@ func createEventsQuery(t *testing.T, paths ...string) client.Query { false) } -func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query { +func createStateDbQueryOnChangeMode(t *testing.T, interval time.Duration, paths ...string) client.Query { return createQueryOrFail(t, pb.SubscriptionList_STREAM, "STATE_DB", @@ -786,6 +787,7 @@ func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query { Query: paths, SubMode: pb.SubscriptionMode_ON_CHANGE, + HeartbeatInterval: uint64(interval.Nanoseconds()), }, }, false) @@ -3133,12 +3135,19 @@ func TestTableKeyOnDeletion(t *testing.T) { tests := []struct { desc string q client.Query + wantSubErr error wantNoti []client.Notification paths []string }{ + { + desc: "Testing invalid heartbeat interval", + q: createStateDbQueryOnChangeMode(t, 10 * time.Second, "NEIGH_STATE_TABLE"), + wantSubErr: fmt.Errorf("rpc error: code = InvalidArgument desc = invalid heartbeat interval: 10s. It cannot be less than %v", sdc.MinHeartbeatInterval), + wantNoti: []client.Notification{}, + }, { desc: "Testing deletion of NEIGH_STATE_TABLE:10.0.0.57", - q: createStateDbQueryOnChangeMode(t, "NEIGH_STATE_TABLE"), + q: createStateDbQueryOnChangeMode(t, 2 * time.Minute, "NEIGH_STATE_TABLE"), wantNoti: []client.Notification { client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableJson}, client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson57}, @@ -3149,7 +3158,7 @@ func TestTableKeyOnDeletion(t *testing.T) { }, { desc: "Testing deletion of NEIGH_STATE_TABLE:10.0.0.59 and NEIGH_STATE_TABLE 10.0.0.61", - q: createStateDbQueryOnChangeMode(t, "NEIGH_STATE_TABLE"), + q: createStateDbQueryOnChangeMode(t, 2 * time.Minute, "NEIGH_STATE_TABLE"), wantNoti: []client.Notification { client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableJsonTwo}, client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson59}, diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index 3b73b50f..e625d87f 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client) // Any non-zero value that less than this threshold is considered invalid argument. var MinSampleInterval = time.Second +// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions. +// This is reserved value, which should be adjusted per BGPL benchmark result. +var MinHeartbeatInterval = 1 * time.Minute + // IntervalTicker is a factory method to implement interval ticking. // Exposed for UT purposes. var IntervalTicker = func(interval time.Duration) <-chan time.Time { @@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync for gnmiPath := range c.pathG2S { c.w.Add(1) c.synced.Add(1) - go streamOnChangeSubscription(c, gnmiPath) + go streamOnChangeSubscription(c, gnmiPath, nil) } } else { log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v", @@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync } else if subMode == gnmipb.SubscriptionMode_ON_CHANGE { c.w.Add(1) c.synced.Add(1) - go streamOnChangeSubscription(c, sub.GetPath()) + go streamOnChangeSubscription(c, nil, sub) } else { enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode)) return @@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync } // streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode -func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) { +func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) { + if gnmiPath == nil { + gnmiPath = sub.GetPath() + } + + // if heartbeatInterval is not assigned, use 0 to ignore periodical full sync + var heartbeatInterval time.Duration = 0 + if sub != nil { + var err error + heartbeatInterval, err = validateHeartbeatInterval(sub) + if err != nil { + enqueueFatalMsg(c, err.Error()) + c.synced.Done() + c.w.Done() + return + } + } + tblPaths := c.pathG2S[gnmiPath] log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath) if tblPaths[0].field != "" { if len(tblPaths) > 1 { - go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false) + go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false) } else { - go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200) + go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval) } } else { // sample interval and update only parameters are not applicable - go dbTableKeySubscribe(c, gnmiPath, 0, true) + go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true) } } @@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) { return requestedInterval, nil } } + +// validateHeartbeatInterval validates the heartbeat interval of the given subscription. +func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) { + requestedInterval := time.Duration(sub.GetHeartbeatInterval()) + if requestedInterval == 0 { + // If the heartbeat_interval is set to 0, the target MUST create the subscription + // and send the data with the MinHeartbeatInterval + return MinHeartbeatInterval, nil + } else if requestedInterval < MinHeartbeatInterval { + return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval) + } else { + return requestedInterval, nil + } +}