Skip to content

Commit

Permalink
feat: add --latest flag and guidance errors for out of range values
Browse files Browse the repository at this point in the history
  • Loading branch information
Robin Bryce committed Nov 12, 2024
1 parent 7006015 commit 185be4a
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 59 deletions.
7 changes: 7 additions & 0 deletions tests/systemtest/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ testVeracityWatchPublicFindsActivity() {
assertContains "watch-public should find activity" "$output" "$PROD_PUBLIC_TENANT_ID"
}

testVeracityWatchLatestFindsActivity() {
local output
output=$($VERACITY_INSTALL --data-url $DATATRAILS_URL/verifiabledata --tenant=$PROD_PUBLIC_TENANT_ID watch --latest)
assertEquals "watch-public --latest should return a 0 exit code" 0 $?
assertContains "watch-public --latest should find activity" "$output" "$PROD_PUBLIC_TENANT_ID"
}

testVeracityReplicateLogsPublicTenantWatchPipe() {
local output

Expand Down
19 changes: 7 additions & 12 deletions tests/watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ func (s *WatchCmdSuite) TestErrorForNegativeHorizon() {
"--horizon", "-1h",
})
s.ErrorContains(err, "negative horizon")
// alternative approach which just makes the error more readable
// s.ErrorContains(err, "is to large or otherwise out of range")
}

func (s *WatchCmdSuite) TestNoErrorVeryLargeHorizon() {
func (s *WatchCmdSuite) TestErrorGuidanceForVeryLargeHorizon() {

app := veracity.NewApp("version", false)
veracity.AddCommands(app, false)
Expand All @@ -32,14 +30,12 @@ func (s *WatchCmdSuite) TestNoErrorVeryLargeHorizon() {
"--data-url", s.Env.VerifiableDataURL,
"watch",
"--horizon", "1000000000h",
//"--horizon", "100000h", // 11 years, so we are sure we look back far enough to find an event
})
s.NoError(err)
// alternative approach which just makes the error more readable
// s.ErrorContains(err, "is out of range or otherwise invalid")
s.ErrorContains(err, "--horizon=max")
s.ErrorContains(err, "--latest")
}

func (s *WatchCmdSuite) TestNoErrorLargeButParsableHorizon() {
func (s *WatchCmdSuite) TestErrorGuidanceForLargeButParsableHorizon() {

app := veracity.NewApp("version", false)
veracity.AddCommands(app, false)
Expand All @@ -50,9 +46,8 @@ func (s *WatchCmdSuite) TestNoErrorLargeButParsableHorizon() {
"watch",
"--horizon", "1000000h", // over flows the id timestamp epoch
})
s.NoError(err)
// alternative approach which just makes the error more readable
// s.ErrorContains(err, "is out of range or otherwise invalid")
s.ErrorContains(err, "--horizon=max")
s.ErrorContains(err, "--latest")
}

func (s *WatchCmdSuite) TestNoErrorOrNoChanges() {
Expand All @@ -79,7 +74,7 @@ func (s *WatchCmdSuite) TestNoChangesForFictitiousTenant() {
"veracity",
"--data-url", s.Env.VerifiableDataURL,
"--tenant", s.Env.UnknownTenantId,
"watch",
"watch", "--latest",
})
assert.Equal(err, veracity.ErrNoChanges)
}
Expand Down
144 changes: 98 additions & 46 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,25 @@ import (
)

const (
flagCount = "count"
flagHorizon = "horizon"
flagIDSince = "idsince"
flagInterval = "interval"
flagLatest = "latest"
flagSince = "since"

currentEpoch = uint8(1) // good until the end of the first unix epoch
tenantPrefix = "tenant/"
sealIDNotFound = "NOT-FOUND"
// maxPollCount is the maximum number of times to poll for *some* activity.
// Polling always terminates as soon as the first activity is detected.
maxPollCount = 15
// More than this over flows the epoch which is half the length of the unix time epoch
maxHorizon = time.Hour * 100000
maxHorizon = time.Hour * 100000
horizonAliasMax = "max" // short hand for the largest supported duration
sinceAliasLatest = "latest" // short hand for obtaining the latest change for all watched tenants
rangeDurationParseErrorSubString = "time: invalid duration "
threeSeconds = 3 * time.Second
)

var (
Expand All @@ -42,6 +53,7 @@ type WatchConfig struct {
WatchTenants map[string]bool
WatchCount int
ReaderURL string
Latest bool
}

// watchReporter abstracts the output interface for WatchForChanges to facilitate unit testing.
Expand Down Expand Up @@ -74,28 +86,34 @@ func NewLogWatcherCmd() *cli.Command {
horizon is always inferred from the since arguments if they are provided
`,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: flagLatest,
Usage: `find the latest changes for each requested tenant (no matter how long ago they occured). This is mutualy exclusive with --since, --idsince and --horizon.`,
Value: false,
},

&cli.TimestampFlag{
Name: "since",
Name: flagSince,
Usage: "RFC 3339 time stamp, only logs with changes after this are considered, defaults to now. idsince takes precendence if also supplied.",
Layout: time.RFC3339,
},
&cli.StringFlag{
Name: "idsince", Aliases: []string{"s"},
Name: flagIDSince, Aliases: []string{"s"},
Usage: "Start time as an idtimestamp. Start time defaults to now. All results are >= this hex string. If provided, it is used exactly as is. Takes precedence over since",
},
&cli.StringFlag{
Name: "horizon",
Name: flagHorizon,
Aliases: []string{"z"},
Value: "24h",
Usage: "Infer since as now - horizon. The format is {number}{units} eg 1h to only see things in the last hour. If watching (count=0), since is re-calculated every interval",
Usage: "Infer since as now - horizon. Use the alias --horizon=max to force the highest supported value. Otherwise, the format is {number}{units} eg 1h to only see things in the last hour. If watching (count=0), since is re-calculated every interval",
},
&cli.DurationFlag{
Name: "interval", Aliases: []string{"d"},
Value: 3 * time.Second,
Name: flagInterval, Aliases: []string{"d"},
Value: threeSeconds,
Usage: "The default polling interval is once every three seconds, setting the interval to zero disables polling",
},
&cli.IntFlag{
Name: "count", Usage: fmt.Sprintf(
Name: flagCount, Usage: fmt.Sprintf(
"Number of intervals to poll. Polling is terminated once the first activity is seen or after %d attempts regardless", maxPollCount),
Value: 1,
},
Expand All @@ -111,7 +129,7 @@ func NewLogWatcherCmd() *cli.Command {
}
reporter := &defaultReporter{log: cmd.log}

cfg, err := NewWatchConfig(cCtx, cmd, reporter)
cfg, err := NewWatchConfig(cCtx, cmd)
if err != nil {
return err
}
Expand All @@ -128,30 +146,46 @@ func NewLogWatcherCmd() *cli.Command {
}
}

func checkCompatibleFlags(cCtx cliContext) error {
if !cCtx.IsSet(flagLatest) {
return nil
}

latestExcludes := []string{flagHorizon, flagSince, flagIDSince}

for _, excluded := range latestExcludes {
if cCtx.IsSet(excluded) {
return fmt.Errorf("the %s flag is mutualy exclusive with %s", flagLatest, strings.Join(latestExcludes, ", "))
}
}
return nil
}

type cliContext interface {
IsSet(string) bool
Bool(string) bool
Duration(name string) time.Duration
Timestamp(name string) *time.Time
String(name string) string
Int(name string) int
}

const (
rangeDurationParseErrorSubString = "time: invalid duration "
)

// parseHorizon parses a duration string from the command line In accordance
// with the most common reason for parse failure (specifying a large number), On
// an error that looks like a range to large issue, we coerce to the maximum
// hours and ignore the error. Errors that don't contain the marker substring
// are returned as is.
func parseHorizon(horizon string, reporter watchReporter) (time.Duration, error) {
func parseHorizon(horizon string) (time.Duration, error) {

if horizon == horizonAliasMax {
return maxHorizon, nil
}

d, err := time.ParseDuration(horizon)
if err == nil {

// clamp the horizon, otherwise it may overflow the idtimestamp epoch
if d > maxHorizon {
reporter.Logf("clamped duration from %s to %v", horizon, maxHorizon)
return maxHorizon, nil
return maxHorizon, fmt.Errorf("the maximum supported duration is --horizon=%v, which has the alias --horizon=max. also consider using --latest", maxHorizon)
}
if d < 0 {
return 0, fmt.Errorf("negative horizon value:%s", horizon)
Expand All @@ -160,53 +194,53 @@ func parseHorizon(horizon string, reporter watchReporter) (time.Duration, error)
return d, nil
}

// Note: it is a matter of opinion whether we should error here or not.
// Since finding many tenants is only a performance issue, we simply
// force the maximum range of hours on an error that indicates a range
// issue. By far the most common use for a large value is "just give me everything"
// The substring was obtained by code inspection of the time.ParseDuration implementation
if strings.HasPrefix(err.Error(), rangeDurationParseErrorSubString) {
reporter.Logf("clamped duration from %s to %v", horizon, maxHorizon)
return maxHorizon, nil
return maxHorizon, fmt.Errorf("the supplied horizon was invalid. the maximum supported duration is --horizon=%v, which has the alias --horizon=max. also consider using --latest", maxHorizon)
}

// Alternative which accurately reports the error but is likely just as inconvenient
// if strings.HasPrefix(err.Error(), rangeDurationParseErrorSubString) {
// return WatchConfig{}, fmt.Errorf("the horizon '%s' is to large or otherwise out of range", horizon)
// }

return d, fmt.Errorf("the horizon '%s' is out of range or otherwise invalid", horizon)
return d, fmt.Errorf("the horizon '%s' is out of range or otherwise invalid. Use --horizon=max to get the largest supported value %v. also consider using --latest", horizon, maxHorizon)
}

// NewWatchConfig derives a configuration from the options set on the command line context
func NewWatchConfig(cCtx cliContext, cmd *CmdCtx, reporter watchReporter) (WatchConfig, error) {
func NewWatchConfig(cCtx cliContext, cmd *CmdCtx) (WatchConfig, error) {

var err error

cfg := WatchConfig{}
cfg.Interval = cCtx.Duration("interval")
// --latest is mutualy exclusive with the horizon, since, idsince flags.
if err = checkCompatibleFlags(cCtx); err != nil {
return WatchConfig{}, err
}

cfg := WatchConfig{
Latest: cCtx.Bool(flagLatest),
}
cfg.Interval = cCtx.Duration(flagInterval)

horizon := cCtx.String("horizon")
if horizon != "" {
cfg.Horizon, err = parseHorizon(horizon, reporter)
if cCtx.IsSet(flagHorizon) {
cfg.Horizon, err = parseHorizon(cCtx.String(flagHorizon))
if err != nil {
return WatchConfig{}, err
}
}
if cCtx.Timestamp("since") != nil {
cfg.Since = *cCtx.Timestamp("since")
}
cfg.IDSince = cCtx.String("idsince")

err = watcher.ConfigDefaults(&cfg.WatchConfig)
if err != nil {
return WatchConfig{}, err
if cCtx.IsSet(flagSince) {
cfg.Since = *cCtx.Timestamp(flagSince)
}
if cfg.Interval < time.Second {
return WatchConfig{}, fmt.Errorf("polling more than once per second is not currently supported")
if cCtx.IsSet(flagIDSince) {
cfg.IDSince = cCtx.String(flagIDSince)
}

if !cCtx.IsSet(flagLatest) {
err = watcher.ConfigDefaults(&cfg.WatchConfig)
if err != nil {
return WatchConfig{}, err
}
if cfg.Interval < time.Second {
return WatchConfig{}, fmt.Errorf("polling more than once per second is not currently supported")
}
}

cfg.WatchCount = min(max(1, cCtx.Int("count")), maxPollCount)
cfg.WatchCount = min(max(1, cCtx.Int(flagCount)), maxPollCount)

cfg.ReaderURL = cmd.readerURL

Expand All @@ -230,6 +264,24 @@ type Watcher struct {
collator watcher.LogTailCollator
}

// FirstFilter accounts for the --latest flag but otherwise falls through to the base implementation
func (w *Watcher) FirstFilter() string {
if !w.cfg.Latest {
return w.Watcher.FirstFilter()
}
// The first idtimestamp of the first epoch
idSince := massifs.IDTimestampToHex(0, 0)
return fmt.Sprintf(`"lastid">='%s'`, idSince)
}

// NextFilter accounts for the --latest flag but otherwise falls through to the base implementation
func (w *Watcher) NextFilter() string {
if !w.cfg.Latest {
return w.Watcher.NextFilter()
}
return w.FirstFilter()
}

func normalizeTenantIdentity(tenant string) string {
if strings.HasPrefix(tenant, tenantPrefix) {
return tenant
Expand Down
Loading

0 comments on commit 185be4a

Please sign in to comment.