Skip to content

Commit

Permalink
fix: properly synchronize job initiation (#53)
Browse files Browse the repository at this point in the history
* fix: prevent multiple jobs from being queued if one gets delayed

* fix: update job handling to properly use synchronization primitives

* chore: update dockerfile base version

---------

Co-authored-by: Dillon Lees <[email protected]>
  • Loading branch information
computator and ddlees authored Oct 11, 2023
1 parent d58f192 commit c4637f8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1

FROM golang:1.18 as build
FROM golang:1.20 as build
WORKDIR /app

ARG VERSION=v0.0.0
Expand Down
24 changes: 13 additions & 11 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"os/signal"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/bloodhoundad/azurehound/v2/client/rest"
Expand Down Expand Up @@ -97,19 +99,21 @@ func start(ctx context.Context) {
defer ticker.Stop()

var (
currentJob *models.ClientJob
jobQueued sync.Mutex
currentJobID atomic.Int64
)

for {
select {
case <-ticker.C:
if currentJob != nil {
log.V(1).Info("collection in progress...", "jobId", currentJob.ID)
if jobID := currentJobID.Load(); jobID != 0 {
log.V(1).Info("collection in progress...", "jobId", jobID)
if err := checkin(ctx, *bheInstance, bheClient); err != nil {
log.Error(err, "bloodhound enterprise service checkin failed")
}
} else {
} else if jobQueued.TryLock() {
go func() {
defer jobQueued.Unlock()
log.V(2).Info("checking for available collection jobs")
if jobs, err := getAvailableJobs(ctx, *bheInstance, bheClient, updatedClient.ID); err != nil {
log.Error(err, "unable to fetch available jobs for azurehound")
Expand All @@ -132,12 +136,12 @@ func start(ctx context.Context) {
if len(executableJobs) == 0 {
log.V(2).Info("there are no jobs for azurehound to complete at this time")
} else {

defer currentJobID.Store(0)
queuedJobID := executableJobs[0].ID
currentJobID.Store(int64(queuedJobID))
// Notify BHE instance of job start
currentJob = &executableJobs[0]
if err := startJob(ctx, *bheInstance, bheClient, currentJob.ID); err != nil {
if err := startJob(ctx, *bheInstance, bheClient, queuedJobID); err != nil {
log.Error(err, "failed to start job, will retry on next heartbeat")
currentJob = nil
return
}

Expand All @@ -159,10 +163,8 @@ func start(ctx context.Context) {
if err := endJob(ctx, *bheInstance, bheClient, models.JobStatusComplete, message); err != nil {
log.Error(err, "failed to end job")
} else {
log.Info(message, "id", currentJob.ID, "duration", duration.String())
log.Info(message, "id", queuedJobID, "duration", duration.String())
}

currentJob = nil
}
}
}()
Expand Down

0 comments on commit c4637f8

Please sign in to comment.