Skip to content

Commit

Permalink
fix(localstorage): sinks may need a context to start
Browse files Browse the repository at this point in the history
  • Loading branch information
aybabtme committed Oct 28, 2024
1 parent 739039b commit 01663db
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cmd/humanlog/localhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func startLocalhostServer(
}, nil
}
storage := localstorage.NewMemStorage(ll.WithGroup("memstorage"))
ownSink, _, err := storage.SinkFor(int64(machineID), time.Now().UnixNano())
ownSink, _, err := storage.SinkFor(ctx, int64(machineID), time.Now().UnixNano())
if err != nil {
return nil, nil, fmt.Errorf("can't create own sink: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/localstorage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (str *MemStorage) Heartbeat(ctx context.Context, machineID, sessionID int64
return str.heartbeat, nil
}

func (str *MemStorage) SinkFor(machineID, sessionID int64) (sink.Sink, time.Duration, error) {
func (str *MemStorage) SinkFor(ctx context.Context, machineID, sessionID int64) (sink.Sink, time.Duration, error) {
str.sinksMu.Lock()
defer str.sinksMu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion internal/localstorage/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestMemoryStorage(t *testing.T) {
mem := NewMemStorage(slog.New(slog.NewTextHandler(os.Stderr, nil)))

for _, leg := range tt.input {
snk, _, err := mem.SinkFor(leg.MachineId, leg.SessionId)
snk, _, err := mem.SinkFor(ctx, leg.MachineId, leg.SessionId)
require.NoError(t, err)
for _, ev := range leg.Logs {
err = snk.Receive(ctx, ev)
Expand Down
2 changes: 1 addition & 1 deletion internal/localstorage/queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Storage interface {
Queryable
SinkFor(machineID, sessionID int64) (_ sink.Sink, heartbeatIn time.Duration, _ error)
SinkFor(ctx context.Context, machineID, sessionID int64) (_ sink.Sink, heartbeatIn time.Duration, _ error)
Heartbeat(ctx context.Context, machineID, sessionID int64) (time.Duration, error)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/localsvc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (svc *Service) IngestStream(ctx context.Context, req *connect.ClientStream[
slog.Int64("session_id", sessionID),
)
ll.DebugContext(ctx, "receiving data from stream")
snk, heartbeatIn, err := svc.storage.SinkFor(machineID, sessionID)
snk, heartbeatIn, err := svc.storage.SinkFor(ctx, machineID, sessionID)
if err != nil {
ll.ErrorContext(ctx, "obtaining sink for stream", slog.Any("err", err))
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("obtaining sink for stream: %v", err))
Expand Down
2 changes: 1 addition & 1 deletion internal/localsvc/svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestSummarize(t *testing.T) {
mem := localstorage.NewMemStorage(ll)

for _, leg := range tt.input {
snk, _, err := mem.SinkFor(leg.MachineId, leg.SessionId)
snk, _, err := mem.SinkFor(ctx, leg.MachineId, leg.SessionId)
require.NoError(t, err)
for _, ev := range leg.Logs {
err = snk.Receive(ctx, ev)
Expand Down

0 comments on commit 01663db

Please sign in to comment.