Handle corrupt job files without aborting SOC
jertel committed Dec 27, 2023
1 parent 27c9db9 commit 8a6ebd6
Showing 2 changed files with 34 additions and 17 deletions.
6 changes: 3 additions & 3 deletions server/modules/filedatastore/filedatastoreimpl.go
@@ -412,11 +412,11 @@ func (datastore *FileDatastoreImpl) loadJobs() error {
 	if err == nil {
 		for _, file := range files {
 			job := model.NewJob()
-			err = json.LoadJsonFile(file, job)
-			if err == nil {
+			loadErr := json.LoadJsonFile(file, job)
+			if loadErr == nil {
 				datastore.addJob(job)
 			} else {
-				log.WithError(err).WithField("file", file).Error("Unable to load job file")
+				log.WithError(loadErr).WithField("file", file).Error("Unable to load job file")
 			}
 		}
 	}
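The fix above is deliberately narrow: the parse result is captured in a separate loadErr variable, so a corrupt job file is logged and skipped instead of overwriting err, which loadJobs appears to return to its caller and which previously let a single bad file abort SOC. The following is a minimal, self-contained sketch of that error-isolation pattern, not the actual SOC code; loadAllJobs, the Job type, and the directory path are illustrative stand-ins.

// Sketch only: isolate per-file parse errors so one corrupt file cannot
// abort the whole load. Names here are hypothetical, not SOC's.
package main

import (
	"encoding/json"
	"log"
	"os"
	"path/filepath"
)

type Job struct {
	Id int `json:"id"`
}

// loadAllJobs reads every .json file in dir. A file that fails to read or
// parse is logged and skipped; only a directory-level failure is returned.
func loadAllJobs(dir string) ([]*Job, error) {
	files, err := filepath.Glob(filepath.Join(dir, "*.json"))
	if err != nil {
		return nil, err
	}
	var jobs []*Job
	for _, file := range files {
		data, readErr := os.ReadFile(file)
		if readErr != nil {
			log.Printf("unable to read job file %s: %v", file, readErr)
			continue
		}
		job := &Job{}
		if loadErr := json.Unmarshal(data, job); loadErr != nil {
			// A separate variable keeps the error local to this file.
			log.Printf("unable to load job file %s: %v", file, loadErr)
			continue
		}
		jobs = append(jobs, job)
	}
	return jobs, nil
}

func main() {
	jobs, err := loadAllJobs("/tmp/sensoroni.jobs")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("loaded %d job(s)", len(jobs))
}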
45 changes: 31 additions & 14 deletions server/modules/filedatastore/filedatastoreimpl_test.go
@@ -32,7 +32,7 @@ func cleanup() {
 	os.RemoveAll(JOB_DIR)
 }
 
-func createDatastore(authorized bool) (*FileDatastoreImpl, error) {
+func createDatastore(authorized bool, jobFileContents []byte) (*FileDatastoreImpl, error) {
 	cleanup()
 
 	var srv *server.Server
@@ -45,6 +45,10 @@ func createDatastore(authorized bool) (*FileDatastoreImpl, error) {
 	cfg := make(module.ModuleConfig)
 	cfg["jobDir"] = JOB_DIR
 	os.MkdirAll(JOB_DIR, 0777)
+
+	if len(jobFileContents) > 0 {
+		os.WriteFile(JOB_DIR+"/test.json", jobFileContents, 0644)
+	}
 	err := ds.Init(cfg)
 	node := ds.CreateNode(newContext(), "foo")
 	node.Role = "rolo"
@@ -56,14 +60,27 @@ func createDatastore(authorized bool) (*FileDatastoreImpl, error) {
 
 func TestFileDatastoreInit(tester *testing.T) {
 	defer cleanup()
-	ds, err := createDatastore(true)
+	ds, err := createDatastore(true, []byte(""))
 	assert.NoError(tester, err)
 	assert.Equal(tester, DEFAULT_RETRY_FAILURE_INTERVAL_MS, ds.retryFailureIntervalMs)
 }
 
+func TestFileDatastoreInitCorruptFile(tester *testing.T) {
+	defer cleanup()
+	ds, err := createDatastore(true, []byte("garbage"))
+	assert.NoError(tester, err)
+	assert.Equal(tester, DEFAULT_RETRY_FAILURE_INTERVAL_MS, ds.retryFailureIntervalMs)
+}
+
+func TestFileDatastoreInitGoodFile(tester *testing.T) {
+	defer cleanup()
+	ds, err := createDatastore(true, []byte("{}"))
+	assert.NoError(tester, err)
+	assert.Equal(tester, DEFAULT_RETRY_FAILURE_INTERVAL_MS, ds.retryFailureIntervalMs)
+}
 func TestNodes(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(true)
+	ds, _ := createDatastore(true, []byte(""))
 	nodes := ds.GetNodes(newContext())
 	if assert.Len(tester, nodes, 1) {
 		assert.Equal(tester, "foo", nodes[0].Id)
@@ -82,7 +99,7 @@ func TestNodes(tester *testing.T) {
 
 func TestJobs(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(true)
+	ds, _ := createDatastore(true, []byte(""))
 
 	// Test adding a job
 	job := ds.CreateJob(newContext())
@@ -129,7 +146,7 @@ func TestJobs(tester *testing.T) {
 
 func TestJobAddUnauthorized(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(false)
+	ds, _ := createDatastore(false, []byte(""))
 
 	// Test adding a job
 	job := ds.CreateJob(newContext())
@@ -141,7 +158,7 @@ func TestJobAddUnauthorized(tester *testing.T) {
 
 func TestJobAdd(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(true)
+	ds, _ := createDatastore(true, []byte(""))
 	assert.Len(tester, ds.jobsById, 0)
 
 	// Test adding a job
@@ -157,7 +174,7 @@ func TestJobAdd(tester *testing.T) {
 
 func TestJobAddPivotUnauthorized(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(false)
+	ds, _ := createDatastore(false, []byte(""))
 
 	// Test adding an arbitrary job
 	job := ds.CreateJob(newContext())
@@ -169,7 +186,7 @@ func TestJobAddPivotUnauthorized(tester *testing.T) {
 
 func TestJobAddPivot(tester *testing.T) {
	defer cleanup()
-	ds, _ := createDatastore(true)
+	ds, _ := createDatastore(true, []byte(""))
 	assert.Len(tester, ds.jobsById, 0)
 
 	// Test adding a pivot job (requires different permission)
@@ -185,7 +202,7 @@ func TestJobAddPivot(tester *testing.T) {
 
 func TestJobReadAuthorization(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(false)
+	ds, _ := createDatastore(false, []byte(""))
 
 	myJobId := 10001
 	anotherJobId := 10002
@@ -214,7 +231,7 @@ func TestJobReadAuthorization(tester *testing.T) {
 
 func TestJobDeleteAuthorization(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(false)
+	ds, _ := createDatastore(false, []byte(""))
 
 	myJobId := 10001
 	anotherJobId := 10002
@@ -244,14 +261,14 @@ func TestJobDeleteAuthorization(tester *testing.T) {
 
 func TestGetStreamFilename(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(false)
+	ds, _ := createDatastore(false, []byte(""))
 	filename := ds.getStreamFilename(ds.CreateJob(newContext()))
 	assert.Equal(tester, "/tmp/sensoroni.jobs/1001.bin", filename)
 }
 
 func TestUpdateInelegible(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(false)
+	ds, _ := createDatastore(false, []byte(""))
 
 	job := ds.CreateJob(newContext())
 	job.UserId = MY_USER_ID
@@ -264,7 +281,7 @@ func TestUpdateInelegible(tester *testing.T) {
 
 func TestUpdatePreserveData(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(true)
+	ds, _ := createDatastore(true, []byte(""))
 
 	job := ds.CreateJob(newContext())
 	job.UserId = MY_USER_ID
@@ -285,7 +302,7 @@ func TestUpdatePreserveData(tester *testing.T) {
 
 func TestFilterMatches(tester *testing.T) {
 	defer cleanup()
-	ds, _ := createDatastore(true)
+	ds, _ := createDatastore(true, []byte(""))
 
 	params := make(map[string]interface{})
 	jobParams := make(map[string]interface{})
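The test changes thread optional job-file contents through createDatastore so each test can pre-seed the job directory before Init runs, covering both a corrupt file ("garbage") and a valid one ("{}"). Below is a standalone sketch of the same seeding-and-skipping pattern using t.TempDir from the standard library; the test name, package, and inlined loader are illustrative and not part of this repository.

package example

import (
	"encoding/json"
	"os"
	"path/filepath"
	"testing"
)

// TestSeedCorruptJobFile mirrors the fixture-seeding idea above, but with a
// throwaway temp dir instead of a fixed JOB_DIR. The loader is inlined and
// simplified; it only demonstrates that a corrupt file is skipped rather
// than surfaced as an error.
func TestSeedCorruptJobFile(t *testing.T) {
	dir := t.TempDir() // removed automatically when the test finishes

	// Seed one corrupt and one valid job file.
	if err := os.WriteFile(filepath.Join(dir, "bad.json"), []byte("garbage"), 0644); err != nil {
		t.Fatal(err)
	}
	if err := os.WriteFile(filepath.Join(dir, "good.json"), []byte("{}"), 0644); err != nil {
		t.Fatal(err)
	}

	files, err := filepath.Glob(filepath.Join(dir, "*.json"))
	if err != nil {
		t.Fatal(err)
	}

	loaded := 0
	for _, file := range files {
		data, readErr := os.ReadFile(file)
		if readErr != nil {
			continue // unreadable file: skip, don't fail the load
		}
		var job map[string]interface{}
		if loadErr := json.Unmarshal(data, &job); loadErr != nil {
			continue // corrupt file: skip, don't fail the load
		}
		loaded++
	}

	if loaded != 1 {
		t.Fatalf("expected exactly the valid job file to load, got %d", loaded)
	}
}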
