Skip to content

Commit

Permalink
feat: validate if config is a DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
legosandorigami committed Aug 19, 2024
1 parent 660f572 commit 859cffb
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 0 deletions.
258 changes: 258 additions & 0 deletions cmd/waymond/dag_validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
package main

import (
"errors"
"fmt"
"testing"

"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/knadh/koanf/v2"
"github.com/scriptnull/waymond/internal/connector"
"github.com/scriptnull/waymond/internal/connector/direct"
)

const testcase1 = `[[trigger]]
type = "cron"
id = "global_cron"
expression = "*/1 * * * *"
[[trigger]]
type = "buildkite"
id = "my_buildkite_org"
filter_by_queue_name = "aws-on-demand-arm64-ubuntu-.*-ami-.*"
# set BUILDKITE_TOKEN environment variable
[[connect]]
type = "direct"
id = "check_my_buildkite_org_queues_periodically"
from = "trigger.global_cron"
to = "trigger.my_buildkite_org"
[[scaler]]
type = "noop"
id = "noop"
[[connect]]
type = "direct"
id = "print_trigger_output"
from = "trigger.my_buildkite_org"
to = "scaler.noop"
[connect.transform]
method = "go_template"
template = """
{
"asg_name": "{{ .queue }}",
"desired_count": {{ .scheduled_jobs_count }}
}
"""`

const testcase2 = `
[[trigger]]
type = "cron"
id = "global_cron"
expression = "*/1 * * * *"
[[trigger]]
type = "buildkite"
id = "buildkite_1"
filter_by_queue_name = "aws-on-demand-arm64-ubuntu-.*-ami-.*"
# set BUILDKITE_TOKEN environment variable
[[trigger]]
type = "buildkite"
id = "buildkite_2"
filter_by_queue_name = "aws-on-demand-arm64-ubuntu-.*-ami-.*"
# set BUILDKITE_TOKEN environment variable
[[connect]]
type = "direct"
id = "connect_cron_to_buildkite_1"
from = "trigger.global_cron"
to = "trigger.buildkite_1"
[[connect]]
type = "direct"
id = "connect_buildkite_1_to_buildkite_2"
from = "trigger.buildkite_1"
to = "trigger.buildkite_2"
[[scaler]]
type = "noop"
id = "noop"
[[connect]]
type = "direct"
id = "connect_buildkite_1_to_noop"
from = "trigger.buildkite_1"
to = "scaler.noop"
[[connect]]
type = "direct"
id = "connect_buildkite_2_to_noop"
from = "trigger.buildkite_2"
to = "scaler.noop"`

const testcase3 = `
[[trigger]]
type = "cron"
id = "global_cron"
expression = "*/1 * * * *"
[[trigger]]
type = "buildkite"
id = "buildkite"
filter_by_queue_name = "aws-on-demand-arm64-ubuntu-.*-ami-.*"
# set BUILDKITE_TOKEN environment variable
[[connect]]
type = "direct"
id = "connect_cron_to_buildkite"
from = "trigger.global_cron"
to = "trigger.buildkite"
[[connect]]
type = "direct"
id = "connect_buildkite_to_itself"
from = "trigger.buildkite"
to = "trigger.buildkite"
[[scaler]]
type = "noop"
id = "noop"`

const testcase4 = `
[[trigger]]
type = "cron"
id = "global_cron"
expression = "*/1 * * * *"
[[trigger]]
type = "buildkite"
id = "buildkite_1"
filter_by_queue_name = "aws-on-demand-arm64-ubuntu-.*-ami-.*"
# set BUILDKITE_TOKEN environment variable
[[trigger]]
type = "buildkite"
id = "buildkite_2"
filter_by_queue_name = "aws-on-demand-arm64-ubuntu-.*-ami-.*"
# set BUILDKITE_TOKEN environment variable
[[connect]]
type = "direct"
id = "connect_cron_to_buildkite_1"
from = "trigger.global_cron"
to = "trigger.buildkite_1"
[[connect]]
type = "direct"
id = "connect_buildkite_1_to_buildkite_2"
from = "trigger.buildkite_1"
to = "trigger.buildkite_2"
[[connect]]
type = "direct"
id = "connect_buildkite_2_to_buildkite_1"
from = "trigger.buildkite_2"
to = "trigger.buildkite_1"
[[scaler]]
type = "noop"
id = "noop"
[[connect]]
type = "direct"
id = "connect_buildkite_2_to_noop"
from = "trigger.buildkite_2"
to = "trigger.noop"`

func TestVerifyDAG(t *testing.T) {
tt := []struct {
conf string
isDAG bool
expectsError bool
}{
{
conf: testcase1,
isDAG: true,
},
{
conf: testcase2,
isDAG: true,
},
{
conf: testcase3,
isDAG: false,
},
{
conf: testcase4,
isDAG: false,
},
}

for i, tc := range tt {
connectors, err := extractConnectors(tc.conf)
if err != nil {
t.Error("Exracting connectors returned errors")
}

if tc.isDAG != verifyDAG(connectors) {
isDag := "false"
if tc.isDAG {
isDag = "true"
}
resultString := fmt.Sprintf("testcase%d: isDAG: %s but returned otherwise", i, isDag)
t.Errorf("Failed to correctly verify DAG: %s", resultString)
}
}
}

func extractConnectors(conf string) (map[string]connector.Interface, error) {
var kt = koanf.New(".")
var errs []error

// read waymond config file
if err := kt.Load(rawbytes.Provider([]byte(conf)), toml.Parser()); err != nil {
return nil, err
}

// track available connector configuration parsers available out of the box in waymond
connectorConfigParsers := make(map[connector.Type]func(*koanf.Koanf) (connector.Interface, error))
connectorConfigParsers[direct.Type] = direct.ParseConfig

// extract connector from connector configurations
connectorConfigs := kt.Slices("connect")
connectors := make(map[string]connector.Interface)
for _, connectorConfig := range connectorConfigs {
ttype := connectorConfig.String("type")
if ttype == "" {
errs = append(errs, fmt.Errorf("expected a non-empty 'type' field for connector: %+v", connectorConfig))
continue
}

id := connectorConfig.String("id")
if id == "" {
errs = append(errs, fmt.Errorf("expected a non-empty 'id' field for connector: %+v", connectorConfig))
continue
}

parseConfig, found := connectorConfigParsers[connector.Type(ttype)]
if !found {
errs = append(errs, fmt.Errorf("unknown 'type' value in connector: %s in %+v", ttype, connectorConfig))
continue
}

connector, err := parseConfig(connectorConfig)
if err != nil {
errs = append(errs, err)
continue
}
connectors[id] = connector
}
if len(errs) > 0 {
return nil, errors.New("Error Extracting connectors from the config file")
}

return connectors, nil
}
43 changes: 43 additions & 0 deletions cmd/waymond/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -160,6 +161,11 @@ func main() {
os.Exit(1)
}

if is_DAG := verifyDAG(connectors); !is_DAG {
corelog.Error(errors.New("invalid config. Config must be Directed Acyclic Graph(DAG)"))
os.Exit(1)
}

ctx := context.Background()

err := event.Init()
Expand Down Expand Up @@ -226,3 +232,40 @@ func main() {
<-done
corelog.Verbose("stopped waymond")
}

func verifyDAG(connectors map[string]connector.Interface) bool {
// Map to keep track of visited nodes
visited := make(map[string]bool)
// Map to track nodes currently in the recursion stack, indicating potential cycles
stack := make(map[string]bool)

// Iterate through all connectors
for _, connector := range connectors {
if !visited[connector.From()] {
if isCyclic(connector.From(), connectors, visited, stack) {
return false
}
}
}
return true
}

func isCyclic(node string, connectors map[string]connector.Interface, visited, stack map[string]bool) bool {
visited[node] = true
stack[node] = true

// Iterate over all outgoing edges from the current node
for _, connector := range connectors {
if connector.From() == node {
if !visited[connector.To()] && isCyclic(connector.To(), connectors, visited, stack) {
return true
// If the node is already in the recursion stack, a cycle is detected
} else if stack[connector.To()] {
return true
}
}
}

stack[node] = false
return false
}
3 changes: 3 additions & 0 deletions internal/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ type Interface interface {
// It is executed exactly once for a given connection
// i.e. when waymond boots up
Register(ctx context.Context) error

From() string
To() string
}
8 changes: 8 additions & 0 deletions internal/connector/direct/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,11 @@ func ParseConfig(k *koanf.Koanf) (connector.Interface, error) {

return c, nil
}

func (c *Connector) To() string {
return c.to
}

func (c *Connector) From() string {
return c.from
}

0 comments on commit 859cffb

Please sign in to comment.