Skip to content

Commit

Permalink
fix(localhost): watching an unbounded query now waits for any future …
Browse files Browse the repository at this point in the history
…data
  • Loading branch information
aybabtme committed Oct 27, 2024
1 parent 5401459 commit 6607e05
Show file tree
Hide file tree
Showing 31 changed files with 905 additions and 195 deletions.
22 changes: 11 additions & 11 deletions cmd/humanlog/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func accountCmd(
getState func(cctx *cli.Context) *state.State,
getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource,
getAPIUrl func(cctx *cli.Context) string,
getHTTPClient func(*cli.Context) *http.Client,
getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client,
) cli.Command {
return cli.Command{
Hidden: hideUnreleasedFeatures == "true",
Expand All @@ -42,7 +42,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
_, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
return err
Expand All @@ -58,7 +58,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

accountName := cctx.Args().First()
if accountName == "" {
Expand All @@ -85,7 +85,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
Expand Down Expand Up @@ -124,7 +124,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
Expand Down Expand Up @@ -157,7 +157,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

accountName := cctx.Args().First()
if accountName == "" {
Expand Down Expand Up @@ -198,7 +198,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
clOpts := connect.WithInterceptors(
auth.Interceptors(ll, tokenSource)...,
)
Expand Down Expand Up @@ -232,7 +232,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
Expand Down Expand Up @@ -283,7 +283,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
Expand Down Expand Up @@ -334,7 +334,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
Expand Down Expand Up @@ -396,7 +396,7 @@ func accountCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions cmd/humanlog/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func authCmd(
getState func(cctx *cli.Context) *state.State,
getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource,
getAPIUrl func(cctx *cli.Context) string,
getHTTPClient func(cctx *cli.Context) *http.Client,
getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client,
) cli.Command {
return cli.Command{
Name: authCmdName,
Expand All @@ -42,7 +42,7 @@ func authCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
authClient := authv1connect.NewAuthServiceClient(httpClient, apiURL)
_, err := performLoginFlow(ctx, state, authClient, tokenSource)
return err
Expand All @@ -54,7 +54,7 @@ func authCmd(
ctx := getCtx(cctx)
apiURL := getAPIUrl(cctx)
state := getState(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
tokenSource := getTokenSource(cctx)
userToken, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func authCmd(
Action: func(cctx *cli.Context) error {
ctx := getCtx(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
tokenSource := getTokenSource(cctx)

ll := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{}))
Expand Down
10 changes: 5 additions & 5 deletions cmd/humanlog/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ func ingest(
ctx context.Context,
ll *slog.Logger,
cctx *cli.Context,
apiAddr string,
apiURL string,
getCfg func(*cli.Context) *config.Config,
getState func(*cli.Context) *state.State,
getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource,
getHTTPClient func(*cli.Context) *http.Client,
getHTTPClient func(*cli.Context, string) *http.Client,
notifyUnableToIngest func(error),
) (sink.Sink, error) {
state := getState(cctx)
tokenSource := getTokenSource(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

if state.IngestionToken == nil || time.Now().After(state.IngestionToken.ExpiresAt.AsTime()) {
// we need to create an account token
accountToken, err := createIngestionToken(ctx, ll, cctx, state, tokenSource, apiAddr, httpClient)
accountToken, err := createIngestionToken(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
return nil, fmt.Errorf("no ingestion token configured, and couldn't generate one: %v", err)
}
Expand All @@ -62,7 +62,7 @@ func ingest(
connect.WithGRPC(),
}

client := ingestv1connect.NewIngestServiceClient(httpClient, apiAddr, clOpts...)
client := ingestv1connect.NewIngestServiceClient(httpClient, apiURL, clOpts...)
var snk sink.Sink
switch sinkType := os.Getenv("HUMANLOG_SINK_TYPE"); sinkType {
case "unary":
Expand Down
8 changes: 4 additions & 4 deletions cmd/humanlog/localhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func startLocalhostServer(
localhostSink := logsvcsink.StartStreamSink(ctx, ll, client, "local", machineID, 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
return localhostSink, func(ctx context.Context) error {
logdebug("flushing localhost sink")
return localhostSink.Flush(ctx)
return localhostSink.Close(ctx)
}, nil
}
storage := localstorage.NewMemStorage()
storage := localstorage.NewMemStorage(ll.WithGroup("memstorage"))
ownSink, _, err := storage.SinkFor(int64(machineID), time.Now().UnixNano())
if err != nil {
return nil, nil, fmt.Errorf("can't create own sink: %v", err)
Expand Down Expand Up @@ -127,11 +127,11 @@ func startLocalhostServer(
wg.Add(1)
go func() {
defer wg.Done()
errc <- ownSink.Flush(ctx)
errc <- ownSink.Close(ctx)
}()
wg.Wait()
close(errc)
l.Close()
_ = l.Close()
var ferr error
for err := range errc {
if ferr == nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/humanlog/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func machineCmd(
getState func(cctx *cli.Context) *state.State,
getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource,
getAPIUrl func(cctx *cli.Context) string,
getHTTPClient func(*cli.Context) *http.Client,
getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client,
) cli.Command {
return cli.Command{
Hidden: hideUnreleasedFeatures == "true",
Expand All @@ -34,7 +34,7 @@ func machineCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
_, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
return err
Expand All @@ -51,7 +51,7 @@ func machineCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
accountToken, err := createIngestionToken(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
return fmt.Errorf("ingestion token couldn't be generated: %v", err)
Expand Down
26 changes: 13 additions & 13 deletions cmd/humanlog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func newApp() *cli.App {
logdebug("using basesite at %q", baseSiteURL)
return baseSiteURL
}
getHTTPClient = func(cctx *cli.Context) *http.Client {
getHTTPClient = func(cctx *cli.Context, apiURL string) *http.Client {
u, _ := url.Parse(apiURL)
if host, _, _ := net.SplitHostPort(u.Host); host == "localhost" {
getLogger(cctx).Debug("using localhost client")
Expand Down Expand Up @@ -507,7 +507,7 @@ func newApp() *cli.App {
ctx, cancel := context.WithTimeout(context.Background(), flushTimeout)
defer cancel()
ll.DebugContext(ctx, "flushing remote ingestion sink for up to 300ms")
if err := remotesink.Flush(ctx); err != nil {
if err := remotesink.Close(ctx); err != nil {
ll.ErrorContext(ctx, "couldn't flush buffered log", slog.Any("err", err))
} else {
ll.DebugContext(ctx, "done sending all logs")
Expand All @@ -528,7 +528,7 @@ func newApp() *cli.App {
// no machine ID assigned, ensure machine gets onboarded via the login flow
// TODO(antoine): if an account token exists, auto-onboard the machine. it's probably
// not an interactive session
_, err := ensureLoggedIn(ctx, cctx, state, getTokenSource(cctx), apiURL, getHTTPClient(cctx))
_, err := ensureLoggedIn(ctx, cctx, state, getTokenSource(cctx), apiURL, getHTTPClient(cctx, apiURL))
if err != nil {
return fmt.Errorf("this feature requires a valid machine ID, which requires an account. failed to login: %v", err)
}
Expand All @@ -539,17 +539,17 @@ func newApp() *cli.App {
loginfo("starting experimental localhost service: %v", err)
} else {
sink = teesink.NewTeeSink(sink, localhostSink)
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel()
ll.DebugContext(ctx, "flushing localhost ingestion sink for up to 300ms")
if err := done(ctx); err != nil {
ll.ErrorContext(ctx, "couldn't flush buffered log (localhost)", slog.Any("err", err))
} else {
ll.DebugContext(ctx, "done sending all logs")
}
}()
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel()
ll.DebugContext(ctx, "flushing localhost ingestion sink for up to 300ms")
if err := done(ctx); err != nil {
ll.ErrorContext(ctx, "couldn't flush buffered log (localhost)", slog.Any("err", err))
} else {
ll.DebugContext(ctx, "done sending all logs")
}
}()
}
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/humanlog/onboarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func onboardingCmd(
getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource,
getAPIUrl func(cctx *cli.Context) string,
getBaseSiteURL func(cctx *cli.Context) string,
getHTTPClient func(*cli.Context) *http.Client,
getHTTPClient func(*cli.Context, string) *http.Client,
) cli.Command {
return cli.Command{
Name: onboardingCmdName,
Expand All @@ -47,7 +47,7 @@ func onboardingCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
authClient := authv1connect.NewAuthServiceClient(httpClient, apiURL)
_, err := performLoginFlow(ctx, state, authClient, tokenSource)
return err
Expand Down
22 changes: 11 additions & 11 deletions cmd/humanlog/organization.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func organizationCmd(
getState func(cctx *cli.Context) *state.State,
getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource,
getAPIUrl func(cctx *cli.Context) string,
getHTTPClient func(*cli.Context) *http.Client,
getHTTPClient func(cctx *cli.Context, apiURL string) *http.Client,
) cli.Command {

var (
Expand All @@ -49,7 +49,7 @@ func organizationCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)
_, err := ensureLoggedIn(ctx, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
return err
Expand All @@ -67,7 +67,7 @@ func organizationCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

organizationName := cctx.Args().First()
if organizationName == "" {
Expand Down Expand Up @@ -106,7 +106,7 @@ func organizationCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

if state.CurrentOrgID == nil {
return fmt.Errorf("no org is currently set")
Expand Down Expand Up @@ -146,7 +146,7 @@ func organizationCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

clOpts := connect.WithInterceptors(
auth.Interceptors(ll, tokenSource)...,
Expand All @@ -169,7 +169,7 @@ func organizationCmd(
ll := getLogger(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

clOpts := connect.WithInterceptors(
auth.Interceptors(ll, tokenSource)...,
Expand Down Expand Up @@ -211,7 +211,7 @@ func organizationCmd(
ll := getLogger(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

organizationName := cctx.Args().First()
if organizationName == "" {
Expand Down Expand Up @@ -247,7 +247,7 @@ func organizationCmd(
ll := getLogger(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

clOpts := connect.WithInterceptors(
auth.Interceptors(ll, tokenSource)...,
Expand Down Expand Up @@ -277,7 +277,7 @@ func organizationCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

orgID, err := ensureOrgSelected(ctx, ll, cctx, state, tokenSource, apiURL, httpClient)
if err != nil {
Expand Down Expand Up @@ -310,7 +310,7 @@ func organizationCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

_ = ctx
_ = state
Expand All @@ -335,7 +335,7 @@ func organizationCmd(
state := getState(cctx)
tokenSource := getTokenSource(cctx)
apiURL := getAPIUrl(cctx)
httpClient := getHTTPClient(cctx)
httpClient := getHTTPClient(cctx, apiURL)

_ = ctx
_ = state
Expand Down
Loading

0 comments on commit 6607e05

Please sign in to comment.