Skip to content

Commit

Permalink
Add non-cached agent readiness endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton-Kalpakchiev committed Nov 15, 2024
1 parent c34d600 commit 7215461
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 10 deletions.
45 changes: 43 additions & 2 deletions agent/agentserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -22,13 +22,15 @@ import (
_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
"os"
"strings"
"sync"

"github.com/uber/kraken/build-index/tagclient"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/containerruntime"
"github.com/uber/kraken/lib/middleware"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/torrent/scheduler"
"github.com/uber/kraken/tracker/announceclient"
"github.com/uber/kraken/utils/handler"
"github.com/uber/kraken/utils/httputil"

Expand All @@ -46,6 +48,7 @@ type Server struct {
cads *store.CADownloadStore
sched scheduler.ReloadableScheduler
tags tagclient.Client
ac announceclient.Client
containerRuntime containerruntime.Factory
}

Expand All @@ -56,13 +59,22 @@ func New(
cads *store.CADownloadStore,
sched scheduler.ReloadableScheduler,
tags tagclient.Client,
ac announceclient.Client,
containerRuntime containerruntime.Factory) *Server {

stats = stats.Tagged(map[string]string{
"module": "agentserver",
})

return &Server{config, stats, cads, sched, tags, containerRuntime}
return &Server{
config: config,
stats: stats,
cads: cads,
sched: sched,
tags: tags,
ac: ac,
containerRuntime: containerRuntime,
}
}

// Handler returns the HTTP handler.
Expand All @@ -73,6 +85,7 @@ func (s *Server) Handler() http.Handler {
r.Use(middleware.LatencyTimer(s.stats))

r.Get("/health", handler.Wrap(s.healthHandler))
r.Get("/readiness", handler.Wrap(s.readinessCheckHandler))

r.Get("/tags/{tag}", handler.Wrap(s.getTagHandler))

Expand Down Expand Up @@ -194,6 +207,34 @@ func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error {
return nil
}

func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error {
var schedErr, buildIndexErr, trackerErr error
var wg sync.WaitGroup

wg.Add(3)
go func() {
schedErr = s.sched.Probe()
wg.Done()
}()
go func() {
buildIndexErr = s.tags.CheckReadiness()
wg.Done()
}()
go func() {
trackerErr = s.ac.CheckReadiness()
wg.Done()
}()

wg.Wait()
if schedErr != nil || buildIndexErr != nil || trackerErr != nil {
return handler.Errorf("agent not ready, scheduler error: %v\n"+
"build index error: %v\n"+
"tracker error: %v", schedErr, buildIndexErr, trackerErr).Status(http.StatusServiceUnavailable)
}
io.WriteString(w, "OK")
return nil
}

// patchSchedulerConfigHandler restarts the agent torrent scheduler with
// the config in request body.
func (s *Server) patchSchedulerConfigHandler(w http.ResponseWriter, r *http.Request) error {
Expand Down
69 changes: 66 additions & 3 deletions agent/agentserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -21,6 +21,7 @@ import (
"fmt"
"io/ioutil"
"net/url"
"regexp"
"testing"
"time"

Expand All @@ -35,6 +36,7 @@ import (
mockcontainerd "github.com/uber/kraken/mocks/lib/containerruntime/containerd"
mockdockerdaemon "github.com/uber/kraken/mocks/lib/containerruntime/dockerdaemon"
mockscheduler "github.com/uber/kraken/mocks/lib/torrent/scheduler"
mockannounceclient "github.com/uber/kraken/mocks/tracker/announceclient"
"github.com/uber/kraken/utils/httputil"
"github.com/uber/kraken/utils/testutil"

Expand All @@ -49,6 +51,7 @@ type serverMocks struct {
tags *mocktagclient.MockClient
dockerCli *mockdockerdaemon.MockDockerClient
containerdCli *mockcontainerd.MockClient
ac *mockannounceclient.MockClient
containerRuntime *mockcontainerruntime.MockFactory
cleanup *testutil.Cleanup
}
Expand All @@ -68,14 +71,15 @@ func newServerMocks(t *testing.T) (*serverMocks, func()) {

dockerCli := mockdockerdaemon.NewMockDockerClient(ctrl)
containerdCli := mockcontainerd.NewMockClient(ctrl)
ac := mockannounceclient.NewMockClient(ctrl)
containerruntime := mockcontainerruntime.NewMockFactory(ctrl)
return &serverMocks{
cads, sched, tags, dockerCli, containerdCli,
cads, sched, tags, dockerCli, containerdCli, ac,
containerruntime, &cleanup}, cleanup.Run
}

func (m *serverMocks) startServer() string {
s := New(Config{}, tally.NoopScope, m.cads, m.sched, m.tags, m.containerRuntime)
s := New(Config{}, tally.NoopScope, m.cads, m.sched, m.tags, m.ac, m.containerRuntime)
addr, stop := testutil.StartServer(s.Handler())
m.cleanup.Add(stop)
return addr
Expand Down Expand Up @@ -207,6 +211,65 @@ func TestHealthHandler(t *testing.T) {
}
}

func TestReadinessCheckHandler(t *testing.T) {
for _, tc := range []struct {
desc string
probeErr error
buildIndexErr error
trackerErr error
expectedErrMsgPattern string
}{
{
desc: "success",
probeErr: nil,
buildIndexErr: nil,
trackerErr: nil,
expectedErrMsgPattern: "",
},
{
desc: "failure (probe fails)",
probeErr: errors.New("test scheduler error"),
buildIndexErr: nil,
trackerErr: nil,
expectedErrMsgPattern: `GET http://127\.0\.0\.1:\d+/readiness 503: agent not ready, scheduler error: test scheduler error\nbuild index error: <nil>\ntracker error: <nil>`,
},
{
desc: "failure (build index not ready)",
probeErr: nil,
buildIndexErr: errors.New("build index not ready"),
trackerErr: nil,
expectedErrMsgPattern: `GET http://127\.0\.0\.1:\d+/readiness 503: agent not ready, scheduler error: <nil>\nbuild index error: build index not ready\ntracker error: <nil>`,
},
{
desc: "failure (tracker not ready)",
probeErr: nil,
buildIndexErr: nil,
trackerErr: errors.New("tracker not ready"),
expectedErrMsgPattern: `GET http://127\.0\.0\.1:\d+/readiness 503: agent not ready, scheduler error: <nil>\nbuild index error: <nil>\ntracker error: tracker not ready`,
},
} {
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)

mocks, cleanup := newServerMocks(t)
defer cleanup()

mocks.sched.EXPECT().Probe().Return(tc.probeErr)
mocks.tags.EXPECT().CheckReadiness().Return(tc.buildIndexErr)
mocks.ac.EXPECT().CheckReadiness().Return(tc.trackerErr)

addr := mocks.startServer()
_, err := httputil.Get(fmt.Sprintf("http://%s/readiness", addr))
if tc.expectedErrMsgPattern == "" {
require.Nil(err)
} else {
r, _ := regexp.Compile(tc.expectedErrMsgPattern)
require.True(r.MatchString(err.Error()))
}
})
}
}

func TestPatchSchedulerConfigHandler(t *testing.T) {
require := require.New(t)

Expand Down
8 changes: 5 additions & 3 deletions agent/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -30,6 +30,7 @@ import (
"github.com/uber/kraken/lib/torrent/scheduler"
"github.com/uber/kraken/metrics"
"github.com/uber/kraken/nginx"
"github.com/uber/kraken/tracker/announceclient"
"github.com/uber/kraken/utils/configutil"
"github.com/uber/kraken/utils/log"
"github.com/uber/kraken/utils/netutil"
Expand Down Expand Up @@ -183,8 +184,9 @@ func Run(flags *Flags, opts ...Option) {
log.Fatalf("Error building client tls config: %s", err)
}

announceClient := announceclient.New(pctx, trackers, tls)
sched, err := scheduler.NewAgentScheduler(
config.Scheduler, stats, pctx, cads, netevents, trackers, tls)
config.Scheduler, stats, pctx, cads, netevents, trackers, announceClient, tls)
if err != nil {
log.Fatalf("Error creating scheduler: %s", err)
}
Expand Down Expand Up @@ -216,7 +218,7 @@ func Run(flags *Flags, opts ...Option) {
}

agentServer := agentserver.New(
config.AgentServer, stats, cads, sched, tagClient, containerRuntimeFactory)
config.AgentServer, stats, cads, sched, tagClient, announceClient, containerRuntimeFactory)
addr := fmt.Sprintf(":%d", flags.AgentServerPort)
log.Infof("Starting agent server on %s", addr)
go func() {
Expand Down
5 changes: 3 additions & 2 deletions lib/torrent/scheduler/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -39,14 +39,15 @@ func NewAgentScheduler(
cads *store.CADownloadStore,
netevents networkevent.Producer,
trackers hashring.PassiveRing,
announceClient announceclient.Client,
tls *tls.Config) (ReloadableScheduler, error) {

s, err := newScheduler(
config,
agentstorage.NewTorrentArchive(stats, cads, metainfoclient.New(trackers, tls)),
stats,
pctx,
announceclient.New(pctx, trackers, tls),
announceClient,
netevents)
if err != nil {
return nil, fmt.Errorf("new scheduler: %s", err)
Expand Down

0 comments on commit 7215461

Please sign in to comment.