Skip to content

Commit

Permalink
increase throughput
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee committed Mar 30, 2024
1 parent 4fee323 commit fcb27b2
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 144 deletions.
29 changes: 20 additions & 9 deletions worker2/dep.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package worker2

import (
"context"
"sync"
)

type Dep interface {
Expand All @@ -10,7 +11,6 @@ type Dep interface {
Freeze()
IsFrozen() bool
GetDepsObj() *Deps
GetDependencies() []Dep
AddDep(Dep)
GetHooks() []Hook
Wait() <-chan struct{}
Expand All @@ -24,13 +24,18 @@ type Dep interface {

type baseDep struct {
execution *Execution
m sync.RWMutex
}

func (a *baseDep) setExecution(e *Execution) {
a.m.Lock()
defer a.m.Unlock()
a.execution = e
}

func (a *baseDep) getExecution() *Execution {
a.m.RLock()
defer a.m.RUnlock()
return a.execution
}

Expand Down Expand Up @@ -95,13 +100,10 @@ func (a *Action) GetDepsObj() *Deps {
if a.Deps == nil {
a.Deps = NewDeps()
}
a.Deps.setOwner(a)
return a.Deps
}

func (a *Action) GetDependencies() []Dep {
return a.GetDepsObj().Dependencies()
}

func (a *Action) AddDep(dep Dep) {
a.GetDepsObj().Add(dep)
}
Expand All @@ -110,6 +112,12 @@ func (a *Action) DeepDo(f func(Dep)) {
deepDo(a, f)
}

func (a *Action) LinkDeps() {
for _, dep := range a.GetDepsObj().TransitiveDependencies() {
_ = dep.GetDepsObj()
}
}

type Group struct {
baseDep
ID string
Expand All @@ -135,17 +143,20 @@ func (g *Group) GetID() string {
return g.ID
}

func (g *Group) LinkDeps() {
for _, dep := range g.GetDepsObj().TransitiveDependencies() {
_ = dep.GetDepsObj()
}
}

func (g *Group) GetDepsObj() *Deps {
if g.Deps == nil {
g.Deps = NewDeps()
}
g.Deps.setOwner(g)
return g.Deps
}

func (g *Group) GetDependencies() []Dep {
return g.Deps.Dependencies()
}

func (g *Group) GetHooks() []Hook {
return g.Hooks
}
Expand Down
34 changes: 23 additions & 11 deletions worker2/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,11 @@ import (
type DepHook = func(dep Dep)

func NewDeps(deps ...Dep) *Deps {
return newDeps("", deps)
return newDeps(deps)
}

func NewDepsID(id string, deps ...Dep) *Deps {
return newDeps(id, deps)
}

func newDeps(id string, deps []Dep) *Deps {
func newDeps(deps []Dep) *Deps {
d := &Deps{
id: id,
deps: sets.NewIdentitySet[Dep](0),
transitiveDeps: sets.NewIdentitySet[Dep](0),
dependees: sets.NewIdentitySet[*Deps](0),
Expand All @@ -31,7 +26,7 @@ func newDeps(id string, deps []Dep) *Deps {
}

type Deps struct {
id string
owner Dep
deps *sets.Set[Dep, Dep]
transitiveDeps *sets.Set[Dep, Dep]
dependees *sets.Set[*Deps, *Deps]
Expand All @@ -40,6 +35,16 @@ type Deps struct {
frozen bool
}

func (d *Deps) setOwner(dep Dep) {
d.m.Lock()
defer d.m.Unlock()

if d.owner != nil && d.owner != dep {
panic("deps owner is already set")
}
d.owner = dep
}

func (d *Deps) IsFrozen() bool {
return d.frozen
}
Expand Down Expand Up @@ -79,6 +84,13 @@ func (d *Deps) Dependencies() []Dep {
return d.deps.Slice()
}

func (d *Deps) Dependees() []*Deps {
d.m.RLock()
defer d.m.RUnlock()

return d.dependees.Slice()
}

func (d *Deps) TransitiveDependencies() []Dep {
d.m.RLock()
defer d.m.RUnlock()
Expand Down Expand Up @@ -160,7 +172,7 @@ func (d *Deps) hasDependee(dep *Deps) bool {

func (d *Deps) DebugString() string {
var sb strings.Builder
fmt.Fprintf(&sb, "%v:\n", d.id)
fmt.Fprintf(&sb, "%v:\n", d.owner.GetID())
deps := ads.Map(d.deps.Slice(), func(d Dep) string {
return d.GetID()
})
Expand All @@ -171,10 +183,10 @@ func (d *Deps) DebugString() string {
fmt.Fprintf(&sb, " tdeps: %v\n", tdeps)

depdees := ads.Map(d.dependees.Slice(), func(d *Deps) string {
return d.id
return d.owner.GetID()
})
tdepdees := ads.Map(d.transitiveDependees.Slice(), func(d *Deps) string {
return d.id
return d.owner.GetID()
})
fmt.Fprintf(&sb, " depdees: %v\n", depdees)
fmt.Fprintf(&sb, " tdepdees: %v\n", tdepdees)
Expand Down
18 changes: 10 additions & 8 deletions worker2/deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,45 @@ func s(s string) string {
}

func TestLink(t *testing.T) {
d1 := &Action{ID: "1", Deps: NewDepsID("1")}
d2 := &Action{ID: "2", Deps: NewDepsID("2", d1)}
d1 := &Action{ID: "1", Deps: NewDeps()}
d2 := &Action{ID: "2", Deps: NewDeps(d1)}

d3 := &Action{ID: "3", Deps: NewDepsID("3")}
d4 := &Action{ID: "4", Deps: NewDepsID("4", d3)}
d3 := &Action{ID: "3", Deps: NewDeps()}
d4 := &Action{ID: "4", Deps: NewDeps(d3)}

d3.AddDep(d2)

d4.LinkDeps()

assert.Equal(t, s(`
1:
deps: []
tdeps: []
depdees: [2]
tdepdees: [2 4 3]
`), d1.Deps.DebugString())
`), d1.GetDepsObj().DebugString())

assert.Equal(t, s(`
2:
deps: [1]
tdeps: [1]
depdees: [3]
tdepdees: [4 3]
`), d2.Deps.DebugString())
`), d2.GetDepsObj().DebugString())

assert.Equal(t, s(`
3:
deps: [2]
tdeps: [2 1]
depdees: [4]
tdepdees: [4]
`), d3.Deps.DebugString())
`), d3.GetDepsObj().DebugString())

assert.Equal(t, s(`
4:
deps: [3]
tdeps: [3 2 1]
depdees: []
tdepdees: []
`), d4.Deps.DebugString())
`), d4.GetDepsObj().DebugString())
}
Loading

0 comments on commit fcb27b2

Please sign in to comment.