Skip to content

Commit

Permalink
fix(server): mutex calls to sqlitex (#13166)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joibel authored Jun 13, 2024
1 parent e00757b commit 0ca0c0f
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions server/workflow/store/sqlite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -73,6 +74,7 @@ type WorkflowStore interface {
type SQLiteStore struct {
conn *sqlite.Conn
instanceService instanceid.Service
mtx sync.Mutex
}

var _ WorkflowStore = &SQLiteStore{}
Expand Down Expand Up @@ -102,6 +104,8 @@ where instanceid = ?
}

var workflows = wfv1.Workflows{}
s.mtx.Lock()
defer s.mtx.Unlock()
err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{
Args: args,
ResultFunc: func(stmt *sqlite.Stmt) error {
Expand Down Expand Up @@ -143,6 +147,8 @@ where instanceid = ?
}

var total int64
s.mtx.Lock()
defer s.mtx.Unlock()
err = sqlitex.Execute(s.conn, query, &sqlitex.ExecOptions{
Args: args,
ResultFunc: func(stmt *sqlite.Stmt) error {
Expand All @@ -161,6 +167,8 @@ func (s *SQLiteStore) Add(obj interface{}) error {
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
}
s.mtx.Lock()
defer s.mtx.Unlock()
done := sqlitex.Transaction(s.conn)
err := s.upsertWorkflow(wf)
defer done(&err)
Expand All @@ -172,6 +180,8 @@ func (s *SQLiteStore) Update(obj interface{}) error {
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
}
s.mtx.Lock()
defer s.mtx.Unlock()
done := sqlitex.Transaction(s.conn)
err := s.upsertWorkflow(wf)
defer done(&err)
Expand All @@ -183,6 +193,8 @@ func (s *SQLiteStore) Delete(obj interface{}) error {
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
}
s.mtx.Lock()
defer s.mtx.Unlock()
return sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}})
}

Expand All @@ -195,6 +207,8 @@ func (s *SQLiteStore) Replace(list []interface{}, resourceVersion string) error
}
wfs = append(wfs, wf)
}
s.mtx.Lock()
defer s.mtx.Unlock()
done := sqlitex.Transaction(s.conn)
err := s.replaceWorkflows(wfs)
defer done(&err)
Expand Down Expand Up @@ -222,6 +236,7 @@ func (s *SQLiteStore) GetByKey(key string) (item interface{}, exists bool, err e
}

func (s *SQLiteStore) upsertWorkflow(wf *wfv1.Workflow) error {
// Called with the mutex
err := sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}})
if err != nil {
return err
Expand Down

0 comments on commit 0ca0c0f

Please sign in to comment.