Skip to content

Commit

Permalink
query: introduce maxJobs
Browse files Browse the repository at this point in the history
Workers will now accept multiple jobs up to the maxJobs.
  • Loading branch information
kcalvinalvin committed Oct 28, 2024
1 parent 197df14 commit b48dffa
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
2 changes: 1 addition & 1 deletion query/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func startWorker() (*testCtx, error) {
subscriptions: make(chan chan wire.Message),
quit: make(chan struct{}),
}
results := make(chan *jobResult)
results := make(chan *jobResult, maxJobs)
quit := make(chan struct{})

wk := NewWorker(peer)
Expand Down
14 changes: 8 additions & 6 deletions query/workmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const (

// maxQueryTimeout is the maximum timeout given to a single query.
maxQueryTimeout = 32 * time.Second

// maxJobs is the maximum amount of jobs a single worker can have.
maxJobs = 10
)

var (
Expand Down Expand Up @@ -74,11 +77,10 @@ type PeerRanking interface {

// activeWorker wraps a Worker that is currently running, together with the job
// we have given to it.
// TODO(halseth): support more than one active job at a time.
type activeWorker struct {
w Worker
w Worker
activeJobs map[uint64]*queryJob
onExit chan struct{}
onExit chan struct{}
}

// Config holds the configuration options for a new WorkManager.
Expand Down Expand Up @@ -126,8 +128,8 @@ var _ WorkManager = (*peerWorkManager)(nil)
func NewWorkManager(cfg *Config) WorkManager {
return &peerWorkManager{
cfg: cfg,
newBatches: make(chan *batch),
jobResults: make(chan *jobResult),
newBatches: make(chan *batch, maxJobs),
jobResults: make(chan *jobResult, maxJobs),
quit: make(chan struct{}),
}
}
Expand Down Expand Up @@ -220,7 +222,7 @@ Loop:
for p, r := range workers {
// Only one active job at a time is currently
// supported.
if len(r.activeJobs) >= 1 {
if len(r.activeJobs) >= maxJobs {
continue
}

Expand Down
12 changes: 6 additions & 6 deletions query/workmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager,
NewWorker: func(peer Peer) Worker {
m := &mockWorker{
peer: peer,
nextJob: make(chan *queryJob),
results: make(chan *jobResult),
nextJob: make(chan *queryJob, maxJobs),
results: make(chan *jobResult, maxJobs),
}
workerChan <- m
return m
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) {
for i := 0; i < numQueries; i++ {
q := &Request{}
queries[i] = q
scheduledJobs[i] = make(chan sched)
scheduledJobs[i] = make(chan sched, maxJobs)
}

// For each worker, spin up a goroutine that will forward the job it
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestWorkManagerCancelBatch(t *testing.T) {
// TestWorkManagerWorkRankingScheduling checks that the work manager schedules
// jobs among workers according to the peer ranking.
func TestWorkManagerWorkRankingScheduling(t *testing.T) {
const numQueries = 4
const numQueries = 40
const numWorkers = 8

workMgr, workers := startWorkManager(t, numWorkers)
Expand All @@ -414,7 +414,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
var jobs []*queryJob
for i := 0; i < numQueries; i++ {
select {
case job := <-workers[i].nextJob:
case job := <-workers[i/10].nextJob:
if job.index != uint64(i) {
t.Fatalf("unexpected job")
}
Expand Down Expand Up @@ -449,7 +449,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) {
// Go backwards, and succeed the queries.
for i := numQueries - 1; i >= 0; i-- {
select {
case workers[i].results <- &jobResult{
case workers[i/10].results <- &jobResult{
job: jobs[i],
err: nil,
}:
Expand Down

0 comments on commit b48dffa

Please sign in to comment.