
service discovery #2882

Merged: 10 commits merged on Feb 10, 2025

Conversation

@chronark (Collaborator) commented Feb 10, 2025

  • wip
  • wip
  • wip
  • wip

Summary by CodeRabbit

  • New Features

    • Introduced a maintenance task for managing Docker cleanup.
    • Released dedicated production and staging configuration files.
    • Provided configurable cluster discovery and logging options (e.g., color-enabled logs, new HTTP port 7070).
  • Refactor & Infrastructure

    • Streamlined container image and deployment settings.
    • Improved API routing through updated middleware integration.
    • Enhanced dependency management and testing coverage for smoother operations.

changeset-bot (bot) commented Feb 10, 2025

⚠️ No Changeset found

Latest commit: 1bb02f0

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

vercel (bot) commented Feb 10, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name | Status | Updated (UTC)
dashboard | 🛑 Canceled | Feb 10, 2025 10:24am
engineering | ✅ Ready | Feb 10, 2025 10:24am
play | ✅ Ready | Feb 10, 2025 10:24am
www | ✅ Ready | Feb 10, 2025 10:24am

coderabbitai (bot, Contributor) commented Feb 10, 2025

Warning

Rate limit exceeded

@chronark has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 9 minutes and 53 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between f5f0f30 and 1bb02f0.

📒 Files selected for processing (2)
  • go/pkg/buffer/buffer.go (1 hunks)
  • go/pkg/membership/memberlist.go (1 hunks)
📝 Walkthrough

Walkthrough

The pull request introduces widespread changes across the codebase. New tasks and command targets have been added to both Taskfile.yml files, while Dockerfiles, deployment configurations, and JSON configuration files have been updated. Significant modifications include enhancements to logging (with color support), API configuration restructuring, route registration with explicit middleware, and revised error handling. New packages for cluster management, membership, and consistent hashing (ring) have been introduced, along with extensive dependency updates and removal of legacy certificate, Redis, and cache components. Additionally, protocol buffer definitions and the JSON schema have been revised for improved configuration support.

Changes

File(s) | Change Summary
Taskfiles
Taskfile.yml, go/Taskfile.yml
Added new tasks: nuke-docker, generate, and simulate; formatting adjustments and command simplification.
Docker & Deployment
apps/agent/Dockerfile, go/Dockerfile, deployment/docker-compose.yaml
Changed base image and added config files in apps/agent; added ENTRYPOINT in go/Dockerfile; removed api_lb service and updated port mappings/deployment in docker-compose.
API & Logging Setup
apps/agent/cmd/agent/setup.go, apps/agent/pkg/config/agent.go, apps/agent/pkg/logging/logger.go, apps/agent/pkg/api/validation/validator.go
Enhanced logger initialization with configuration for color output; added a Color field; removed a debug print from the validator.
API Configuration & Routes
go/cmd/api/config.go, go/cmd/api/main.go, go/cmd/api/routes/*, go/cmd/api/routes/v2_ecs_meta/handler.go
Refactored node configuration (removing NodeId/Port, adding HttpPort and nested Cluster/Discovery/Logs); updated command signature and cluster initialization; enhanced route registration with multiple middleware (e.g. WithMetrics, WithRootKeyAuth) and introduced a new ECS metadata route.
Configuration Files & Dependencies
go/config.docker.json, go/go.mod, go/main.go
Removed obsolete fields and updated addresses in config; introduced new dependencies and updated versions; simplified error handling in main.
Rate Limiting
go/internal/services/ratelimit/*
Renamed bucket method to getOrCreateBucket, removed the peer management file, introduced new peer handling, revised replay functionality (renaming to replayRequests and adding replayToOrigin), and implemented sliding window enhancements with new configuration and SetWindows method.
Assertions & Buffering
go/pkg/assert/assert.go, go/pkg/buffer/buffer.go, go/pkg/buffer/buffer_test.go
Modified assertion method to accept variadic messages; introduced a generic buffer package with methods for buffering and consuming, supplemented by comprehensive tests.
Cache & Certificate
go/pkg/cache/middleware/metrics.go, go/pkg/certificate/*
Removed files related to cache metrics and certificate management (ACM, autocert, and the Source interface).
Circuit Breaker
go/pkg/circuitbreaker/*
Updated import paths and logger initialization (to a noop logger); removed metrics file; adjusted test instantiation.
ClickHouse Schema
go/pkg/clickhouse/schema/requests.go
Added a new WorkspaceID field to the API request schema.
Cluster & Membership
go/pkg/cluster/*, go/pkg/membership/*
Introduced new cluster packages (with Config, interface, and noop implementation) and overhauled membership management (new bus, member, memberlist, and tests), while removing legacy fake, Redis, and tags files and updating the Membership interface.
Database & Discovery
go/pkg/database/database.go, go/pkg/discovery/*
Added MySQL driver import; introduced a new Discoverer interface and a static discovery implementation.
Logging Adjustments
go/pkg/logging/slog.go
Adjusted tint handler options to disable source information in development mode.
Test Utilities & Zen Server
go/pkg/testutil/http.go, go/pkg/zen/*, go/pkg/zen/session.go
Updated harness for consistent naming and explicit middleware registration; streamlined Server by removing global middleware and updating session handling (adding workspaceID, removing RequestID, and introducing Send).
Protocol & Schema
go/proto/ratelimit/v1/service.proto, go/schema.json
Updated go_package option in proto; made the cost field required in RatelimitRequest; added a time field (and repositioned denied) in ReplayRequest; expanded JSON schema with new cluster, httpPort, logs, and redisUrl properties while removing nodeId and port.

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant S as API Server
    participant M as Middleware Chain
    participant H as v2EcsMeta Handler

    C->>S: GET /v2/__ecs_meta
    S->>M: Process request with middleware
    M->>H: Forward request to ECS metadata handler
    H->>H: Read ECS_CONTAINER_METADATA_URI<br>& Make HTTP GET to ECS endpoint
    H-->>S: Return metadata response (200 OK)
    S-->>C: Send back response
sequenceDiagram
    participant CLI as CLI Context
    participant A as API Server Initialization
    participant CL as Cluster Module
    participant R as Route Registration

    CLI->>A: Invoke run(cliC)
    A->>A: Load configuration (including cluster settings)
    A->>CL: Initialize cluster using NewNoop
    A->>R: Register API routes with explicit middleware
    A-->>CLI: Start server on configured HttpPort

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

github-actions (bot) commented Feb 10, 2025

Hey there and thank you for opening this pull request! 👋🏼

We require pull request titles to follow the Conventional Commits specification and it looks like your proposed title needs to be adjusted.
Here is an example:

<type>[optional scope]: <description>
fix: I fixed something for Unkey

Details:

No release type found in pull request title "service discovery". Add a prefix to indicate what kind of release this pull request corresponds to. For reference, see https://www.conventionalcommits.org/

Available types:
 - feat: A new feature
 - fix: A bug fix
 - docs: Documentation only changes
 - style: Changes that do not affect the meaning of the code (white-space, formatting, missing semi-colons, etc)
 - refactor: A code change that neither fixes a bug nor adds a feature
 - perf: A code change that improves performance
 - test: Adding missing tests or correcting existing tests
 - build: Changes that affect the build system or external dependencies (example scopes: gulp, broccoli, npm)
 - ci: Changes to our CI configuration files and scripts (example scopes: Travis, Circle, BrowserStack, SauceLabs)
 - chore: Other changes that don't modify src or test files
 - revert: Reverts a previous commit

coderabbitai (bot) left a comment


Actionable comments posted: 13

🔭 Outside diff range comments (4)
go/pkg/zen/middleware_validate.go (1)

11-35: Improve bearer token parsing and error messages.

The bearer token parsing could be improved to be fully compliant with RFC 6750, and error messages could be more descriptive.

+const bearerPrefix = "Bearer "

 func WithRootKeyAuth(svc keys.KeyService) Middleware {
     return func(next HandleFunc) HandleFunc {
         return func(s *Session) error {
             header := s.r.Header.Get("Authorization")
             if header == "" {
-                return fault.New("empty authorization header", fault.WithTag(fault.UNAUTHORIZED))
+                return fault.New("missing Authorization header", fault.WithTag(fault.UNAUTHORIZED))
             }

-            bearer := strings.TrimSuffix(header, "Bearer ")
-            if bearer == "" {
-                return fault.New("invalid token", fault.WithTag(fault.UNAUTHORIZED))
+            if !strings.HasPrefix(header, bearerPrefix) {
+                return fault.New("invalid Authorization header format, expected 'Bearer <token>'", fault.WithTag(fault.UNAUTHORIZED))
             }
+            token := header[len(bearerPrefix):]
+            if token == "" {
+                return fault.New("missing token in Authorization header", fault.WithTag(fault.UNAUTHORIZED))
+            }

-            key, err := svc.Verify(s.Context(), hash.Sha256(bearer))
+            key, err := svc.Verify(s.Context(), hash.Sha256(token))
             if err != nil {
-                return fault.Wrap(err)
+                return fault.Wrap(err, fault.WithTag(fault.UNAUTHORIZED))
             }

             s.workspaceID = key.AuthorizedWorkspaceID

             return next(s)
         }
     }
 }
go/internal/services/ratelimit/bucket.go (1)

49-54: Consider adding bucket cleanup mechanism.

The bucket creation logic should be accompanied by a cleanup mechanism to prevent memory leaks from accumulated expired buckets.

 type service struct {
     // ... existing fields ...
+    cleanupInterval time.Duration
 }

+func (s *service) startBucketCleanup(ctx context.Context) {
+    ticker := time.NewTicker(s.cleanupInterval)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-ticker.C:
+            s.cleanupExpiredBuckets()
+        }
+    }
+}
+
+func (s *service) cleanupExpiredBuckets() {
+    now := time.Now()
+    s.bucketsLock.Lock()
+    defer s.bucketsLock.Unlock()
+
+    for key, bucket := range s.buckets {
+        bucket.mu.RLock()
+        if bucket.isExpired(now) {
+            delete(s.buckets, key)
+        }
+        bucket.mu.RUnlock()
+    }
+}
go/pkg/zen/session.go (1)

107-116: Update reset method to clear workspaceID.

The reset method should clear the newly added workspaceID field to prevent data leakage between requests.

 func (s *Session) reset() {
 	s.requestID = ""
 
 	s.w = nil
 	s.r = nil
 
+	s.workspaceID = ""
 	s.requestBody = nil
 	s.responseStatus = 0
 	s.responseBody = nil
 }
go/pkg/zen/server.go (1)

126-154: Replace panic with proper error handling.

Using panic for error handling is not recommended in production code. Consider returning errors instead.

 			sess, ok := s.getSession().(*Session)
 			if !ok {
-				panic("Unable to cast session")
+				s.logger.Error(context.Background(), "unable to cast session")
+				http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+				return
 			}

 			err := sess.Init(w, r)
 			if err != nil {
 				s.logger.Error(context.Background(), "failed to init session")
+				http.Error(w, "Internal Server Error", http.StatusInternalServerError)
 				return
 			}

 			// Apply middleware
 			var handle HandleFunc = route.Handle

 			// Reverses the middlewares to run in the desired order.
 			// If middlewares are [A, B, C], this writes [C, B, A] to s.middlewares.
 			for i := len(middlewares) - 1; i >= 0; i-- {
 				handle = middlewares[i](handle)
 			}

 			err = handle(sess)

 			if err != nil {
-				panic(err)
+				s.logger.Error(context.Background(), "handler error", slog.String("error", err.Error()))
+				http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+				return
 			}
🧹 Nitpick comments (51)
go/pkg/assert/assert.go (3)

30-31: Update function documentation to include the messages parameter.

The function comment should document the new variadic parameter and explain how multiple messages are handled.

-// True asserts that a boolean is true
+// True asserts that a boolean is true
+// Optional messages can be provided to override the default error message

31-37: Consider utilizing all provided messages.

Currently, only the first message is used when constructing the error. Consider joining all provided messages with a separator for more detailed error reporting.

 func True(value bool, messages ...string) error {
 	if !value {
 		if len(messages) == 0 {
 			messages = []string{"expected true but got false"}
 		}
-		return fault.New(messages[0], fault.WithTag(fault.ASSERTION_FAILED))
+		return fault.New(strings.Join(messages, ": "), fault.WithTag(fault.ASSERTION_FAILED))
 	}

31-39: Consider standardizing custom message support across all assertion functions.

The True function now supports custom messages, but other assertion functions like False, Equal, etc., don't have this capability. Consider extending this feature to all assertion functions for consistency.

Would you like me to help implement custom message support for the other assertion functions?
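If that standardization goes ahead, a minimal sketch of an Equal with the same variadic-message pattern could look like the following; the fault.New and fault.WithTag calls mirror the diff above, while the generic signature and the strings.Join separator are assumptions:

import (
	"fmt"
	"strings"
)

func Equal[T comparable](a, b T, messages ...string) error {
	if a != b {
		if len(messages) == 0 {
			messages = []string{fmt.Sprintf("expected %v to equal %v", a, b)}
		}
		// Same error construction as True above.
		return fault.New(strings.Join(messages, ": "), fault.WithTag(fault.ASSERTION_FAILED))
	}
	return nil
}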

go/cmd/api/config.go (1)

14-28: Nested struct design for Cluster.
While anonymous structs are fine for scoping config, consider extracting them into named structs for better reusability, validation, and clarity—especially if these configs expand in the future.
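For illustration only, a named-struct version might look roughly like this; the field names are taken from the summary earlier in this PR (HttpPort, Cluster, Discovery, Logs), and the json tags and exact nesting are assumptions:

type ClusterConfig struct {
	NodeID    string          `json:"nodeId,omitempty"`
	RpcPort   int             `json:"rpcPort,omitempty"`
	Discovery DiscoveryConfig `json:"discovery,omitempty"`
}

type DiscoveryConfig struct {
	// Static lists peer addresses for the static discovery mode.
	Static []string `json:"static,omitempty"`
}

type LogsConfig struct {
	Color bool `json:"color,omitempty"`
}

type Config struct {
	HttpPort int            `json:"httpPort"`
	Cluster  *ClusterConfig `json:"cluster,omitempty"`
	Logs     LogsConfig     `json:"logs,omitempty"`
}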

go/internal/services/ratelimit/replay.go (5)

13-16: Document concurrency considerations for replayRequests.
This function is blocking and intended for a goroutine. Confirm you have a graceful shutdown or channel close mechanism to avoid blocking if the service stops.


23-23: Function naming clarity.
replayToOrigin is well-scoped, but consider naming it to reflect that it replays to the node that originally received the request—especially since it exits early if the current node is already that origin.


40-40: Elevate error severity?
Failure to create a peer client may be critical in certain scenarios. Consider using Error instead of Warn if it indicates a major operational issue.


48-48: Circuit breaker usage on replay.
This approach is good for controlling repeated failures. Ensure replayCircuitBreaker is sized/tuned appropriately for your throughput.


55-55: Granular error logging.
If multiple error causes are possible here (e.g., network vs. decoding errors), consider logging more details or employing typed error checks.

go/pkg/cluster/cluster.go (5)

14-20: Consider validating required fields in Config.

While RpcPort, Self, and Membership are essential for cluster operation, there's no validation to ensure they are properly configured. A basic check or defaulting behavior would help prevent runtime errors due to missing or invalid fields.
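A minimal validation sketch, assuming the Config fields named in this comment (RpcPort, Self, Membership) and treating the zero-value checks as illustrative:

import "fmt"

func (c Config) Validate() error {
	if c.RpcPort <= 0 {
		return fmt.Errorf("cluster: RpcPort must be a positive port, got %d", c.RpcPort)
	}
	if c.Self.ID == "" {
		return fmt.Errorf("cluster: Self.ID must not be empty")
	}
	if c.Membership == nil {
		return fmt.Errorf("cluster: Membership must not be nil")
	}
	return nil
}

New could then call cfg.Validate() up front and return the error, rather than failing later at runtime.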


49-60: Ensure concurrency safety when accessing fields.

The cluster struct references self, membership, and ring from multiple goroutines. Even though ring and membership may be thread-safe internally, consider documenting concurrency expectations or adding synchronization (e.g., read locks) around shared fields if they can be mutated.


67-69: Potential unsubscribing mechanism.

The cluster exposes a subscription to join events but doesn’t document how callers should unsubscribe. If not handled, these event subscriptions can accumulate over a long runtime.


71-73: Duplicate consideration regarding event subscription lifecycle.

Same recommendation applies to the SubscribeLeave usage. Document the unsubscribe protocol or add a method to prevent subscriber buildup.


115-122: Optional: Pass context to ring operations.

The FindNode method receives a context parameter but doesn’t use it when calling r.ring.FindNode(key). While not harmful, consider whether you want to pass the context to further ring operations for consistent cancellation or logging.

go/pkg/membership/memberlist.go (5)

21-26: Validate Config fields.

Fields like NodeID and GossipPort are critical. Consider verifying they're set to valid values (e.g., non-empty NodeID, nonzero port) before using them in New.


28-38: Use consistent naming for membership struct or embed NodeID.

The membership struct includes self Member, but self is never used directly within this file. If usage in a future PR is expected, clarify or remove if dead code.


75-81: Document channel subscription lifecycle.

Like in the cluster package, unsubscribing from these topics isn’t addressed. If ephemeral subscriptions are created, they may accumulate.


83-89: Log server shutdown outcome.

Leave() gracefully leaves the cluster, then calls Shutdown(). Logging the outcome of Shutdown() would help confirm the success or failure of the final step.

 err := m.memberlist.Leave(time.Second * 15)
 if err != nil {
     return fmt.Errorf("Failed to leave serf: %w", err)
 }
+shutdownErr := m.memberlist.Shutdown()
+if shutdownErr != nil {
+    m.logger.Warn(context.Background(), "memberlist shutdown failed", slog.String("error", shutdownErr.Error()))
+}
-return m.memberlist.Shutdown()
+return shutdownErr

91-133: Clarify usage of retries for cluster join.

Using a retry loop is great for resilience, but ensure logging is sufficiently detailed to differentiate transient vs. permanent failures. Also confirm that 10 attempts is correct for your environment.
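One way to make transient failures distinguishable is per-attempt logging plus exponential backoff. This standalone sketch uses the standard library's log/slog rather than the project's logger, and joinOnce, the attempt count, and the delays are all assumptions:

package membership

import (
	"context"
	"fmt"
	"log/slog"
	"time"
)

func joinWithRetry(ctx context.Context, logger *slog.Logger, addrs []string, joinOnce func([]string) (int, error)) error {
	const maxAttempts = 10
	backoff := time.Second
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		contacted, err := joinOnce(addrs)
		if err == nil {
			logger.Info("joined cluster", slog.Int("attempt", attempt), slog.Int("contacted", contacted))
			return nil
		}
		// Logging the attempt number and error makes it easier to tell a
		// transient failure (succeeds on a later attempt) from a permanent one.
		logger.Warn("join attempt failed", slog.Int("attempt", attempt), slog.String("error", err.Error()))
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		if backoff < 30*time.Second {
			backoff *= 2
		}
	}
	return fmt.Errorf("failed to join cluster after %d attempts", maxAttempts)
}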

go/pkg/ring/ring.go (6)

18-29: Encourage immutability of Tags.

Tags can be a complex type, but the docstring advises copyable fields only. Reinforcing that guidance in your documentation or type constraints can prevent accidental pointer usage.


42-50: Store dedicated ring metadata if needed.

The ring maintains internal structures (nodes and tokens). If future expansions require node logs or usage stats, consider grouping them to keep ring logic separated from node data.


52-62: Validate config fields in New.

Similar to membership, confirm that TokensPerNode is above 0 and Logger is non-nil to avoid possible nil pointer usage.


93-109: Optionally log final ring size after removal.

After removing a node, logging the new count of tokens/nodes can help with debugging.


111-117: Assess security implications of MD5 usage.

While MD5 can be acceptable for consistent hashing (speed over cryptographic strength), confirm that collisions won't cause operational problems. Consider a faster non-cryptographic hash (e.g., xxHash) for performance.
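If a swap is ever made, hash/fnv from the standard library is one zero-dependency option (xxHash being the faster external alternative). This runnable sketch only illustrates the token-hashing step; the key format is made up:

package main

import (
	"fmt"
	"hash/fnv"
)

// tokenHash maps a virtual-node key to a position on the ring using FNV-1a,
// a non-cryptographic hash that is cheap and stable across runs.
func tokenHash(key string) uint64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte(key)) // Write on an fnv hash never returns an error.
	return h.Sum64()
}

func main() {
	fmt.Println(tokenHash("node-1#0"))
	fmt.Println(tokenHash("node-1#1"))
}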


132-160: Return more context with "node not found" errors.

It's unclear if the ring is truly missing the node or if there's a race. Consider including additional fields (e.g., key) in the error for debugging.

 if !ok {
-    return Node[T]{}, fmt.Errorf("node not found: %s", token.nodeID)
+    return Node[T]{}, fmt.Errorf("node not found: %s for key: %s", token.nodeID, key)
 }
go/cmd/api/main.go (1)

47-48: Consider reducing cognitive complexity
// nolint:gocognit indicates the function’s complexity is high. You might consider splitting your setup steps (cluster initialization, membership config, server setup, etc.) into smaller functions for readability and maintainability.

go/internal/services/ratelimit/sliding_window.go (1)

44-71: Concurrent replay handling
Spawning eight goroutines for replayRequests may handle high load well. However, ensure that concurrency is sized appropriately for your environment and that requests do not overwhelm other downstream systems.

go/pkg/discovery/static.go (1)

1-11: Simple static discovery
Returning a predefined slice of addresses satisfies the Discoverer interface in a straightforward way. Consider adding optional address validation to preempt errors from invalid or inaccessible peers.
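A sketch of what optional validation could look like; the Discoverer shape follows the interface quoted in the next comment, while the host:port requirement and duplicate filtering are assumptions about what "valid" should mean here:

package discovery

import (
	"fmt"
	"net"
)

// Static returns a fixed list of peer addresses, validated once up front.
type Static struct {
	Addrs []string
}

func (s Static) Discover() ([]string, error) {
	seen := make(map[string]struct{}, len(s.Addrs))
	out := make([]string, 0, len(s.Addrs))
	for _, addr := range s.Addrs {
		if _, _, err := net.SplitHostPort(addr); err != nil {
			return nil, fmt.Errorf("invalid static address %q: %w", addr, err)
		}
		if _, dup := seen[addr]; dup {
			continue // skip duplicates instead of joining the same peer twice
		}
		seen[addr] = struct{}{}
		out = append(out, addr)
	}
	return out, nil
}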

go/pkg/discovery/interface.go (1)

3-8: Consider adding context parameter for cancellation support.

The Discover method could benefit from accepting a context parameter to support cancellation and timeouts during discovery operations.

 type Discoverer interface {
 	// Discover returns an slice of addresses to join.
 	//
 	// If an empty slice is returned, you should bootstrap the cluster.
-	Discover() ([]string, error)
+	Discover(ctx context.Context) ([]string, error)
 }
go/pkg/membership/member.go (1)

8-12: Consider adding validation for NodeID and Addr.

The Member struct could benefit from validation to ensure:

  1. NodeID is not empty
  2. Addr is a valid IP address

Consider adding a validation method:

func (m Member) Validate() error {
    if m.NodeID == "" {
        return fmt.Errorf("nodeId cannot be empty")
    }
    if m.Addr == nil || m.Addr.IsUnspecified() {
        return fmt.Errorf("addr must be a valid IP address")
    }
    return nil
}

Then call it during unmarshaling:

 func (m *Member) Unmarshal(b []byte) error {
-    return json.Unmarshal(b, m)
+    if err := json.Unmarshal(b, m); err != nil {
+        return err
+    }
+    return m.Validate()
 }
go/main.go (1)

27-27: Consider using structured logging for error output.

The current error handling loses the detailed error chain information. Consider using a structured logger to maintain error details while providing a clean output format.

-        fmt.Println(err.Error())
+        log.WithError(err).Error("application error")
go/pkg/cluster/interface.go (2)

8-12: Add documentation for Node struct fields.

Each field in the Node struct should be documented to explain its purpose and any constraints.

 type Node struct {
+    // ID is a unique identifier for the node
     ID      string
+    // RpcAddr is the address for RPC communication
     RpcAddr string
+    // Addr is the IP address of the node
     Addr    net.IP
 }

14-23: Enhance Cluster interface with additional functionality.

Consider the following improvements to the Cluster interface:

  1. Add method documentation
  2. Add unsubscribe capability
  3. Add methods to list nodes and get node status
 // Cluster abstracts away membership and consistent hashing.
 type Cluster interface {
+    // Self returns the current node
     Self() Node
 
+    // FindNode returns the node responsible for the given key
     FindNode(ctx context.Context, key string) (Node, error)
+    // Shutdown gracefully shuts down the cluster
     Shutdown(ctx context.Context) error
 
+    // SubscribeJoin returns a channel that receives notifications when nodes join
     SubscribeJoin() <-chan Node
+    // SubscribeLeave returns a channel that receives notifications when nodes leave
     SubscribeLeave() <-chan Node
+    // Unsubscribe cancels a subscription
+    Unsubscribe(ch <-chan Node)
+    // ListNodes returns all current nodes in the cluster
+    ListNodes(ctx context.Context) ([]Node, error)
+    // GetNodeStatus returns the status of a specific node
+    GetNodeStatus(ctx context.Context, nodeID string) (NodeStatus, error)
 }
+
+type NodeStatus struct {
+    State     string    // e.g., "healthy", "suspect", "dead"
+    LastSeen  time.Time
+    Metadata  map[string]string
+}
go/pkg/cluster/noop.go (2)

8-12: Add documentation for public types.

Add documentation comments for the noop type and its purpose as a no-op implementation of the Cluster interface.

+// noop provides a no-op implementation of the Cluster interface.
+// It always returns the self node and is useful for single-node deployments
+// or testing scenarios where cluster functionality is not needed.
 type noop struct {
     self Node
 }

34-40: Consider buffered channels for subscriptions.

The unbuffered channels returned by SubscribeJoin and SubscribeLeave could potentially block if not consumed. Consider using buffered channels or documenting that consumers must actively read from these channels.

 func (n *noop) SubscribeJoin() <-chan Node {
-    return make(chan Node)
+    ch := make(chan Node, 1)
+    close(ch) // Close immediately as no events will be sent
+    return ch
 }

 func (n *noop) SubscribeLeave() <-chan Node {
-    return make(chan Node)
+    ch := make(chan Node, 1)
+    close(ch) // Close immediately as no events will be sent
+    return ch
 }
go/pkg/membership/bus.go (1)

20-23: Consider accepting context from the caller instead of using background context.

Using context.Background() might not be ideal as it prevents propagation of deadlines, cancellation signals, and trace information from the caller.

Consider modifying the notification methods to accept a context parameter:

-func (b *bus) NotifyJoin(node *memberlist.Node) {
+func (b *bus) NotifyJoin(ctx context.Context, node *memberlist.Node) {
-       b.onJoin.Emit(context.Background(), Member{
+       b.onJoin.Emit(ctx, Member{
                NodeID: node.Name,
                Addr:   node.Addr,
        })
}

-func (b *bus) NotifyLeave(node *memberlist.Node) {
+func (b *bus) NotifyLeave(ctx context.Context, node *memberlist.Node) {
-       b.onLeave.Emit(context.Background(), Member{
+       b.onLeave.Emit(ctx, Member{
                NodeID: node.Name,
                Addr:   node.Addr,
        })
}

-func (b *bus) NotifyUpdate(node *memberlist.Node) {
+func (b *bus) NotifyUpdate(ctx context.Context, node *memberlist.Node) {
-       b.onUpdate.Emit(context.Background(), Member{
+       b.onUpdate.Emit(ctx, Member{
                NodeID: node.Name,
                Addr:   node.Addr,
        })
}

Also applies to: 30-33, 41-44

go/cmd/api/routes/register.go (1)

27-41: Consider extracting middleware sets into constants.

To improve maintainability and reusability, consider extracting the middleware combinations into named constants.

+var (
+       livenessMiddlewares = []zen.Middleware{
+               withMetrics,
+               withLogging,
+               withErrorHandling,
+               withValidation,
+       }
+)

-       srv.RegisterRoute(
-               []zen.Middleware{
-                       withMetrics,
-                       withLogging,
-                       withErrorHandling,
-                       withValidation,
-               },
-               v2Liveness.New())
+       srv.RegisterRoute(livenessMiddlewares, v2Liveness.New())
apps/agent/cmd/agent/setup.go (1)

32-34: Verify color settings persistence with Axiom writer.

When Axiom logging is enabled, the color configuration is lost as a new logger is created without preserving the color settings.

Consider preserving the color settings when creating the Axiom logger:

 logger = logging.New(&logging.Config{
-    Writer: []io.Writer{ax},
+    Writer: []io.Writer{ax},
+    Color:  cfg.Logging.Color,
 })
go/pkg/testutil/http.go (1)

44-44: Consider using a pre-allocated empty middleware slice.

Creating an empty slice for each route registration is unnecessary.

Consider defining a package-level constant:

+var noMiddleware = make([]zen.Middleware, 0)
+
 func (h *Harness) Register(route zen.Route) {
-    h.srv.RegisterRoute([]zen.Middleware{}, route)
+    h.srv.RegisterRoute(noMiddleware, route)
 }
go/internal/services/ratelimit/peers.go (2)

25-33: Consider adding error handling for channel closure.

The infinite loop reading from the channel should handle graceful shutdown when the channel is closed.

 func (s *service) syncPeers() {
+    defer s.logger.Info(context.Background(), "stopping peer sync")
     for leave := range s.cluster.SubscribeLeave() {
+        if leave == nil {
+            return // channel closed
+        }
         s.logger.Info(context.Background(), "peer left", slog.String("peer", leave.ID))
         s.peerMu.Lock()
         delete(s.peers, leave.ID)
         s.peerMu.Unlock()
     }
 }

36-54: Consider adding context parameter for better control.

The getPeer method creates a background context which could lead to orphaned operations. Consider accepting a context parameter.

-func (s *service) getPeer(key string) (peer, error) {
+func (s *service) getPeer(ctx context.Context, key string) (peer, error) {
     s.peerMu.RLock()
     p, ok := s.peers[key]
     s.peerMu.RUnlock()
     if ok {
         return p, nil
     }
-    p, err := s.newPeer(context.Background(), key)
+    p, err := s.newPeer(ctx, key)
go/pkg/ring/ring_test.go (3)

18-32: Consider parameterizing edge cases in test table.

The test table could benefit from additional edge cases such as minimum capacity (1), zero capacity, and very large capacity values to ensure robust behavior across the full range of possible inputs.

 		{
+			name:     "minimum_capacity",
+			capacity: 1,
+			drop:     false,
+		},
+		{
+			name:     "zero_capacity",
+			capacity: 0,
+			drop:     false,
+		},
+		{
+			name:     "large_capacity",
+			capacity: 1_000_000,
+			drop:     false,
+		},
+		{
 			name:     "with_drop_enabled",
 			capacity: 10_000,
 			drop:     true,
 		},

76-76: Consider adding assertions for the actual values.

The test only verifies the count of received elements but doesn't validate their order or values. Consider adding assertions to ensure the received elements match the expected values in the correct order.

 			assert.Equal(t, tt.wantLen, len(received), "received elements count should match expected")
+			// Verify the actual values and order if drop is false
+			if !tt.drop && len(received) == len(tt.input) {
+				assert.Equal(t, tt.input, received, "received elements should match input in order")
+			}

Also applies to: 94-94


99-121: Consider adding timeout as a test parameter.

The blocking behavior test uses a hardcoded timeout of 100ms. Consider parameterizing this value and testing with different durations to ensure consistent behavior across various timing conditions.

 func TestBlockingBehavior(t *testing.T) {
+	tests := []struct {
+		name    string
+		timeout time.Duration
+	}{
+		{"short_timeout", 100 * time.Millisecond},
+		{"medium_timeout", 500 * time.Millisecond},
+		{"long_timeout", 1 * time.Second},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
 			b := New[int](2, false)
 			// ... rest of the test ...
-			case <-time.After(100 * time.Millisecond):
+			case <-time.After(tt.timeout):
 				// Expected behavior - operation blocked
+		})
+	}
 }
go/pkg/zen/session.go (1)

99-102: Add documentation for Send method.

The Send method lacks documentation explaining its purpose and usage.

+// Send writes the response with the given status code and raw body bytes.
+// Unlike JSON, this method doesn't perform any content-type conversion.
 func (s *Session) Send(status int, body []byte) error {
 	return s.send(status, body)
 }
go/pkg/buffer/buffer_test.go (1)

82-82: Consider increasing timeout for slower systems.

The 100ms timeout might be too short for systems under load or slower CI environments. Consider increasing it or making it configurable.

-			timeout := time.After(100 * time.Millisecond)
+			timeout := time.After(500 * time.Millisecond)

Also applies to: 83-83

apps/agent/pkg/config/agent.go (1)

9-9: Add description and documentation for the Color field.

The Color field lacks a description tag and documentation about its purpose. Consider adding:

  1. A description tag like other fields
  2. Documentation about when to enable/disable color output
-		Color bool `json:"color,omitempty"`
+		Color bool `json:"color,omitempty" description:"Enable colored output in logs"`
go/pkg/membership/membership_test.go (1)

116-116: Consider reducing test timeouts.

The 30-second timeout in require.Eventually seems excessive for these operations. Consider reducing it to improve test execution time.

-				}, 30*time.Second, 100*time.Millisecond)
+				}, 5*time.Second, 100*time.Millisecond)

Also applies to: 154-154

apps/agent/config.apprunner.production.json (1)

10-16: Logging Configuration Check
The logging block is clearly defined with a “color” flag and nested Axiom settings. Verify that the “color” value (set to false) meets your deployment or debugging requirements.

go/go.mod (2)

6-32: Direct Dependencies Update: Validate Compatibility

The explicit dependencies have been updated with new additions and version bumps. For example:

  • Added new dependencies like connectrpc.com/connect v1.16.2 and connectrpc.com/otelconnect v0.7.1.
  • Updated versions for key packages such as github.com/axiomhq/axiom-go v0.21.1 and go.opentelemetry.io/otel v1.34.0/otel/trace v1.34.0.
  • Several dependencies (e.g., github.com/danielgtaylor/huma, github.com/hashicorp/memberlist, github.com/lmittmann/tint, and github.com/maypok86/otter) are now at newer versions.

Please verify that these changes are compatible with the rest of the codebase (especially regarding any breaking API changes) and that appropriate testing is in place after these modifications.


36-175: Indirect Dependencies Overhaul: Extensive Updates and Upgrades

A large number of indirect dependencies have been updated. Notable updates include:

  • Upgrades in telemetry-related packages (e.g., various go.opentelemetry.io/otel/* modules, go.opentelemetry.io/proto/otlp).
  • Updates for numerous utility and performance libraries (such as packages from github.com/gonum/*, github.com/hashicorp/*, and others).
  • Changes to libraries in tooling and formatting (e.g., golang.org/x/* packages).

Given the extensive nature of these updates, please ensure that:

  • A full dependency audit is performed.
  • Regression tests pass to catch any subtle behavioral differences or incompatibilities.
  • There is an awareness of any minor API changes or deprecations introduced by these new versions.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 67213d2 and f5f0f30.

⛔ Files ignored due to path filters (4)
  • go/gen/proto/ratelimit/v1/ratelimitv1connect/service.connect.go is excluded by !**/gen/**
  • go/gen/proto/ratelimit/v1/service.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • go/go.sum is excluded by !**/*.sum
  • go/pkg/database/gen/verify_key.sql.go is excluded by !**/gen/**
📒 Files selected for processing (61)
  • Taskfile.yml (1 hunks)
  • apps/agent/Dockerfile (2 hunks)
  • apps/agent/cmd/agent/setup.go (1 hunks)
  • apps/agent/config.apprunner.production.json (1 hunks)
  • apps/agent/config.apprunner.staging.json (1 hunks)
  • apps/agent/pkg/api/validation/validator.go (0 hunks)
  • apps/agent/pkg/config/agent.go (1 hunks)
  • apps/agent/pkg/logging/logger.go (2 hunks)
  • apps/agent/services/vault/storage/s3.go (0 hunks)
  • deployment/docker-compose.yaml (1 hunks)
  • go/Dockerfile (1 hunks)
  • go/Taskfile.yml (2 hunks)
  • go/cmd/api/config.go (1 hunks)
  • go/cmd/api/main.go (5 hunks)
  • go/cmd/api/routes/register.go (1 hunks)
  • go/cmd/api/routes/services.go (2 hunks)
  • go/cmd/api/routes/v2_ecs_meta/handler.go (1 hunks)
  • go/config.docker.json (1 hunks)
  • go/go.mod (5 hunks)
  • go/internal/services/keys/verify.go (0 hunks)
  • go/internal/services/ratelimit/bucket.go (2 hunks)
  • go/internal/services/ratelimit/peer.go (0 hunks)
  • go/internal/services/ratelimit/peers.go (1 hunks)
  • go/internal/services/ratelimit/replay.go (2 hunks)
  • go/internal/services/ratelimit/sliding_window.go (4 hunks)
  • go/main.go (1 hunks)
  • go/pkg/assert/assert.go (1 hunks)
  • go/pkg/buffer/buffer.go (1 hunks)
  • go/pkg/buffer/buffer_test.go (1 hunks)
  • go/pkg/cache/middleware/metrics.go (0 hunks)
  • go/pkg/certificate/amazon_certificate_manager.go (0 hunks)
  • go/pkg/certificate/autocert.go (0 hunks)
  • go/pkg/certificate/interface.go (0 hunks)
  • go/pkg/circuitbreaker/lib.go (4 hunks)
  • go/pkg/circuitbreaker/lib_test.go (1 hunks)
  • go/pkg/circuitbreaker/metrics.go (0 hunks)
  • go/pkg/clickhouse/schema/requests.go (1 hunks)
  • go/pkg/cluster/cluster.go (1 hunks)
  • go/pkg/cluster/interface.go (1 hunks)
  • go/pkg/cluster/noop.go (1 hunks)
  • go/pkg/database/database.go (1 hunks)
  • go/pkg/discovery/interface.go (1 hunks)
  • go/pkg/discovery/static.go (1 hunks)
  • go/pkg/logging/slog.go (1 hunks)
  • go/pkg/membership/bus.go (1 hunks)
  • go/pkg/membership/fake.go (0 hunks)
  • go/pkg/membership/interface.go (1 hunks)
  • go/pkg/membership/member.go (1 hunks)
  • go/pkg/membership/memberlist.go (1 hunks)
  • go/pkg/membership/membership_test.go (1 hunks)
  • go/pkg/membership/redis.go (0 hunks)
  • go/pkg/membership/tags.go (0 hunks)
  • go/pkg/ring/ring.go (1 hunks)
  • go/pkg/ring/ring_test.go (1 hunks)
  • go/pkg/testutil/http.go (2 hunks)
  • go/pkg/zen/middleware_metrics.go (1 hunks)
  • go/pkg/zen/middleware_validate.go (1 hunks)
  • go/pkg/zen/server.go (4 hunks)
  • go/pkg/zen/session.go (3 hunks)
  • go/proto/ratelimit/v1/service.proto (3 hunks)
  • go/schema.json (2 hunks)
💤 Files with no reviewable changes (12)
  • go/pkg/certificate/amazon_certificate_manager.go
  • go/pkg/certificate/interface.go
  • apps/agent/services/vault/storage/s3.go
  • go/pkg/circuitbreaker/metrics.go
  • apps/agent/pkg/api/validation/validator.go
  • go/pkg/certificate/autocert.go
  • go/pkg/membership/tags.go
  • go/internal/services/ratelimit/peer.go
  • go/internal/services/keys/verify.go
  • go/pkg/membership/redis.go
  • go/pkg/membership/fake.go
  • go/pkg/cache/middleware/metrics.go
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Test Agent Local / test_agent_local
  • GitHub Check: Test API / API Test Local
  • GitHub Check: Build / Build
🔇 Additional comments (76)
go/pkg/membership/interface.go (2)

3-3: New import looks fine.
This import is required for the discovery.Discoverer reference in the new Start method.


6-8: Consider context handling in membership methods.
Removing the context arguments may limit cancellation or timeouts for membership operations. Ensure this aligns with your design, especially for potentially long-running or blocking calls.
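If cancellation does matter for these calls, the interface could keep a context parameter. This is only an illustration of the shape; the method set beyond Start(discovery.Discoverer) is assumed:

type Membership interface {
	// Start joins the cluster, honoring ctx for discovery and join timeouts.
	Start(ctx context.Context, d discovery.Discoverer) error
	// Leave departs gracefully; ctx bounds how long the goodbye may take.
	Leave(ctx context.Context) error
}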

go/cmd/api/config.go (2)

4-6: Confirm defaulting behavior for HttpPort.
Using a non-pointer field with a JSON default is valid, but confirm whether a zero port (e.g., 0) might be mistakenly used if config merges fail. Otherwise, this addition looks straightforward.
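A small defaulting sketch; nodeConfig is a stand-in name for the struct in go/cmd/api/config.go, and 7070 matches the httpPort default added to go/schema.json in this PR:

// applyDefaults guards against a zero HttpPort slipping through a failed
// config merge by falling back to the documented default.
func applyDefaults(cfg *nodeConfig) {
	if cfg.HttpPort == 0 {
		cfg.HttpPort = 7070
	}
}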


30-32: Color-logging enhancement.
The boolean toggle for color logging is clear and simple. No immediate concerns.

go/internal/services/ratelimit/replay.go (5)

5-5: Confirm stable import version of log/slog.
Ensure that you are pinning or vendoring a stable release to avoid unintentional library updates.


17-21: Check buffer consumption logic.
Reading from s.replayBuffer.Consume() presumably never ends until the buffer is closed. Confirm that replay buffer closure or draining scenarios are handled.


32-38: Validate key generation logic.
Building a single key from multiple fields is fine. Ensure collisions aren’t possible when different limit/duration combos hash to the same string.
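A delimiter-based key avoids that class of collision, since the numeric fields can never contain the separator; the field names here are assumptions:

import (
	"fmt"
	"time"
)

// bucketKey keeps (limit=1, duration=10) and (limit=11, duration=0) distinct,
// which plain concatenation of the digits would not.
func bucketKey(identifier string, limit int64, duration time.Duration) string {
	return fmt.Sprintf("%s|%d|%d", identifier, limit, duration.Milliseconds())
}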


43-44: Confirm node ID stability.
Comparing p.node.ID with s.cluster.Self().ID is valid if node IDs never change at runtime. Just be sure node identity is immutable or re-initialized consistently.


59-76: Double-check concurrency in SetWindows.
Two consecutive calls to set window data could be subject to race conditions if SetWindows is not concurrency-safe. Confirm the underlying data store is protected.

go/pkg/cluster/cluster.go (1)

63-65: Accessor method is fine, but consider concurrency.

Returning c.self without locking is currently safe if c.self is never modified. If future enhancements update c.self, introduce appropriate read locks.

go/pkg/membership/memberlist.go (1)

135-146: Check concurrency approach in Members().

memberlist.Members() is read without locking. Typically, Memberlist is concurrency safe, but confirm or wrap with mu.RLock() for consistency.

go/pkg/ring/ring.go (2)

64-91: Check re-initialization race conditions.

AddNode ensures no duplicate IDs but doesn’t defend against concurrent calls to add the same node or overlapping node IDs. If AddNode can be called from multiple goroutines, document the concurrency model or unify calls behind a single agent.


119-130: No concurrency issue, but consider an RWLock for Members().

Members uses mu.RLock(), which is consistent. Just ensure no further writes occur that might conflict. This is good for read concurrency.

go/cmd/api/main.go (13)

7-12: No issues with new imports
These additional imports (net, time) appear intentional and are used within the cluster and context timeout operations.


14-20: No issues with new references
Adding references to routes, keys, cluster, database, and discovery aligns with the new dependencies and functionalities introduced in this file.


31-32: Helpful CLI description
Providing a “Description” field for the CLI command clarifies the purpose of the command.


35-41: Making config flag required
Making the config file required and referencing UNKEY_CONFIG_FILE environment variable ensures a strict and consistent configuration loading process.


49-50: Correct usage of cli context
Extracting ctx and configFile from cliC is straightforward and consistent, no issues.


59-63: Node ID fallback
This logic cleanly defaults to a generated node ID unless overridden by cluster config. Verify that this fallback behavior is what you expect in all environments.


67-73: Sensible logger setup
Creating a development logger with NoColor helps with debugging. Ensure that production or staging environments override these settings as needed.


77-78: Centralized panic handling
Capturing panics and logging them is good for debuggability.


84-84: Configuration load logging
This info log helps confirm which config file is loaded at runtime.


86-87: No-op cluster fallback
Providing a no-op cluster is a clean fallback when no cluster configuration is present.


88-100: Static discovery usage
The logic to switch between static and AWS Cloudmap discovery is sensible for a multi-environment approach. Consider validating the static addresses (e.g., format, duplicates) if they come from user-defined configuration.


102-131: Graceful membership and cluster init
Initializing membership and then injecting it into the cluster is logically sound. However, if membership.New succeeds but membership.Start fails, you might end up with partial initialization. Ensure that higher-level rollback or cleanup logic is in place if that occurs frequently.
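One hedged option is to unwind on a failed Start so the node never stays half-joined; membership.New, Start, and Leave follow the names used elsewhere in this review, and the surrounding variables are illustrative:

m, err := membership.New(membershipConfig)
if err != nil {
	return fmt.Errorf("failed to create membership: %w", err)
}
if err := m.Start(discoverer); err != nil {
	// Best-effort cleanup so a partially initialized node does not linger in the gossip pool.
	if leaveErr := m.Leave(); leaveErr != nil {
		logger.Warn(ctx, "failed to leave after start error", slog.String("error", leaveErr.Error()))
	}
	return fmt.Errorf("failed to start membership: %w", err)
}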


193-197: One-minute shutdown window
Using a one-minute timeout for shutdown is typically safe. No immediate issues, but confirm that this is sufficient for all cleanup tasks (DB flushes, membership leaves, etc.).

go/internal/services/ratelimit/sliding_window.go (6)

32-36: Potential memory considerations for replay buffer
Retaining up to 10k requests in replayBuffer could be significant depending on traffic. Ensure that stale entries are processed or that capacity is sufficient for your expected load without risking memory bloat.


38-42: Config struct clarity
The new Config struct is straightforward, and requiring a logger, cluster, and optional clock clarifies dependencies. No issues here.


73-95: Ratelimit method: Replaying requests
Buffering the requests for replay is clever. Ensure that Identifier or any other fields do not contain sensitive PII that might be stored inadvertently.


97-152: Sliding window calculations
Your logic for calculating partial usage from the previous window and the current window is standard for a sliding window approach. Lock usage around the bucket windows is correct to avoid race conditions with concurrent updates.
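For readers unfamiliar with the approach, this runnable sketch shows the weighted count the comment refers to; the variable names and the example numbers are illustrative, not taken from the PR:

package main

import (
	"fmt"
	"time"
)

// effectiveCount blends the previous window's count, weighted by how much of
// it still overlaps the sliding interval, with the current window's count.
func effectiveCount(prev, curr int64, window, elapsedInCurrent time.Duration) float64 {
	prevWeight := 1.0 - float64(elapsedInCurrent)/float64(window)
	if prevWeight < 0 {
		prevWeight = 0
	}
	return float64(prev)*prevWeight + float64(curr)
}

func main() {
	// 40 requests in the previous 60s window, 10 so far, 15s into the current
	// window: 40*0.75 + 10 = 40 requests count against the limit.
	fmt.Println(effectiveCount(40, 10, time.Minute, 15*time.Second))
}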


153-162: New type for setWindowRequest
Encapsulating these details is clear, especially for out-of-band synchronization of counters. No issues here.


164-186: SetWindows approach
Locking the bucket and only replaying if the new counter is higher prevents regressions from out-of-order messages. This is a robust approach to distributed rate-limiting updates.

go/cmd/api/routes/services.go (2)

8-8: LGTM!

The import statement for the validation package is correctly added.


20-20: Verify validator initialization.

The Validator field is correctly added to the Services struct. However, ensure that this field is properly initialized when creating Services instances.

❌ Verification inconclusive

🏁 Script executed:

#!/bin/bash
# Search for Services struct initialization to verify Validator field initialization
rg -A 5 "Services\s*{" --type go

Length of output: 122


I'm seeing that the initial regex caused an error because the { wasn’t properly escaped. To verify the proper initialization of the Validator field in the Services struct, please re-run the verification using the following corrected shell script:

This script should help you inspect the struct definition (and any inline initialization) to ensure that the Validator field is being properly set. Once you review the output, please verify manually that all instances of Services are correctly initializing the Validator field.


🏁 Script executed:

#!/bin/bash
# Corrected regex search for the Services struct definition (with a properly escaped '{')
rg -A 10 'type Services struct\s*\{' --type go

Length of output: 2697


Action Required: Verify Services.Validator initialization in constructor or DI wiring

The search confirms that the Services struct in go/cmd/api/routes/services.go includes the Validator field, but we couldn’t locate any constructor function or explicit assignment that initializes it. Please manually verify that the Validator field is correctly instantiated—either via a dedicated constructor, dependency injection, or another initialization mechanism—when a Services instance is created.

go/pkg/clickhouse/schema/requests.go (1)

4-4: LGTM! The addition of WorkspaceID enhances request context.

The new field follows the established naming and tagging patterns, providing valuable workspace context for API requests.

go/pkg/membership/bus.go (1)

10-14: LGTM! Clean event-driven design for membership management.

The bus implementation with separate topics for join, leave, and update events provides clear separation of concerns.

apps/agent/pkg/logging/logger.go (1)

20-20: LGTM! Clean implementation of color output configuration.

The color support is well-integrated with safe defaults and proper configuration handling.

Also applies to: 36-36

go/cmd/api/routes/register.go (1)

13-25: LGTM! Well-organized middleware configuration.

The middleware initialization and organization is clean, with clear documentation about the order of execution.

apps/agent/cmd/agent/setup.go (1)

15-17: LGTM! Color configuration added to logger initialization.

The addition of color configuration through the structured config is clean and type-safe.

go/pkg/logging/slog.go (1)

27-27: Verify impact of disabling source information in development logs.

Setting AddSource: false removes file and line information from development logs when colors are enabled. This might impact debugging capabilities.

Consider making this configurable or documenting the rationale:

-           AddSource:   false,
+           AddSource:   cfg.Development, // Enable source info in development mode
go/pkg/zen/middleware_metrics.go (1)

50-51: LGTM! Enhanced request metrics with workspace context.

The addition of WorkspaceID to request metrics provides valuable context while maintaining existing functionality.

go/pkg/testutil/http.go (1)

28-28: LGTM! Consistent ID field naming.

The rename from NodeId to NodeID follows Go naming conventions for acronyms.

go/pkg/database/database.go (1)

6-6: LGTM! MySQL driver import added correctly.

The MySQL driver is properly imported with a blank identifier for side effects.

go/internal/services/ratelimit/peers.go (1)

17-20: LGTM! Well-structured peer type.

The peer struct effectively encapsulates both the cluster node and its corresponding RPC client.

go/pkg/circuitbreaker/lib_test.go (1)

18-18: LGTM! Simplified test setup.

Removing the logger from the test setup is appropriate as it's not essential for testing the circuit breaker's state transitions.

go/internal/services/ratelimit/bucket.go (2)

22-22: LGTM! Improved mutex field naming.

Renaming the mutex field to mu follows Go conventions and improves code readability.


42-44: LGTM! Method name now accurately describes its behavior.

Renaming getBucket to getOrCreateBucket better reflects the method's functionality.

go/pkg/zen/server.go (2)

122-122: LGTM! Improved middleware management.

The change to pass middlewares directly to routes instead of storing them globally improves clarity and maintainability.


27-27: LGTM! Field rename follows Go conventions.

The rename from NodeId to NodeID follows Go naming conventions for acronyms.

go/pkg/circuitbreaker/lib.go (1)

124-124: LGTM! Good improvements to logging and initialization.

The changes improve the code in several ways:

  1. Using NewNoop logger reduces overhead
  2. Explicit initialization of counters improves clarity
  3. Structured logging with slog improves log readability

Also applies to: 138-142, 195-199

go/config.docker.json (2)

3-4: New Network Address Configuration:
The introduction of "httpAddr": "0.0.0.0:7070" and "rpcAddr": ":7071" standardizes how the service exposes its HTTP and RPC endpoints. This enhances flexibility and aligns with dynamic deployment practices.


7-7: Dynamic Database Configuration:
Updating the "primary" field to reference "${DATABASE_PRIMARY_DSN}" allows the database connection string to be dynamically set through the environment. Please ensure that the DATABASE_PRIMARY_DSN variable is correctly defined in every deployment environment.

go/Dockerfile (1)

18-19: Container Entrypoint Configuration:
Adding the ENTRYPOINT [ "/usr/local/bin/unkey" ] directive guarantees that the container automatically runs the intended executable on start-up. Verify that the binary exists at the specified location and that this configuration aligns with the overall service startup logic.

go/Taskfile.yml (3)

1-2: Taskfile Version Update:
Switching to a double-quoted version string (now "3") improves consistency with YAML best practices.


18-21: New Generate Task:
The new generate task runs both go generate ./... and buf generate, thereby streamlining the process for code and protobuf generation. Ensure that both the Go toolchain and buf are available in your build environment, and that their outputs match expected artifacts.


26-29: New Simulate Task:
The simulate task introduces an automated simulation test using gosim. Please confirm that the command (go run github.com/jellevandenhooff/gosim/cmd/gosim test -v -run TestGosim) is compatible with your current testing framework and that it delivers the desired test outputs.

apps/agent/config.apprunner.staging.json (1)

1-33: New App Runner Staging Configuration:
This new configuration file for AWS App Runner provides dynamic settings for staging deployments. Key elements like dynamic Docker image references, port settings, and comprehensive logging configuration (with "color": false) help promote consistency across environments. Additionally, note that the "nodeId" is left empty—ensure this is intentional or consider providing a default if required by downstream components.

apps/agent/Dockerfile (2)

19-20: Inclusion of New App Runner Configurations:
The addition of the copy commands for config.apprunner.production.json and config.apprunner.staging.json guarantees that these dynamic deployment configurations are available in the final Docker image. Double-check that these file paths and names are correct and reflect the current repository structure.


1-24: General Dockerfile Review:
The Dockerfile now uses golang:1.22 (instead of an Alpine variant) for both the builder and runtime stages, which may affect the available system libraries and the runtime environment. Ensure this change is compatible with your service’s needs. The overall build process remains standard, and the explicit inclusion of multiple configuration files supports enhanced deployment flexibility.

apps/agent/config.apprunner.production.json (6)

1-9: Initial Configuration Setup
The basic structure (schema, platform, image, nodeId, port, rpcPort, region, and authToken) is defined correctly using environment variables for flexibility.


17-22: Tracing Configuration Review
The tracing block mirrors the logging setup with Axiom details. Ensure that the token provided via the environment remains secure.


23-28: Metrics Configuration
Metrics settings are consistently structured in line with logging and tracing. No issues detected.


29-37: Vault Service Settings
The “services” section for vault includes all necessary S3 parameters. Confirm that these sensitive values (bucket, access keys, and master keys) are populated securely at runtime.


38-47: Cluster Configuration Inspection
The cluster configuration is comprehensive—with fields for authToken, serfAddr, rpcAddr, and a nested join setting. Double-check that an empty “addrs” array is intentional and that the placement of “nodeId” (inside the cluster block) aligns with your design expectations, especially given earlier discussions about its removal at the top level.


48-51: Heartbeat Settings
The heartbeat block is straightforward with an interval and URL. If later scaling or retries become necessary, consider whether additional parameters might be needed.

Taskfile.yml (1)

57-63: New Task: nuke-docker
The addition of the "nuke-docker" task provides a convenient mechanism to stop all running Docker containers and clean up unused artifacts. The use of || true ensures the command does not fail if no containers are running.

go/schema.json (5)

19-86: New Cluster Configuration in Schema
The introduction of the "cluster" property is well executed. It supports both “awsCloudmap” and “static” discovery modes and details sub-properties like advertiseAddr, gossipPort, nodeId, and rpcPort. Note that the inclusion of “nodeId” within the cluster object should be verified against your design goals (it appears the high-level summary mentioned its removal elsewhere).


118-123: Addition of httpPort Property
The new "httpPort" property is defined with a default value of 7070. This should properly replace or complement any legacy “port” configurations, ensuring the system listens as intended.


128-137: Logs Configuration Update
Adding a new "logs" object (with a “color” boolean) is a useful enhancement for customizable logging output. Consider extending this object if additional logging options become necessary in the future.


143-145: New redisUrl Property
The "redisUrl" property is now available for configuring Redis connections. Verify that its value is managed securely within your deployment pipeline.


152-153: Updated Required Properties
The list of required properties now includes "httpPort", "redisUrl", and "database". Ensure that all configuration files (both production and staging) provide these values.

deployment/docker-compose.yaml (1)

34-44: Updated Service Configuration for apiv2
The apiv2 service now explicitly maps port "7070:7070". This change likely accommodates the removal of the separate API load balancer (api_lb). Confirm that this new port mapping meets your routing and scaling requirements.

go/proto/ratelimit/v1/service.proto (4)

5-5: Updated go_package Option
The go_package option now reflects the project's revised structure. Ensure that all generated code and imports are updated to this new path across your services.


35-35: RatelimitRequest "cost" Field Update
The "cost" field within the RatelimitRequest message is now defined without an "optional" keyword, making it effectively required in proto3. Make sure that all clients include this field in their requests to prevent deserialization issues.


106-110: New "time" Field in ReplayRequest
Introducing the "time" field to capture when the edge node receives the request is a solid improvement for accurate sequence calculations. Verify that edge nodes pass a valid Unix timestamp.


112-117: ReplayRequest "denied" Field Reordering
Moving the "denied" field to field number 3 changes its wire tag, so messages encoded with the old numbering will not decode correctly against the new schema. Ensure every client and server is rebuilt from the updated proto before rollout, or reserve the old field number if mixed versions can coexist.

Comment on lines +78 to +82
if err != nil {
	tracing.RecordError(span, err)
	s.logger.Error(ctx, "unable to set windows", slog.String("error", err.Error()), slog.String("key", key))
	return
Contributor

🛠️ Refactor suggestion

Error handling and partial updates.
If the first window sets successfully but the second fails, you may leave data in a partially updated state. Consider more granular rollback or error handling if this scenario matters.
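
One option is to attempt both writes and return a joined error rather than bailing out after the first failure. A sketch under the assumption that the two window writes are independent and safe to retry (setWindow is an illustrative name, not the PR's helper):

var errs []error
if err := s.setWindow(ctx, key, currentWindow); err != nil {
	errs = append(errs, fmt.Errorf("set current window: %w", err))
}
if err := s.setWindow(ctx, key, previousWindow); err != nil {
	errs = append(errs, fmt.Errorf("set previous window: %w", err))
}
if len(errs) > 0 {
	err := errors.Join(errs...)
	tracing.RecordError(span, err)
	s.logger.Error(ctx, "unable to set windows", slog.String("error", err.Error()), slog.String("key", key))
	return
}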

Comment on lines +75 to +113
// listens to membership changes and updates the hash ring
func (c *cluster) keepInSync() {
	joins := c.membership.SubscribeJoinEvents()
	leaves := c.membership.SubscribeLeaveEvents()

	for {
		select {
		case node := <-joins:
			{
				ctx := context.Background()
				c.logger.Info(ctx, "node joined", slog.String("nodeID", node.NodeID))

				err := c.ring.AddNode(ctx, ring.Node[Node]{
					ID: node.NodeID,
					Tags: Node{
						RpcAddr: fmt.Sprintf("%s:%d", node.Addr.String(), c.rpcPort),
						ID:      node.NodeID,
						Addr:    node.Addr,
					},
				})
				if err != nil {
					c.logger.Error(ctx, "failed to add node to ring", slog.String("error", err.Error()))
				}

			}
		case node := <-leaves:
			{
				ctx := context.Background()
				c.logger.Info(ctx, "node left", slog.String("nodeID", node.NodeID))
				err := c.ring.RemoveNode(ctx, node.NodeID)
				if err != nil {
					c.logger.Error(ctx, "failed to remove node from ring", slog.String("error", err.Error()))
				}
			}
		}

	}

}
Contributor

🛠️ Refactor suggestion

Handle event propagation or track join/leave events.

Within keepInSync, membership events are directly added/removed from the ring, but joinEvents and leaveEvents topics are initialized and never used. If the intent is to publish node updates, consider publishing to these topics or removing them for clarity. Additionally, the infinite loop lacks a break condition or context check for shutdown.
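
A minimal sketch of how the loop could observe shutdown, assuming a context (or a done channel) is threaded into keepInSync; this is illustrative, not the PR's code:

func (c *cluster) keepInSync(ctx context.Context) {
	joins := c.membership.SubscribeJoinEvents()
	leaves := c.membership.SubscribeLeaveEvents()

	for {
		select {
		case <-ctx.Done():
			// Exit cleanly when Shutdown cancels the context.
			return
		case node := <-joins:
			err := c.ring.AddNode(ctx, ring.Node[Node]{
				ID: node.NodeID,
				Tags: Node{
					RpcAddr: fmt.Sprintf("%s:%d", node.Addr.String(), c.rpcPort),
					ID:      node.NodeID,
					Addr:    node.Addr,
				},
			})
			if err != nil {
				c.logger.Error(ctx, "failed to add node to ring", slog.String("error", err.Error()))
			}
		case node := <-leaves:
			if err := c.ring.RemoveNode(ctx, node.NodeID); err != nil {
				c.logger.Error(ctx, "failed to remove node from ring", slog.String("error", err.Error()))
			}
		}
	}
}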

Comment on lines +124 to +126
func (c *cluster) Shutdown(ctx context.Context) error {
	return c.membership.Leave()
}
Contributor

⚠️ Potential issue

Stop the cluster goroutine on shutdown.

Shutdown calls membership.Leave() but doesn’t signal keepInSync to exit. If the keepInSync goroutine attempts to read membership events after membership is closed, it could block indefinitely.

 func (c *cluster) Shutdown(ctx context.Context) error {
     err := c.membership.Leave()
+    // Consider signaling a done channel or context to terminate keepInSync
     return err
 }

Comment on lines 57 to 58
fmt.Println("Buffering", t, "len", len(b.c), "cap", cap(b.c))
if b.drop && len(b.c) >= b.capacity {
Contributor

🛠️ Refactor suggestion

Remove debug print statement from production code.

The fmt.Println statement should be removed or replaced with proper structured logging using a logger interface.

 func (b *Buffer[T]) Buffer(t T) {
-	fmt.Println("Buffering", t, "len", len(b.c), "cap", cap(b.c))
+	// Consider adding structured logging if needed:
+	// log.Debug().
+	//   Interface("value", t).
+	//   Int("length", len(b.c)).
+	//   Int("capacity", cap(b.c)).
+	//   Msg("Buffering value")

Comment on lines +9 to +14
type Buffer[T any] struct {
	c chan T // The underlying channel storing elements

	capacity int  // Maximum number of elements the buffer can hold
	drop     bool // Whether to drop new elements when buffer is full
}
Contributor

🛠️ Refactor suggestion

Consider adding mutex for thread-safe operations.

The buffer operations on the channel might need synchronization if accessed concurrently from multiple goroutines.

 type Buffer[T any] struct {
+	mu sync.RWMutex // Protects the following fields
 	c chan T // The underlying channel storing elements
 
 	capacity int  // Maximum number of elements the buffer can hold
 	drop     bool // Whether to drop new elements when buffer is full
 }

Comment on lines +50 to +61
s := &service{
	clock:                config.Clock,
	logger:               config.Logger,
	cluster:              config.Cluster,
	shutdownCh:           make(chan struct{}),
	bucketsLock:          sync.RWMutex{},
	buckets:              make(map[string]*bucket),
	peerMu:               sync.RWMutex{},
	peers:                make(map[string]peer),
	replayBuffer:         buffer.New[*ratelimitv1.ReplayRequest](10_000, true),
	replayCircuitBreaker: circuitbreaker.New[*connect.Response[ratelimitv1.ReplayResponse]]("replayRatelimitRequest"),
}
Contributor

Nit: why is one of the locks called "xxxxLock" and the other "xxMu"? I know mu is the convention anyway, just a nit. Also, why don't we have a lock per bucket (probably a bit faster) or a lock per peer? We could just keep another map of peers and locks, or buckets and locks.

Collaborator Author

+1 on naming

we have a lock per bucket
why would we need a lock per peer?
