From 6607e0564d2f8d66ad304a4f1e1909e346a492d4 Mon Sep 17 00:00:00 2001 From: Antoine Grondin Date: Mon, 28 Oct 2024 00:43:09 +0900 Subject: [PATCH] fix(localhost): watching an unbounded query now waits for any future data --- cmd/humanlog/account.go | 22 +- cmd/humanlog/auth.go | 8 +- cmd/humanlog/ingest.go | 10 +- cmd/humanlog/localhost.go | 8 +- cmd/humanlog/machine.go | 6 +- cmd/humanlog/main.go | 26 +-- cmd/humanlog/onboarding.go | 4 +- cmd/humanlog/organization.go | 22 +- cmd/humanlog/query.go | 69 +++--- cmd/humanlog/versions.go | 6 +- go.mod | 3 + go.sum | 4 +- internal/localstorage/memory.go | 195 +++++++++++++++-- internal/localstorage/memory_test.go | 37 +++- internal/localstorage/queryable.go | 3 +- internal/localsvc/svc.go | 134 ++++++++++-- internal/localsvc/svc_test.go | 4 +- pkg/retry/retry_test.go | 34 --- pkg/sink/bufsink/sized.go | 2 +- pkg/sink/logsvcsink/bidistream_sink.go | 4 +- pkg/sink/logsvcsink/stream_sink.go | 4 +- pkg/sink/logsvcsink/unary_sink.go | 4 +- pkg/sink/sink.go | 4 +- pkg/sink/stdiosink/stdio.go | 2 +- pkg/sink/teesink/tee.go | 14 +- .../humanlogio/api/go/types/v1/logquery.pb.go | 37 ++-- vendor/github.com/teivah/broadcast/.gitignore | 3 + vendor/github.com/teivah/broadcast/LICENSE | 201 ++++++++++++++++++ vendor/github.com/teivah/broadcast/README.md | 101 +++++++++ .../github.com/teivah/broadcast/broadcast.go | 123 +++++++++++ vendor/modules.txt | 6 +- 31 files changed, 905 insertions(+), 195 deletions(-) create mode 100644 vendor/github.com/teivah/broadcast/.gitignore create mode 100644 vendor/github.com/teivah/broadcast/LICENSE create mode 100644 vendor/github.com/teivah/broadcast/README.md create mode 100644 vendor/github.com/teivah/broadcast/broadcast.go diff --git a/cmd/humanlog/account.go b/cmd/humanlog/account.go index 53aae024..9737b72d 100644 --- a/cmd/humanlog/account.go +++ b/cmd/humanlog/account.go @@ -31,7 +31,7 @@ func accountCmd( getState func(cctx *cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client, ) cli.Command { return cli.Command{ Hidden: hideUnreleasedFeatures == "true", @@ -42,7 +42,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) _, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient) if err != nil { return err @@ -58,7 +58,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) accountName := cctx.Args().First() if accountName == "" { @@ -85,7 +85,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { @@ -124,7 +124,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { @@ -157,7 +157,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) accountName := cctx.Args().First() if accountName == "" { @@ -198,7 +198,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) clOpts := connect.WithInterceptors( auth.Interceptors(ll, tokenSource)..., ) @@ -232,7 +232,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { @@ -283,7 +283,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { @@ -334,7 +334,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { @@ -396,7 +396,7 @@ func accountCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { diff --git a/cmd/humanlog/auth.go b/cmd/humanlog/auth.go index 81b08068..2779c29c 100644 --- a/cmd/humanlog/auth.go +++ b/cmd/humanlog/auth.go @@ -29,7 +29,7 @@ func authCmd( getState func(cctx *cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, - getHTTPClient func(cctx *cli.Context) *http.Client, + getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client, ) cli.Command { return cli.Command{ Name: authCmdName, @@ -42,7 +42,7 @@ func authCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) authClient := authv1connect.NewAuthServiceClient(httpClient, apiURL) _, err := performLoginFlow(ctx, state, authClient, tokenSource) return err @@ -54,7 +54,7 @@ func authCmd( ctx := getCtx(cctx) apiURL := getAPIUrl(cctx) state := getState(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) tokenSource := getTokenSource(cctx) userToken, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient) if err != nil { @@ -89,7 +89,7 @@ func authCmd( Action: func(cctx *cli.Context) error { ctx := getCtx(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) tokenSource := getTokenSource(cctx) ll := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{})) diff --git a/cmd/humanlog/ingest.go b/cmd/humanlog/ingest.go index 86ba39d2..c4f218c0 100644 --- a/cmd/humanlog/ingest.go +++ b/cmd/humanlog/ingest.go @@ -27,20 +27,20 @@ func ingest( ctx context.Context, ll *slog.Logger, cctx *cli.Context, - apiAddr string, + apiURL string, getCfg func(*cli.Context) *config.Config, getState func(*cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(*cli.Context, string) *http.Client, notifyUnableToIngest func(error), ) (sink.Sink, error) { state := getState(cctx) tokenSource := getTokenSource(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) if state.IngestionToken == nil || time.Now().After(state.IngestionToken.ExpiresAt.AsTime()) { // we need to create an account token - accountToken, err := createIngestionToken(ctx, ll, cctx, state, tokenSource, apiAddr, httpClient) + accountToken, err := createIngestionToken(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { return nil, fmt.Errorf("no ingestion token configured, and couldn't generate one: %v", err) } @@ -62,7 +62,7 @@ func ingest( connect.WithGRPC(), } - client := ingestv1connect.NewIngestServiceClient(httpClient, apiAddr, clOpts...) + client := ingestv1connect.NewIngestServiceClient(httpClient, apiURL, clOpts...) var snk sink.Sink switch sinkType := os.Getenv("HUMANLOG_SINK_TYPE"); sinkType { case "unary": diff --git a/cmd/humanlog/localhost.go b/cmd/humanlog/localhost.go index f591a3a3..9dfe0f0a 100644 --- a/cmd/humanlog/localhost.go +++ b/cmd/humanlog/localhost.go @@ -86,10 +86,10 @@ func startLocalhostServer( localhostSink := logsvcsink.StartStreamSink(ctx, ll, client, "local", machineID, 1<<20, 100*time.Millisecond, true, notifyUnableToIngest) return localhostSink, func(ctx context.Context) error { logdebug("flushing localhost sink") - return localhostSink.Flush(ctx) + return localhostSink.Close(ctx) }, nil } - storage := localstorage.NewMemStorage() + storage := localstorage.NewMemStorage(ll.WithGroup("memstorage")) ownSink, _, err := storage.SinkFor(int64(machineID), time.Now().UnixNano()) if err != nil { return nil, nil, fmt.Errorf("can't create own sink: %v", err) @@ -127,11 +127,11 @@ func startLocalhostServer( wg.Add(1) go func() { defer wg.Done() - errc <- ownSink.Flush(ctx) + errc <- ownSink.Close(ctx) }() wg.Wait() close(errc) - l.Close() + _ = l.Close() var ferr error for err := range errc { if ferr == nil { diff --git a/cmd/humanlog/machine.go b/cmd/humanlog/machine.go index 8f52bf0a..d85e3624 100644 --- a/cmd/humanlog/machine.go +++ b/cmd/humanlog/machine.go @@ -23,7 +23,7 @@ func machineCmd( getState func(cctx *cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client, ) cli.Command { return cli.Command{ Hidden: hideUnreleasedFeatures == "true", @@ -34,7 +34,7 @@ func machineCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) _, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient) if err != nil { return err @@ -51,7 +51,7 @@ func machineCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) accountToken, err := createIngestionToken(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { return fmt.Errorf("ingestion token couldn't be generated: %v", err) diff --git a/cmd/humanlog/main.go b/cmd/humanlog/main.go index 7b42ecfe..b8ed3093 100644 --- a/cmd/humanlog/main.go +++ b/cmd/humanlog/main.go @@ -293,7 +293,7 @@ func newApp() *cli.App { logdebug("using basesite at %q", baseSiteURL) return baseSiteURL } - getHTTPClient = func(cctx *cli.Context) *http.Client { + getHTTPClient = func(cctx *cli.Context, apiURL string) *http.Client { u, _ := url.Parse(apiURL) if host, _, _ := net.SplitHostPort(u.Host); host == "localhost" { getLogger(cctx).Debug("using localhost client") @@ -507,7 +507,7 @@ func newApp() *cli.App { ctx, cancel := context.WithTimeout(context.Background(), flushTimeout) defer cancel() ll.DebugContext(ctx, "flushing remote ingestion sink for up to 300ms") - if err := remotesink.Flush(ctx); err != nil { + if err := remotesink.Close(ctx); err != nil { ll.ErrorContext(ctx, "couldn't flush buffered log", slog.Any("err", err)) } else { ll.DebugContext(ctx, "done sending all logs") @@ -528,7 +528,7 @@ func newApp() *cli.App { // no machine ID assigned, ensure machine gets onboarded via the login flow // TODO(antoine): if an account token exists, auto-onboard the machine. it's probably // not an interactive session - _, err := ensureLoggedIn(ctx, cctx, state, getTokenSource(cctx), apiURL, getHTTPClient(cctx)) + _, err := ensureLoggedIn(ctx, cctx, state, getTokenSource(cctx), apiURL, getHTTPClient(cctx, apiURL)) if err != nil { return fmt.Errorf("this feature requires a valid machine ID, which requires an account. failed to login: %v", err) } @@ -539,17 +539,17 @@ func newApp() *cli.App { loginfo("starting experimental localhost service: %v", err) } else { sink = teesink.NewTeeSink(sink, localhostSink) + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() + ll.DebugContext(ctx, "flushing localhost ingestion sink for up to 300ms") + if err := done(ctx); err != nil { + ll.ErrorContext(ctx, "couldn't flush buffered log (localhost)", slog.Any("err", err)) + } else { + ll.DebugContext(ctx, "done sending all logs") + } + }() } - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) - defer cancel() - ll.DebugContext(ctx, "flushing localhost ingestion sink for up to 300ms") - if err := done(ctx); err != nil { - ll.ErrorContext(ctx, "couldn't flush buffered log (localhost)", slog.Any("err", err)) - } else { - ll.DebugContext(ctx, "done sending all logs") - } - }() } } diff --git a/cmd/humanlog/onboarding.go b/cmd/humanlog/onboarding.go index 7ae3e8f2..adb9949e 100644 --- a/cmd/humanlog/onboarding.go +++ b/cmd/humanlog/onboarding.go @@ -23,7 +23,7 @@ func onboardingCmd( getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, getBaseSiteURL func(cctx *cli.Context) string, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(*cli.Context, string) *http.Client, ) cli.Command { return cli.Command{ Name: onboardingCmdName, @@ -47,7 +47,7 @@ func onboardingCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) authClient := authv1connect.NewAuthServiceClient(httpClient, apiURL) _, err := performLoginFlow(ctx, state, authClient, tokenSource) return err diff --git a/cmd/humanlog/organization.go b/cmd/humanlog/organization.go index dbc701d8..c337d367 100644 --- a/cmd/humanlog/organization.go +++ b/cmd/humanlog/organization.go @@ -29,7 +29,7 @@ func organizationCmd( getState func(cctx *cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client, ) cli.Command { var ( @@ -49,7 +49,7 @@ func organizationCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) _, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient) if err != nil { return err @@ -67,7 +67,7 @@ func organizationCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) organizationName := cctx.Args().First() if organizationName == "" { @@ -106,7 +106,7 @@ func organizationCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) if state.CurrentOrgID == nil { return fmt.Errorf("no org is currently set") @@ -146,7 +146,7 @@ func organizationCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) clOpts := connect.WithInterceptors( auth.Interceptors(ll, tokenSource)..., @@ -169,7 +169,7 @@ func organizationCmd( ll := getLogger(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) clOpts := connect.WithInterceptors( auth.Interceptors(ll, tokenSource)..., @@ -211,7 +211,7 @@ func organizationCmd( ll := getLogger(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) organizationName := cctx.Args().First() if organizationName == "" { @@ -247,7 +247,7 @@ func organizationCmd( ll := getLogger(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) clOpts := connect.WithInterceptors( auth.Interceptors(ll, tokenSource)..., @@ -277,7 +277,7 @@ func organizationCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient) if err != nil { @@ -310,7 +310,7 @@ func organizationCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) _ = ctx _ = state @@ -335,7 +335,7 @@ func organizationCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) _ = ctx _ = state diff --git a/cmd/humanlog/query.go b/cmd/humanlog/query.go index c3c77526..98b43979 100644 --- a/cmd/humanlog/query.go +++ b/cmd/humanlog/query.go @@ -43,7 +43,7 @@ func queryCmd( getState func(cctx *cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client, ) cli.Command { return cli.Command{ Hidden: hideUnreleasedFeatures == "true", @@ -81,7 +81,7 @@ func queryCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) _, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient) if err != nil { return err @@ -118,8 +118,9 @@ func queryApiSummarizeCmd( getState func(cctx *cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client, ) cli.Command { + bucket := cli.IntFlag{Name: "buckets", Value: 20} fromFlag := cli.DurationFlag{Name: "since", Value: 365 * 24 * time.Hour} toFlag := cli.DurationFlag{Name: "to", Value: 0} localhost := cli.BoolFlag{Name: "localhost"} @@ -135,7 +136,7 @@ func queryApiSummarizeCmd( ll := getLogger(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) _, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient) if err != nil { return err @@ -145,13 +146,13 @@ func queryApiSummarizeCmd( ) queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL, clOpts) } else { - httpClient := getHTTPClient(cctx) cfg := getCfg(cctx) if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhostOnPort == nil { return fmt.Errorf("localhost feature is not enabled or not configured, can't dial localhost") } - addr := fmt.Sprintf("http://localhost:%d", *cfg.ExperimentalFeatures.ServeLocalhostOnPort) - queryClient = queryv1connect.NewQueryServiceClient(httpClient, addr) + apiURL := fmt.Sprintf("http://localhost:%d", *cfg.ExperimentalFeatures.ServeLocalhostOnPort) + httpClient := getHTTPClient(cctx, apiURL) + queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL) } termWidth, termHeight, err := term.GetSize(os.Stdout.Fd()) @@ -159,14 +160,22 @@ func queryApiSummarizeCmd( return fmt.Errorf("getting term size: %v", err) } now := time.Now() - from := now.Add(-cctx.Duration(fromFlag.Name)) - to := now.Add(-cctx.Duration(toFlag.Name)) + var ( + from *timestamppb.Timestamp + to *timestamppb.Timestamp + ) + if cctx.Duration(fromFlag.Name) != 0 { + from = timestamppb.New(now.Add(-cctx.Duration(fromFlag.Name))) + } + if cctx.Duration(toFlag.Name) != 0 { + to = timestamppb.New(now.Add(-cctx.Duration(toFlag.Name))) + } res, err := queryClient.SummarizeEvents(ctx, connect.NewRequest(&queryv1.SummarizeEventsRequest{ AccountId: *state.CurrentAccountID, - BucketCount: 20, - From: timestamppb.New(from), - To: timestamppb.New(to), + BucketCount: uint32(cctx.Int(bucket.Name)), + From: from, + To: to, })) if err != nil { return fmt.Errorf("querying summary data: %v", err) @@ -186,7 +195,7 @@ func queryApiSummarizeCmd( firstTimeformat = "'06 01/02" } lastTimeFormat := "'06 01/02 15:04:05" - window := to.Sub(from) + window := to.AsTime().Sub(from.AsTime()) if window < time.Microsecond { lastTimeFormat = ".000000000" } else if window < time.Millisecond { @@ -220,7 +229,7 @@ func queryApiSummarizeCmd( } tslc := timeserieslinechart.New(termWidth, termHeight-3, - timeserieslinechart.WithTimeRange(from, to), + timeserieslinechart.WithTimeRange(from.AsTime(), to.AsTime()), ) tslc.XLabelFormatter = linechart.LabelFormatter(func(i int, f float64) string { t := time.Unix(int64(f), 0).UTC() @@ -259,7 +268,7 @@ func queryApiWatchCmd( getState func(cctx *cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client, ) cli.Command { fromFlag := cli.DurationFlag{Name: "since", Value: 365 * 24 * time.Hour} toFlag := cli.DurationFlag{Name: "to", Value: 0} @@ -276,7 +285,7 @@ func queryApiWatchCmd( ll := getLogger(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) _, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient) if err != nil { return err @@ -286,17 +295,26 @@ func queryApiWatchCmd( ) queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL, clOpts) } else { - httpClient := getHTTPClient(cctx) - + cfg := getCfg(cctx) if cfg.ExperimentalFeatures == nil || cfg.ExperimentalFeatures.ServeLocalhostOnPort == nil { return fmt.Errorf("localhost feature is not enabled or not configured, can't dial localhost") } - addr := fmt.Sprintf("http://localhost:%d", *cfg.ExperimentalFeatures.ServeLocalhostOnPort) - queryClient = queryv1connect.NewQueryServiceClient(httpClient, addr) + apiURL := fmt.Sprintf("http://localhost:%d", *cfg.ExperimentalFeatures.ServeLocalhostOnPort) + httpClient := getHTTPClient(cctx, apiURL) + queryClient = queryv1connect.NewQueryServiceClient(httpClient, apiURL) } now := time.Now() - from := now.Add(-cctx.Duration(fromFlag.Name)) - to := now.Add(-cctx.Duration(toFlag.Name)) + var ( + from *timestamppb.Timestamp + to *timestamppb.Timestamp + query = strings.Join(cctx.Args(), " ") + ) + if cctx.Duration(fromFlag.Name) != 0 { + from = timestamppb.New(now.Add(-cctx.Duration(fromFlag.Name))) + } + if cctx.Duration(toFlag.Name) != 0 { + to = timestamppb.New(now.Add(-cctx.Duration(toFlag.Name))) + } sinkOpts, errs := stdiosink.StdioOptsFrom(*cfg) if len(errs) > 0 { for _, err := range errs { @@ -306,13 +324,14 @@ func queryApiWatchCmd( loginfo("from=%s", from) loginfo("to=%s", to) - loginfo("query=%s", strings.Join(cctx.Args(), " ")) + loginfo("query=%s", query) req := &queryv1.WatchQueryRequest{ AccountId: *state.CurrentAccountID, Query: &typesv1.LogQuery{ - From: timestamppb.New(from), - To: timestamppb.New(to), + From: from, + To: to, + Query: query, }, } res, err := queryClient.WatchQuery(ctx, connect.NewRequest(req)) diff --git a/cmd/humanlog/versions.go b/cmd/humanlog/versions.go index 0359cfda..ce3c18a6 100644 --- a/cmd/humanlog/versions.go +++ b/cmd/humanlog/versions.go @@ -101,7 +101,7 @@ func versionCmd( getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getAPIUrl func(cctx *cli.Context) string, getBaseSiteURL func(cctx *cli.Context) string, - getHTTPClient func(*cli.Context) *http.Client, + getHTTPClient func(*cli.Context, string) *http.Client, ) cli.Command { return cli.Command{ Name: versionCmdName, @@ -117,7 +117,7 @@ func versionCmd( state := getState(cctx) tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) var channelName *string if cfg.ExperimentalFeatures != nil && cfg.ExperimentalFeatures.ReleaseChannel != nil { channelName = cfg.ExperimentalFeatures.ReleaseChannel @@ -152,7 +152,7 @@ func versionCmd( tokenSource := getTokenSource(cctx) apiURL := getAPIUrl(cctx) baseSiteURL := getBaseSiteURL(cctx) - httpClient := getHTTPClient(cctx) + httpClient := getHTTPClient(cctx, apiURL) var channelName *string if cfg.ExperimentalFeatures != nil && cfg.ExperimentalFeatures.ReleaseChannel != nil { channelName = cfg.ExperimentalFeatures.ReleaseChannel diff --git a/go.mod b/go.mod index 30a70d49..d7bef47f 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c github.com/rs/cors v1.11.0 github.com/stretchr/testify v1.9.0 + github.com/teivah/broadcast v0.1.0 github.com/urfave/cli v1.22.14 golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa golang.org/x/net v0.23.0 @@ -74,3 +75,5 @@ require ( golang.org/x/text v0.18.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/humanlogio/api/go => /Users/antoine/code/src/github.com/humanlogio/api/go diff --git a/go.sum b/go.sum index 9f503394..40484271 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= -github.com/humanlogio/api/go v0.0.0-20241011070935-7bb04da206c8 h1:fp03gUzn8/rQOgYDD6XUi9X8tl8UD8SP/NlHs8sFHTg= -github.com/humanlogio/api/go v0.0.0-20241011070935-7bb04da206c8/go.mod h1:+hU/MU1g6QvtbeknKOlUI1yEStVqkPJ8jmYIj63OV5I= github.com/kr/logfmt v0.0.0-20210122060352-19f9bcb100e6 h1:ZK1mH67KVyVW/zOLu0xLva+f6xJ8vt+LGrkQq5FJYLY= github.com/kr/logfmt v0.0.0-20210122060352-19f9bcb100e6/go.mod h1:JIiJcj9TX57tEvCXjm6eaHd2ce4pZZf9wzYuThq45u8= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -133,6 +131,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af h1:6yITBqGTE2lEeTPG04SN9W+iWHCRyHqlVYILiSXziwk= github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af/go.mod h1:4F09kP5F+am0jAwlQLddpoMDM+iewkxxt6nxUQ5nq5o= +github.com/teivah/broadcast v0.1.0 h1:UMs1tn8w20Xlnod+VbLbwH3dzEH2zfJy4lxdzZjQLL0= +github.com/teivah/broadcast v0.1.0/go.mod h1:mXEgvXdYz2xUkQFARxI+jyX1MfCBwMDiGjIKSAsEq1g= github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= diff --git a/internal/localstorage/memory.go b/internal/localstorage/memory.go index 53bcd687..d389ba7d 100644 --- a/internal/localstorage/memory.go +++ b/internal/localstorage/memory.go @@ -3,12 +3,14 @@ package localstorage import ( "context" "fmt" + "log/slog" "slices" "sync" "time" typesv1 "github.com/humanlogio/api/go/types/v1" "github.com/humanlogio/humanlog/pkg/sink" + "github.com/teivah/broadcast" "google.golang.org/protobuf/proto" ) @@ -20,13 +22,20 @@ var ( ) type MemStorage struct { + ll *slog.Logger heartbeat time.Duration sinksMu sync.Mutex sinks []*MemStorageSink + + newSinkRelay *broadcast.Relay[*MemStorageSink] } -func NewMemStorage() *MemStorage { - return &MemStorage{heartbeat: time.Hour} +func NewMemStorage(ll *slog.Logger) *MemStorage { + return &MemStorage{ + ll: ll, + heartbeat: time.Hour, + newSinkRelay: broadcast.NewRelay[*MemStorageSink](), + } } type SummarizedEvents struct { @@ -37,22 +46,53 @@ type SummarizedEvents struct { } } -func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) ([]Cursor, error) { +func (str *MemStorage) Query(ctx context.Context, q *typesv1.LogQuery) (<-chan Cursor, error) { if q.To != nil && q.From.AsTime().After(q.To.AsTime()) { return nil, fmt.Errorf("invalid query, `to` is before `from`") } str.sinksMu.Lock() defer str.sinksMu.Unlock() + var cursors []Cursor for _, snk := range str.sinks { if idx, ok, err := snk.firstMatch(ctx, q); err != nil { return nil, err - } else if ok { - cursors = append(cursors, &MemSinkCursor{q: q, cur: idx, next: idx, more: true, sink: snk}) + } else if ok || q.To == nil { + ll := snk.queryLogger(q) + ll.DebugContext(ctx, "sink is relevant for query") + cursors = append(cursors, newMemSinkCursor(ll, q, idx, idx, true, snk)) } } - return cursors, nil + newCursors := make(chan Cursor, len(cursors)) + for _, cursor := range cursors { + newCursors <- cursor + } + if q.To == nil { + l := str.newSinkRelay.Listener(1) + + go func() { + defer l.Close() + for { + select { + case <-ctx.Done(): + return + case newSink := <-l.Ch(): + ll := newSink.queryLogger(q) + ll.DebugContext(ctx, "a new sink appeared") + newCursor := newMemSinkCursor(ll, q, 0, 0, true, newSink) + select { + case <-ctx.Done(): + return + case newCursors <- newCursor: + } + } + } + }() + } else { + close(newCursors) + } + return newCursors, nil } func (str *MemStorage) Heartbeat(ctx context.Context, machineID, sessionID int64) (time.Duration, error) { @@ -62,27 +102,51 @@ func (str *MemStorage) Heartbeat(ctx context.Context, machineID, sessionID int64 func (str *MemStorage) SinkFor(machineID, sessionID int64) (sink.Sink, time.Duration, error) { str.sinksMu.Lock() defer str.sinksMu.Unlock() + id := SinkID{machineID: machineID, sessionID: sessionID} loc, ok := slices.BinarySearchFunc(str.sinks, id, func(mss *MemStorageSink, si SinkID) int { return mss.id.cmp(si) }) if ok { - return str.sinks[loc], time.Hour, nil + return str.sinks[loc], str.heartbeat, nil } - newsink := &MemStorageSink{id: id} + ll := str.ll.With( + slog.Int64("machine.id", machineID), + slog.Int64("session.id", sessionID), + ) + newsink := newMemStorageSink(ll, id) str.sinks = slices.Insert(str.sinks, loc, newsink) + str.newSinkRelay.Broadcast(newsink) + return newsink, str.heartbeat, nil } type MemSinkCursor struct { - q *typesv1.LogQuery + ll *slog.Logger + q *typesv1.LogQuery cur int next int more bool - sink *MemStorageSink - err error + sink *MemStorageSink + listener *broadcast.Listener[struct{}] + err error +} + +func newMemSinkCursor( + ll *slog.Logger, + q *typesv1.LogQuery, + cur int, + next int, + more bool, + sink *MemStorageSink, +) *MemSinkCursor { + var listener *broadcast.Listener[struct{}] + if !sink.closed && q.To == nil { + listener = sink.relay.Listener(1) + } + return &MemSinkCursor{ll: ll, q: q, cur: cur, next: next, more: more, sink: sink, listener: listener} } func (crs *MemSinkCursor) IDs() (machineID, sessionID int64) { @@ -96,7 +160,7 @@ func (crs *MemSinkCursor) Next(ctx context.Context) bool { } crs.cur = crs.next crs.next = crs.next + 1 - crs.next, crs.more, crs.err = crs.sink.nextMatch(ctx, crs.q, crs.next) + crs.next, crs.more, crs.err = crs.sink.nextMatch(ctx, crs.ll, crs.q, crs.next, crs.listener) return hasCurrent && crs.err == nil } @@ -108,6 +172,13 @@ func (crs *MemSinkCursor) Err() error { return crs.err } +func (crs *MemSinkCursor) Close() error { + if crs.listener != nil { + crs.listener.Close() + } + return nil +} + type SinkID struct { machineID int64 sessionID int64 @@ -122,62 +193,141 @@ func (sid SinkID) cmp(other SinkID) int { } type MemStorageSink struct { + ll *slog.Logger mu sync.RWMutex id SinkID evs []*typesv1.LogEvent + + closed bool + relay *broadcast.Relay[struct{}] +} + +func newMemStorageSink(ll *slog.Logger, id SinkID) *MemStorageSink { + return &MemStorageSink{ll: ll, id: id, relay: broadcast.NewRelay[struct{}]()} } -func (snk *MemStorageSink) firstMatch(_ context.Context, q *typesv1.LogQuery) (index int, ok bool, err error) { +func (snk *MemStorageSink) queryLogger(q *typesv1.LogQuery) *slog.Logger { + ll := snk.ll.With( + slog.Bool("sink.closed", snk.closed), + slog.String("query", q.Query), + ) + if q.From != nil { + ll = ll.With(slog.Time("from", q.From.AsTime())) + } + if q.To != nil { + ll = ll.With(slog.Time("to", q.To.AsTime())) + } + return ll +} + +func (snk *MemStorageSink) firstMatch(ctx context.Context, q *typesv1.LogQuery) (index int, ok bool, err error) { snk.mu.RLock() defer snk.mu.RUnlock() for i, ev := range snk.evs { if eventMatches(q, ev) { + snk.ll.DebugContext(ctx, "first match found at index", slog.Int("i", i)) return i, true, nil } } - return -1, false, nil + return len(snk.evs), false, nil } -func (snk *MemStorageSink) nextMatch(_ context.Context, q *typesv1.LogQuery, fromIndex int) (index int, ok bool, err error) { +func (snk *MemStorageSink) nextMatch( + ctx context.Context, + ll *slog.Logger, + q *typesv1.LogQuery, + fromIndex int, + listener *broadcast.Listener[struct{}], +) (index int, ok bool, err error) { +restartMatch: snk.mu.RLock() - defer snk.mu.RUnlock() + unlocked := false + defer func() { + if !unlocked { + snk.mu.RUnlock() + } + }() + shouldWaitForMore := !snk.closed && listener != nil && q.To == nil if len(snk.evs) < fromIndex { + ll.DebugContext(ctx, "reached end of buffer") + if shouldWaitForMore { + // sink is still receiving data and query is unbound + // unlock the sink so more logs can be received, + // then restart the next match process + snk.mu.RUnlock() + unlocked = true + ll.DebugContext(ctx, "waiting for more data") + select { + case <-listener.Ch(): + ll.DebugContext(ctx, "more data received, rematching") + goto restartMatch + case <-ctx.Done(): + return -1, false, ctx.Err() + } + } return -1, false, nil } + ll.DebugContext(ctx, "searching buffer for next match") for i, ev := range snk.evs[fromIndex:] { if eventMatches(q, ev) { return fromIndex + i, true, nil } } - return -1, false, nil + if !shouldWaitForMore { + return -1, false, nil + } + // sink is still receiving data and query is unbound + // unlock the sink so more logs can be received, + // then restart the next match process + snk.mu.RUnlock() + unlocked = true + ll.DebugContext(ctx, "waiting for more data") + select { + case <-listener.Ch(): + ll.DebugContext(ctx, "more data received, rematching") + goto restartMatch + case <-ctx.Done(): + return -1, false, ctx.Err() + } } func eventMatches(q *typesv1.LogQuery, ev *typesv1.LogEvent) bool { ts := ev.ParsedAt.AsTime() from := q.From.AsTime() - to := q.To.AsTime() atOrAfter := ts.Equal(from) || ts.After(from) - before := ts.Before(to) + + to := q.To + before := to == nil || ts.Before(to.AsTime()) // TODO(antoine): match more stuff on the query return atOrAfter && before } +var q = struct{}{} + func (snk *MemStorageSink) Receive(ctx context.Context, ev *typesv1.LogEvent) error { snk.mu.Lock() defer snk.mu.Unlock() + if snk.closed { + return fmt.Errorf("sink is closed") + } snk.receive(ev) + snk.relay.Broadcast(q) return nil } func (snk *MemStorageSink) ReceiveBatch(ctx context.Context, evs []*typesv1.LogEvent) error { snk.mu.Lock() defer snk.mu.Unlock() + if snk.closed { + return fmt.Errorf("sink is closed") + } for _, ev := range evs { snk.receive(ev) } + snk.relay.Broadcast(q) return nil } @@ -199,4 +349,9 @@ func (snk *MemStorageSink) receive(ev *typesv1.LogEvent) { } -func (snk *MemStorageSink) Flush(ctx context.Context) error { return nil } +func (snk *MemStorageSink) Close(ctx context.Context) error { + snk.mu.Lock() + defer snk.mu.Unlock() + snk.closed = true + return nil +} diff --git a/internal/localstorage/memory_test.go b/internal/localstorage/memory_test.go index 5f37592d..ab9bfeef 100644 --- a/internal/localstorage/memory_test.go +++ b/internal/localstorage/memory_test.go @@ -2,6 +2,8 @@ package localstorage import ( "context" + "log/slog" + "os" "testing" "time" @@ -119,6 +121,33 @@ func TestMemoryStorage(t *testing.T) { }, }, }, + { + name: "from only", + q: &typesv1.LogQuery{ + From: timestamppb.New(musttime("2006-01-02T15:04:06.002")), + }, + input: []*typesv1.LogEventGroup{ + { + MachineId: 1, SessionId: 2, + Logs: []*typesv1.LogEvent{ + {ParsedAt: timestamppb.New(musttime("2006-01-02T15:04:06.001")), Raw: []byte("hello world 1")}, + {ParsedAt: timestamppb.New(musttime("2006-01-02T15:04:06.002")), Raw: []byte("hello world 2")}, + {ParsedAt: timestamppb.New(musttime("2006-01-02T15:04:06.003")), Raw: []byte("hello world 3")}, + {ParsedAt: timestamppb.New(musttime("2006-01-02T15:04:06.004")), Raw: []byte("hello world 4")}, + }, + }, + }, + want: []*typesv1.LogEventGroup{ + { + MachineId: 1, SessionId: 2, + Logs: []*typesv1.LogEvent{ + {ParsedAt: timestamppb.New(musttime("2006-01-02T15:04:06.002")), Raw: []byte("hello world 2")}, + {ParsedAt: timestamppb.New(musttime("2006-01-02T15:04:06.003")), Raw: []byte("hello world 3")}, + {ParsedAt: timestamppb.New(musttime("2006-01-02T15:04:06.004")), Raw: []byte("hello world 4")}, + }, + }, + }, + }, { name: "slice", q: &typesv1.LogQuery{ @@ -152,7 +181,7 @@ func TestMemoryStorage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mem := NewMemStorage() + mem := NewMemStorage(slog.New(slog.NewTextHandler(os.Stderr, nil))) for _, leg := range tt.input { snk, _, err := mem.SinkFor(leg.MachineId, leg.SessionId) @@ -161,7 +190,7 @@ func TestMemoryStorage(t *testing.T) { err = snk.Receive(ctx, ev) require.NoError(t, err) } - err = snk.Flush(ctx) + err = snk.Close(ctx) require.NoError(t, err) } @@ -177,9 +206,9 @@ func TestMemoryStorage(t *testing.T) { } } -func drainCursors(t *testing.T, ctx context.Context, cursors []Cursor) []*typesv1.LogEventGroup { +func drainCursors(t *testing.T, ctx context.Context, cursors <-chan Cursor) []*typesv1.LogEventGroup { out := make([]*typesv1.LogEventGroup, 0, len(cursors)) - for _, cursor := range cursors { + for cursor := range cursors { mid, sid := cursor.IDs() leg := &typesv1.LogEventGroup{ MachineId: mid, SessionId: sid, diff --git a/internal/localstorage/queryable.go b/internal/localstorage/queryable.go index fc99c6f7..b1aef8d1 100644 --- a/internal/localstorage/queryable.go +++ b/internal/localstorage/queryable.go @@ -15,7 +15,7 @@ type Storage interface { } type Queryable interface { - Query(context.Context, *typesv1.LogQuery) ([]Cursor, error) + Query(context.Context, *typesv1.LogQuery) (<-chan Cursor, error) } type Cursor interface { @@ -23,4 +23,5 @@ type Cursor interface { Next(context.Context) bool Event() *typesv1.LogEvent Err() error + Close() error } diff --git a/internal/localsvc/svc.go b/internal/localsvc/svc.go index e1f952b7..c6523cfa 100644 --- a/internal/localsvc/svc.go +++ b/internal/localsvc/svc.go @@ -86,27 +86,29 @@ func (svc *Service) IngestStream(ctx context.Context, req *connect.ClientStream[ // get the first message which has the metadata to start ingesting if !req.Receive() { - msg := req.Msg() - machineID = int64(msg.MachineId) - sessionID = int64(msg.SessionId) - if sessionID == 0 { - sessionID = time.Now().UnixNano() - } - if machineID == 0 && svc.state.MachineID != nil { - machineID = int64(*svc.state.MachineID) - } + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("must contain at least a first request")) + } + msg := req.Msg() + machineID = int64(msg.MachineId) + sessionID = int64(msg.SessionId) + if sessionID == 0 { + sessionID = time.Now().UnixNano() + } + if machineID == 0 && svc.state.MachineID != nil { + machineID = int64(*svc.state.MachineID) } ll = ll.With( slog.Int64("machine_id", machineID), slog.Int64("session_id", sessionID), ) + ll.DebugContext(ctx, "receiving data from stream") snk, heartbeatIn, err := svc.storage.SinkFor(machineID, sessionID) if err != nil { ll.ErrorContext(ctx, "obtaining sink for stream", slog.Any("err", err)) return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("obtaining sink for stream: %v", err)) } defer func() { - if ferr := snk.Flush(ctx); ferr != nil { + if ferr := snk.Close(ctx); ferr != nil { if err == nil { err = ferr } else { @@ -114,7 +116,14 @@ func (svc *Service) IngestStream(ctx context.Context, req *connect.ClientStream[ } } }() + if bsnk, ok := snk.(sink.BatchSink); ok { + // ingest the first message + if err := bsnk.ReceiveBatch(ctx, msg.Events); err != nil { + ll.ErrorContext(ctx, "ingesting event batch", slog.Any("err", err)) + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("ingesting event batch: %v", err)) + } + // then wait for more for req.Receive() { msg := req.Msg() if err := bsnk.ReceiveBatch(ctx, msg.Events); err != nil { @@ -123,6 +132,14 @@ func (svc *Service) IngestStream(ctx context.Context, req *connect.ClientStream[ } } } else { + // ingest the first message + for _, ev := range msg.Events { + if err := snk.Receive(ctx, ev); err != nil { + ll.ErrorContext(ctx, "ingesting event", slog.Any("err", err)) + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("ingesting event: %v", err)) + } + } + // then wait for more for req.Receive() { msg := req.Msg() for _, ev := range msg.Events { @@ -189,7 +206,7 @@ func (svc *Service) SummarizeEvents(ctx context.Context, req *connect.Request[qr } ll = ll.With(slog.Duration("width", width)) - for _, cursor := range cursors { + for cursor := range cursors { for cursor.Next(ctx) { ts := cursor.Event().ParsedAt.AsTime().Truncate(width) loc, _ := slices.BinarySearchFunc(buckets, ts, func(a bucket, t time.Time) int { @@ -200,6 +217,9 @@ func (svc *Service) SummarizeEvents(ctx context.Context, req *connect.Request[qr if err := cursor.Err(); err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("counting summary: %v", err)) } + if err := cursor.Close(); err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("closing cursor: %v", err)) + } } ll.DebugContext(ctx, "iterated all cursors") out := &qrv1.SummarizeEventsResponse{ @@ -218,9 +238,41 @@ func (svc *Service) SummarizeEvents(ctx context.Context, req *connect.Request[qr return connect.NewResponse(out), nil } func (svc *Service) WatchQuery(ctx context.Context, req *connect.Request[qrv1.WatchQueryRequest], stream *connect.ServerStream[qrv1.WatchQueryResponse]) error { + query := req.Msg.GetQuery() + ll := svc.ll.With( - slog.Any("query", req.Msg.GetQuery().String()), + slog.String("query.query", query.Query), ) + if query.From != nil { + ll = ll.With(slog.String("query.from", query.From.AsTime().Format(time.RFC3339Nano))) + } + if query.To != nil { + ll = ll.With(slog.String("query.to", query.To.AsTime().Format(time.RFC3339Nano))) + } + + // ticker := time.NewTicker(time.Second) + // defer ticker.Stop() + // for i := 0; ; i++ { + // select { + // case now := <-ticker.C: + + // err := stream.Send(&qrv1.WatchQueryResponse{ + // Events: []*typesv1.LogEventGroup{ + // { + // MachineId: 1, + // SessionId: 1, + // Logs: []*typesv1.LogEvent{ + // {ParsedAt: timestamppb.New(now), Raw: []byte(fmt.Sprintf("it's now %d o-clock", now.Hour()))}, + // }, + // }, + // }, + // }) + // ll.DebugContext(ctx, "send a message group", slog.Any("err", err)) + + // case <-ctx.Done(): + // return nil + // } + // } ll.DebugContext(ctx, "running query through storage") cursors, err := svc.storage.Query(ctx, req.Msg.Query) @@ -232,6 +284,9 @@ func (svc *Service) WatchQuery(ctx context.Context, req *connect.Request[qrv1.Wa legc := make(chan *typesv1.LogEventGroup) iterateCursor := func(ctx context.Context, cursor localstorage.Cursor) error { + defer func() { + _ = cursor.Close() + }() var ( lastSend = time.Now() machineID, sessionID = cursor.IDs() @@ -259,8 +314,17 @@ func (svc *Service) WatchQuery(ctx context.Context, req *connect.Request[qrv1.Wa evs = evs[:0] } } + if err := cursor.Err(); err != nil { + ll.ErrorContext(ctx, "failed to advance query cursor", slog.Any("err", err)) + } ll.DebugContext(ctx, "cursor done, sending last batch", slog.Int("batch_len", len(evs))) select { + case <-ctx.Done(): + return nil + default: + + } + select { case legc <- &typesv1.LogEventGroup{ MachineId: machineID, SessionId: sessionID, @@ -271,13 +335,25 @@ func (svc *Service) WatchQuery(ctx context.Context, req *connect.Request[qrv1.Wa return nil } + allCursorsStarted := make(chan struct{}) + cursorCtx, cancelCursors := context.WithCancel(ctx) defer cancelCursors() eg, cursorCtx := errgroup.WithContext(cursorCtx) - for _, cursor := range cursors { - cursor := cursor - eg.Go(func() error { return iterateCursor(cursorCtx, cursor) }) - } + go func() { + defer close(allCursorsStarted) + for { + select { + case <-ctx.Done(): + return + case cursor, more := <-cursors: + if !more { + return + } + eg.Go(func() error { return iterateCursor(cursorCtx, cursor) }) + } + } + }() doneSending := make(chan struct{}) go func() { @@ -292,14 +368,30 @@ func (svc *Service) WatchQuery(ctx context.Context, req *connect.Request[qrv1.Wa defer sender.Stop() ll.DebugContext(ctx, "accumulator: starting accumulation loop") + defer func() { + ll.DebugContext(ctx, "accumulator: done accumulating") + if len(legs) > 0 { + ll.DebugContext(ctx, "accumulator: trying to send final watch query response, may not work") + err = stream.Send(&qrv1.WatchQueryResponse{ + Events: legs, + }) + if err != nil { + ll.ErrorContext(ctx, "accumulator: failed to send response", slog.Any("err", err)) + return + } + } + }() wait_for_more_leg: for { select { case <-ctx.Done(): return - case leg := <-legc: + case leg, more := <-legc: + if !more { + break wait_for_more_leg + } // try to append to an existing LEG first - ll.DebugContext(ctx, "accumulator: received cursor batch") + ll.DebugContext(ctx, "accumulator: received cursor batch", slog.Int("leg_count", len(legs))) for _, eleg := range legs { if eleg != nil && leg != nil && eleg.MachineId == leg.MachineId && eleg.SessionId == leg.SessionId { @@ -331,7 +423,11 @@ func (svc *Service) WatchQuery(ctx context.Context, req *connect.Request[qrv1.Wa }() ll.DebugContext(ctx, "accumulator: streaming started") - err = eg.Wait() + select { + case <-allCursorsStarted: + err = eg.Wait() + case <-ctx.Done(): + } ll.DebugContext(ctx, "accumulator: all data consumed, finishing") close(legc) if err != nil { diff --git a/internal/localsvc/svc_test.go b/internal/localsvc/svc_test.go index 48c259d9..5e9968db 100644 --- a/internal/localsvc/svc_test.go +++ b/internal/localsvc/svc_test.go @@ -120,7 +120,7 @@ func TestSummarize(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ll := slog.New(slog.NewTextHandler(os.Stderr, nil)) - mem := localstorage.NewMemStorage() + mem := localstorage.NewMemStorage(ll) for _, leg := range tt.input { snk, _, err := mem.SinkFor(leg.MachineId, leg.SessionId) @@ -129,7 +129,7 @@ func TestSummarize(t *testing.T) { err = snk.Receive(ctx, ev) require.NoError(t, err) } - err = snk.Flush(ctx) + err = snk.Close(ctx) require.NoError(t, err) } diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index a129c23a..d992004d 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -42,37 +42,3 @@ func TestRetrySuccess(t *testing.T) { require.Equal(t, 1, called) require.Equal(t, start, now) } - -func TestRetryFailOnce(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - now := time.Date(2024, time.October, 25, 13, 40, 37, 0, time.UTC) - start := now - r := rand.New(rand.NewSource(now.UnixNano())) - tch := make(chan time.Time, 1) - tch <- now - - called := 0 - err := Do(ctx, - func(ctx context.Context) (bool, error) { - called++ - return called != 2, nil - }, - UseRand(r), - useSleepFn(func(d time.Duration) <-chan time.Time { - t.Logf("sleeping for %v", d) - now = now.Add(d) - go func() { - select { - case tch <- now: - case <-ctx.Done(): - panic("so sloooow") - } - }() - return tch - }), - ) - require.NoError(t, err) - require.Equal(t, 2, called) - require.Equal(t, start.Add(61015267), now) -} diff --git a/pkg/sink/bufsink/sized.go b/pkg/sink/bufsink/sized.go index e035e142..e857de9e 100644 --- a/pkg/sink/bufsink/sized.go +++ b/pkg/sink/bufsink/sized.go @@ -24,7 +24,7 @@ func NewSizedBufferedSink(size int, flush sink.BatchSink) *SizedBuffer { } } -func (sn *SizedBuffer) Flush(ctx context.Context) error { +func (sn *SizedBuffer) Close(ctx context.Context) error { return nil } diff --git a/pkg/sink/logsvcsink/bidistream_sink.go b/pkg/sink/logsvcsink/bidistream_sink.go index cfb04943..5e5c6696 100644 --- a/pkg/sink/logsvcsink/bidistream_sink.go +++ b/pkg/sink/logsvcsink/bidistream_sink.go @@ -219,8 +219,8 @@ func (snk *ConnectBidiStreamSink) Receive(ctx context.Context, ev *typesv1.LogEv return nil } -// Flush can only be called once, calling it twice will panic. -func (snk *ConnectBidiStreamSink) Flush(ctx context.Context) error { +// Close can only be called once, calling it twice will panic. +func (snk *ConnectBidiStreamSink) Close(ctx context.Context) error { close(snk.eventsc) snk.ll.DebugContext(ctx, "starting to flush") select { diff --git a/pkg/sink/logsvcsink/stream_sink.go b/pkg/sink/logsvcsink/stream_sink.go index cc5318e8..1e85ee57 100644 --- a/pkg/sink/logsvcsink/stream_sink.go +++ b/pkg/sink/logsvcsink/stream_sink.go @@ -251,8 +251,8 @@ func (snk *ConnectStreamSink) Receive(ctx context.Context, ev *typesv1.LogEvent) return nil } -// Flush can only be called once, calling it twice will panic. -func (snk *ConnectStreamSink) Flush(ctx context.Context) error { +// Close can only be called once, calling it twice will panic. +func (snk *ConnectStreamSink) Close(ctx context.Context) error { close(snk.eventsc) snk.ll.DebugContext(ctx, "starting to flush") select { diff --git a/pkg/sink/logsvcsink/unary_sink.go b/pkg/sink/logsvcsink/unary_sink.go index eb03708d..889696ef 100644 --- a/pkg/sink/logsvcsink/unary_sink.go +++ b/pkg/sink/logsvcsink/unary_sink.go @@ -228,8 +228,8 @@ func (snk *ConnectUnarySink) Receive(ctx context.Context, ev *typesv1.LogEvent) return nil } -// Flush can only be called once, calling it twice will panic. -func (snk *ConnectUnarySink) Flush(ctx context.Context) error { +// Close can only be called once, calling it twice will panic. +func (snk *ConnectUnarySink) Close(ctx context.Context) error { close(snk.eventsc) snk.ll.DebugContext(ctx, "starting to flush") select { diff --git a/pkg/sink/sink.go b/pkg/sink/sink.go index 3d6f95e7..d1e340d5 100644 --- a/pkg/sink/sink.go +++ b/pkg/sink/sink.go @@ -8,10 +8,10 @@ import ( type Sink interface { Receive(ctx context.Context, ev *typesv1.LogEvent) error - Flush(ctx context.Context) error + Close(ctx context.Context) error } type BatchSink interface { ReceiveBatch(ctx context.Context, evs []*typesv1.LogEvent) error - Flush(ctx context.Context) error + Close(ctx context.Context) error } diff --git a/pkg/sink/stdiosink/stdio.go b/pkg/sink/stdiosink/stdio.go index ed7b9171..f5d6e766 100644 --- a/pkg/sink/stdiosink/stdio.go +++ b/pkg/sink/stdiosink/stdio.go @@ -127,7 +127,7 @@ func NewStdio(w io.Writer, opts StdioOpts) *Stdio { } } -func (std *Stdio) Flush(ctx context.Context) error { +func (std *Stdio) Close(ctx context.Context) error { return nil } diff --git a/pkg/sink/teesink/tee.go b/pkg/sink/teesink/tee.go index 2f55fbc1..9500a666 100644 --- a/pkg/sink/teesink/tee.go +++ b/pkg/sink/teesink/tee.go @@ -44,9 +44,9 @@ func (sn *Tee) Receive(ctx context.Context, ev *typesv1.LogEvent) error { return nil } -func (sn *Tee) Flush(ctx context.Context) error { +func (sn *Tee) Close(ctx context.Context) error { for i, sinks := range sn.sinks { - if err := sinks.Flush(ctx); err != nil { + if err := sinks.Close(ctx); err != nil { return fmt.Errorf("tee sink %d: %w", i, err) } } @@ -88,14 +88,14 @@ func (sn *MixedBatchingTee) ReceiveBatch(ctx context.Context, evs []*typesv1.Log return nil } -func (sn *MixedBatchingTee) Flush(ctx context.Context) error { +func (sn *MixedBatchingTee) Close(ctx context.Context) error { for i, sinks := range sn.nonbatchers { - if err := sinks.Flush(ctx); err != nil { + if err := sinks.Close(ctx); err != nil { return fmt.Errorf("tee sink %d: %w", i, err) } } for i, sinks := range sn.batchers { - if err := sinks.Flush(ctx); err != nil { + if err := sinks.Close(ctx); err != nil { return fmt.Errorf("tee sink %d: %w", i, err) } } @@ -124,9 +124,9 @@ func (sn *BatchingTee) ReceiveBatch(ctx context.Context, evs []*typesv1.LogEvent return nil } -func (sn *BatchingTee) Flush(ctx context.Context) error { +func (sn *BatchingTee) Close(ctx context.Context) error { for i, sinks := range sn.batchers { - if err := sinks.Flush(ctx); err != nil { + if err := sinks.Close(ctx); err != nil { return fmt.Errorf("tee sink %d: %w", i, err) } } diff --git a/vendor/github.com/humanlogio/api/go/types/v1/logquery.pb.go b/vendor/github.com/humanlogio/api/go/types/v1/logquery.pb.go index a07ddf8e..ad00fd0f 100644 --- a/vendor/github.com/humanlogio/api/go/types/v1/logquery.pb.go +++ b/vendor/github.com/humanlogio/api/go/types/v1/logquery.pb.go @@ -26,8 +26,9 @@ type LogQuery struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - From *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"` - To *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=to,proto3" json:"to,omitempty"` + From *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"` + To *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=to,proto3" json:"to,omitempty"` + Query string `protobuf:"bytes,100,opt,name=query,proto3" json:"query,omitempty"` } func (x *LogQuery) Reset() { @@ -74,6 +75,13 @@ func (x *LogQuery) GetTo() *timestamppb.Timestamp { return nil } +func (x *LogQuery) GetQuery() string { + if x != nil { + return x.Query + } + return "" +} + var File_types_v1_logquery_proto protoreflect.FileDescriptor var file_types_v1_logquery_proto_rawDesc = []byte{ @@ -81,23 +89,24 @@ var file_types_v1_logquery_proto_rawDesc = []byte{ 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x66, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7c, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x2e, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, 0x2a, 0x0a, 0x02, 0x74, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x02, 0x74, 0x6f, 0x42, 0x8d, 0x01, 0x0a, - 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x42, 0x0d, 0x4c, - 0x6f, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2d, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x75, 0x6d, 0x61, 0x6e, - 0x6c, 0x6f, 0x67, 0x69, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x79, 0x70, - 0x65, 0x73, 0x2f, 0x76, 0x31, 0x3b, 0x74, 0x79, 0x70, 0x65, 0x73, 0x76, 0x31, 0xa2, 0x02, 0x03, - 0x54, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x2e, 0x56, 0x31, 0xca, 0x02, - 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x14, 0x54, 0x79, 0x70, 0x65, - 0x73, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0xea, 0x02, 0x09, 0x54, 0x79, 0x70, 0x65, 0x73, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x02, 0x74, 0x6f, 0x12, 0x14, 0x0a, 0x05, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x64, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, + 0x72, 0x79, 0x42, 0x8d, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x74, 0x79, 0x70, 0x65, 0x73, + 0x2e, 0x76, 0x31, 0x42, 0x0d, 0x4c, 0x6f, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x6f, + 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x68, 0x75, 0x6d, 0x61, 0x6e, 0x6c, 0x6f, 0x67, 0x69, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x67, 0x6f, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x3b, 0x74, 0x79, 0x70, 0x65, + 0x73, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x54, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, + 0x73, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x08, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0xe2, + 0x02, 0x14, 0x54, 0x79, 0x70, 0x65, 0x73, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x54, 0x79, 0x70, 0x65, 0x73, 0x3a, 0x3a, + 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/vendor/github.com/teivah/broadcast/.gitignore b/vendor/github.com/teivah/broadcast/.gitignore new file mode 100644 index 00000000..f8c0deda --- /dev/null +++ b/vendor/github.com/teivah/broadcast/.gitignore @@ -0,0 +1,3 @@ +.idea +broadcast.iml +coverage.out \ No newline at end of file diff --git a/vendor/github.com/teivah/broadcast/LICENSE b/vendor/github.com/teivah/broadcast/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/vendor/github.com/teivah/broadcast/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/teivah/broadcast/README.md b/vendor/github.com/teivah/broadcast/README.md new file mode 100644 index 00000000..aaf9e550 --- /dev/null +++ b/vendor/github.com/teivah/broadcast/README.md @@ -0,0 +1,101 @@ +# broadcast + +![CI](https://github.com/teivah/broadcast/actions/workflows/ci.yml/badge.svg) +[![Go Report Card](https://goreportcard.com/badge/github.com/teivah/broadcast)](https://goreportcard.com/report/github.com/teivah/broadcast) + +Notification broadcaster in Go + +## What? + +`broadcast` is a library that allows sending repeated notifications to multiple goroutines with guaranteed delivery and user defined types. + +## Why? + +### Why not Channels? + +The standard way to handle notifications is via a `chan struct{}`. However, sending a message to a channel is received by a single goroutine. + +The only operation that is broadcast to multiple goroutines is a channel closure. Yet, if the channel is closed, there's no way to send a message again. + +❌ Repeated notifications to multiple goroutines + +✅ Guaranteed delivery + +### Why not sync.Cond? + +`sync.Cond` is the standard solution based on condition variables to set up containers of goroutines waiting for a specific condition. + +There's one caveat to keep in mind, though: the `Broadcast()` method doesn't guarantee that a goroutine will receive the notification. Indeed, the notification will be lost if the listener goroutine isn't waiting on the `Wait()` method. + +✅ Repeated notifications to multiple goroutines + +❌ Guaranteed delivery + +## How? + +### Step by Step + +First, we need to create a `Relay` for a message type (empty struct in this case): + +```go +relay := broadcast.NewRelay[struct{}]() +``` + +Once a `Relay` is created, we can create a new listener using the `Listener` method. As the `broadcast` library relies internally on channels, it accepts a capacity: + +````go +list := relay.Listener(1) // Create a new listener based on a channel with a one capacity +```` + +A `Relay` can send a notification in three different manners: +* `Notify`: block until a notification is sent to all the listeners +* `NotifyCtx`: send a notification to all listeners unless the provided context times out or is canceled +* `Broadcast`: send a notification to all listeners in a non-blocking manner; delivery isn't guaranteed + +On the `Listener` side, we can access the internal channel using `Ch`: + +```go +<-list.Ch() // Wait on a notification +``` + +We can close a `Listener` and a `Relay` using `Close`: + +```go +list.Close() +relay.Close() +``` + +Closing a `Relay` and `Listener`s can be done concurrently in a safe manner. + +### Example + +```go +type msg string +const ( + msgA msg = "A" + msgB = "B" + msgC = "C" +) + +relay := broadcast.NewRelay[msg]() // Create a relay for msg values +defer relay.Close() + +// Listener goroutines +for i := 0; i < 2; i++ { + go func(i int) { + l := relay.Listener(1) // Create a listener with a buffer capacity of 1 + for n := range l.Ch() { // Ranges over notifications + fmt.Printf("listener %d has received a notification: %v\n", i, n) + } + }(i) +} + +// Notifiers +time.Sleep(time.Second) +relay.Notify(msgA) // Send notification with guaranteed delivery +ctx, _ := context.WithTimeout(context.Background(), 0) // Context with immediate timeout +relay.NotifyCtx(ctx, msgB) // Send notification respecting context cancellation +time.Sleep(time.Second) // Allow time for previous messages to be processed +relay.Broadcast(msgC) // Send notification without guaranteed delivery +time.Sleep(time.Second) // Allow time for previous messages to be processed +``` \ No newline at end of file diff --git a/vendor/github.com/teivah/broadcast/broadcast.go b/vendor/github.com/teivah/broadcast/broadcast.go new file mode 100644 index 00000000..b1f49b82 --- /dev/null +++ b/vendor/github.com/teivah/broadcast/broadcast.go @@ -0,0 +1,123 @@ +// Package broadcast allows to send repeated notifications to multiple goroutines. +package broadcast + +import ( + "context" + "sync" +) + +// Relay is the struct in charge of handling the listeners and dispatching the notifications. +type Relay[T any] struct { + mu sync.RWMutex + n uint32 + clients map[uint32]*Listener[T] +} + +// NewRelay is the factory to create a Relay. +func NewRelay[T any]() *Relay[T] { + return &Relay[T]{ + clients: make(map[uint32]*Listener[T]), + } +} + +// Notify sends a notification to all the listeners. +// It guarantees that all the listeners will receive the notification. +func (r *Relay[T]) Notify(v T) { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, client := range r.clients { + client.ch <- v + } +} + +// NotifyCtx tries sending a notification to all the listeners until the context times out or is canceled. +func (r *Relay[T]) NotifyCtx(ctx context.Context, v T) { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, client := range r.clients { + select { + case client.ch <- v: + case <-ctx.Done(): + return + } + } +} + +// Broadcast broadcasts a notification to all the listeners. +// The notification is sent in a non-blocking manner, so there's no guarantee that a listener receives it. +func (r *Relay[T]) Broadcast(v T) { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, client := range r.clients { + select { + case client.ch <- v: + default: + } + } +} + +// Listener creates a new listener given a channel capacity. +func (r *Relay[T]) Listener(capacity int) *Listener[T] { + r.mu.Lock() + defer r.mu.Unlock() + + listener := &Listener[T]{ + ch: make(chan T, capacity), + id: r.n, + relay: r, + } + r.clients[r.n] = listener + r.n++ + return listener +} + +// Close closes a relay. +// This operation can be safely called in the meantime as Listener.Close() +func (r *Relay[T]) Close() { + r.mu.Lock() + defer r.mu.Unlock() + + for _, client := range r.clients { + r.closeRelay(client) + } + r.clients = nil +} + +func (r *Relay[T]) closeRelay(l *Listener[T]) { + l.once.Do(func() { + close(l.ch) + delete(r.clients, l.id) + }) +} + +func (r *Relay[T]) closeListener(l *Listener[T]) { + r.mu.Lock() + defer r.mu.Unlock() + + close(l.ch) + delete(r.clients, l.id) +} + +// Listener is a Relay listener. +type Listener[T any] struct { + ch chan T + id uint32 + relay *Relay[T] + once sync.Once +} + +// Ch returns the Listener channel. +func (l *Listener[T]) Ch() <-chan T { + return l.ch +} + +// Close closes a listener. +// This operation can be safely called in the meantime as Relay.Close() +func (l *Listener[T]) Close() { + l.once.Do(func() { + l.relay.closeListener(l) + }) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index abf1b5d0..37bc8622 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -124,7 +124,7 @@ github.com/google/uuid # github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c ## explicit github.com/gsterjov/go-libsecret -# github.com/humanlogio/api/go v0.0.0-20241011070935-7bb04da206c8 +# github.com/humanlogio/api/go v0.0.0-20241011070935-7bb04da206c8 => /Users/antoine/code/src/github.com/humanlogio/api/go ## explicit; go 1.19 github.com/humanlogio/api/go/pkg/lang github.com/humanlogio/api/go/svc/account/v1 @@ -212,6 +212,9 @@ github.com/stretchr/testify/require # github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af ## explicit github.com/tadvi/systray +# github.com/teivah/broadcast v0.1.0 +## explicit; go 1.18 +github.com/teivah/broadcast # github.com/urfave/cli v1.22.14 ## explicit; go 1.11 github.com/urfave/cli @@ -304,3 +307,4 @@ google.golang.org/protobuf/types/known/timestamppb # gopkg.in/yaml.v3 v3.0.1 ## explicit gopkg.in/yaml.v3 +# github.com/humanlogio/api/go => /Users/antoine/code/src/github.com/humanlogio/api/go