Skip to content

Commit

Permalink
Add tracker readiness endpoint and client (#380)
Browse files Browse the repository at this point in the history
As described in #371, all services should implement a "readiness" endpoint, which checks whether the service is ready to serve traffic, i.e. whether it can reach all of its dependencies. The tracker makes requests to the origin cluster to get metainfo data for torrents. Therefore, its readiness endpoint should check the origin's readiness.

Additionally, the tracker's readiness endpoint must be queried by the agent, thus a client for the endpoint must be added.

- Add readiness endpoint for tracker, which checks origin's readiness.
- Add client for the endpoint
  • Loading branch information
Anton-Kalpakchiev authored Nov 15, 2024
1 parent 735046c commit c34d600
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 13 deletions.
87 changes: 77 additions & 10 deletions mocks/tracker/announceclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 19 additions & 1 deletion tracker/announceclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -23,6 +23,7 @@ import (
"time"

"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/hashring"
"github.com/uber/kraken/utils/httputil"
)
Expand Down Expand Up @@ -58,6 +59,7 @@ type Response struct {

// Client defines a client for announcing and getting peers.
type Client interface {
CheckReadiness() error
Announce(
d core.Digest,
h core.InfoHash,
Expand Down Expand Up @@ -89,6 +91,18 @@ func getEndpoint(version int, addr string, h core.InfoHash) (method, url string)
return "POST", fmt.Sprintf("http://%s/announce/%s", addr, h.String())
}

func (c *client) CheckReadiness() error {
addr := c.ring.Locations(backend.ReadinessCheckDigest)[0]
_, err := httputil.Get(
fmt.Sprintf("http://%s/readiness", addr),
httputil.SendTimeout(5*time.Second),
httputil.SendTLS(c.tls))
if err != nil {
return fmt.Errorf("tracker not ready: %v", err)
}
return nil
}

// Announce announces the torrent identified by (d, h) with the number of
// downloaded bytes. Returns a list of all other peers announcing for said torrent,
// sorted by priority, and the interval for the next announce.
Expand Down Expand Up @@ -142,6 +156,10 @@ func Disabled() Client {
return DisabledClient{}
}

func (c DisabledClient) CheckReadiness() error {
return nil
}

// Announce always returns error.
func (c DisabledClient) Announce(
d core.Digest, h core.InfoHash, complete bool, version int) ([]*core.PeerInfo, time.Duration, error) {
Expand Down
45 changes: 44 additions & 1 deletion tracker/trackerserver/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,6 +16,7 @@ package trackerserver
import (
"errors"
"fmt"
"regexp"
"testing"
"time"

Expand All @@ -33,6 +34,48 @@ func newAnnounceClient(pctx core.PeerContext, addr string) announceclient.Client
return announceclient.New(pctx, hashring.NoopPassiveRing(hostlist.Fixture(addr)), nil)
}

func TestCheckReadiness(t *testing.T) {
for _, tc := range []struct {
name string
mockOriginErr error
expectedErrMsgPattern string
}{
{
name: "success",
mockOriginErr: nil,
expectedErrMsgPattern: "",
},
{
name: "failure, 503 (origin fails)",
mockOriginErr: errors.New("origin error"),
expectedErrMsgPattern: fmt.Sprintf(`tracker not ready: GET http://127\.0\.0\.1:\d+/readiness 503: not ready to serve traffic: origin error`),
},
} {
t.Run(tc.name, func(t *testing.T) {
require := require.New(t)

mocks, cleanup := newServerMocks(t, Config{})
defer cleanup()

addr, stop := testutil.StartServer(mocks.handler())
defer stop()

mocks.originCluster.EXPECT().CheckReadiness().Return(tc.mockOriginErr)

pctx := core.PeerContextFixture()
client := newAnnounceClient(pctx, addr)

err := client.CheckReadiness()
if tc.expectedErrMsgPattern == "" {
require.Nil(err)
} else {
r, _ := regexp.Compile(tc.expectedErrMsgPattern)
require.True(r.MatchString(err.Error()))
}
})
}
}

func TestAnnounceSinglePeerResponse(t *testing.T) {
for _, version := range []int{announceclient.V1, announceclient.V2} {
t.Run(fmt.Sprintf("V%d", version), func(t *testing.T) {
Expand Down
13 changes: 12 additions & 1 deletion tracker/trackerserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -77,6 +77,8 @@ func (s *Server) Handler() http.Handler {
r.Use(middleware.LatencyTimer(s.stats))

r.Get("/health", handler.Wrap(s.healthHandler))
r.Get("/readiness", handler.Wrap(s.readinessCheckHandler))

r.Get("/announce", handler.Wrap(s.announceHandlerV1))
r.Post("/announce/{infohash}", handler.Wrap(s.announceHandlerV2))
r.Get("/namespace/{namespace}/blobs/{digest}/metainfo", handler.Wrap(s.getMetaInfoHandler))
Expand All @@ -92,6 +94,15 @@ func (s *Server) ListenAndServe() error {
return listener.Serve(s.config.Listener, s.Handler())
}

func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error {
err := s.originCluster.CheckReadiness()
if err != nil {
return handler.Errorf("not ready to serve traffic: %s", err).Status(http.StatusServiceUnavailable)
}
fmt.Fprintln(w, "OK")
return nil
}

func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error {
fmt.Fprintln(w, "OK")
return nil
Expand Down

0 comments on commit c34d600

Please sign in to comment.