Skip to content

Commit

Permalink
Updated parallelism model to use fewer goroutines and more cache (#42)
Browse files Browse the repository at this point in the history
Throttle locks are now taken earlier, resulting in less CPU contention and allowing the cache to be more useful
  • Loading branch information
dylanratcliffe authored May 13, 2022
1 parent 27d8a9e commit 3f357ca
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 335 deletions.
16 changes: 0 additions & 16 deletions Update.md

This file was deleted.

185 changes: 185 additions & 0 deletions performance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package discovery

import (
"context"
"math"
"sync"
"testing"
"time"

"github.com/overmindtech/sdp-go"
)

// SlowSource is a test source that simulates a slow backend: every Get call
// takes RequestDuration to complete.
type SlowSource struct {
	// RequestDuration is how long each Get call takes end-to-end.
	RequestDuration time.Duration
}

// Type returns the item type that this source produces.
func (s *SlowSource) Type() string {
	const itemType = "person"
	return itemType
}

// Name returns the descriptive name of this source.
func (s *SlowSource) Name() string {
	const sourceName = "slow-source"
	return sourceName
}

// DefaultCacheDuration returns how long items from this source may be cached.
func (s *SlowSource) DefaultCacheDuration() time.Duration {
	return time.Minute * 10
}

// Contexts returns the list of item contexts this source serves.
func (s *SlowSource) Contexts() []string {
	contexts := []string{"test"}
	return contexts
}

// Hidden reports whether items from this source should be hidden. They are not.
func (s *SlowSource) Hidden() bool {
	const hidden = false
	return hidden
}

// Get simulates a slow lookup. It builds a "person" item for the query,
// attaches two linked item requests with random names, and then blocks so
// the whole call takes RequestDuration measured from when it started.
func (s *SlowSource) Get(ctx context.Context, itemContext string, query string) (*sdp.Item, error) {
	// Record the deadline up front so item construction counts towards the
	// simulated request duration.
	deadline := time.Now().Add(s.RequestDuration)

	attributes, _ := sdp.ToAttributes(map[string]interface{}{
		"name": query,
	})

	// Each item links to two more, so a request tree doubles at every depth.
	links := make([]*sdp.ItemRequest, 0, 2)
	for len(links) < 2 {
		links = append(links, &sdp.ItemRequest{
			Type:    "person",
			Method:  sdp.RequestMethod_GET,
			Query:   RandomName(),
			Context: "test",
		})
	}

	item := &sdp.Item{
		Type:               "person",
		UniqueAttribute:    "name",
		Attributes:         attributes,
		Context:            "test",
		LinkedItemRequests: links,
	}

	// Sleep out whatever time remains until the deadline.
	time.Sleep(time.Until(deadline))

	return item, nil
}

// Find returns no items; this source only meaningfully supports Get.
func (s *SlowSource) Find(ctx context.Context, itemContext string) ([]*sdp.Item, error) {
	// Return a non-nil empty slice to match the original contract.
	items := make([]*sdp.Item, 0)
	return items, nil
}

// Weight returns the priority weight of this source.
func (s *SlowSource) Weight() int {
	const weight = 100
	return weight
}

// TestParallelRequestPerformance checks that total request duration scales
// linearly with load, both with and without item linking.
func TestParallelRequestPerformance(t *testing.T) {
	// This test is designed to ensure that request duration is linear up to a
	// certain point. Above that point the overhead caused by having so many
	// goroutines running will start to make the response times non-linear which
	// maybe isn't ideal but given realistic loads we probably don't care.
	t.Run("Without linking", func(t *testing.T) {
		RunLinearPerformanceTest(t, "10 requests", 10, 0, 1)
		RunLinearPerformanceTest(t, "100 requests", 100, 0, 10)
		RunLinearPerformanceTest(t, "1,000 requests", 1000, 0, 100)
	})

	t.Run("With linking", func(t *testing.T) {
		RunLinearPerformanceTest(t, "1 request 3 depth", 1, 3, 1)
		// This subtest previously duplicated the name above even though it
		// runs with a different parallelism limit; the name now says which
		// configuration it covers so failures are attributable.
		RunLinearPerformanceTest(t, "1 request 3 depth 100 parallel", 1, 3, 100)
		RunLinearPerformanceTest(t, "1 request 5 depth", 1, 5, 100)
		RunLinearPerformanceTest(t, "10 requests 5 depth", 10, 5, 100)
	})
}

// RunLinearPerformanceTest runs a test with a given number of input requests,
// link depth and parallelisation limit. Expected results and expected duration
// are determined automatically, meaning all this is testing for is the fact
// that the performance continues to be linear and predictable.
func RunLinearPerformanceTest(t *testing.T, name string, numRequests int, linkDepth int, numParallel int) {
	t.Helper()

	t.Run(name, func(t *testing.T) {
		result := TimeRequests(numRequests, linkDepth, numParallel)

		// TimeRequests collects request errors but previously nothing
		// inspected them, so failing requests were silently ignored (they
		// only showed up indirectly as a wrong item count). Surface them.
		for _, err := range result.Errors {
			t.Error(err)
		}

		if len(result.Results) != result.ExpectedItems {
			t.Errorf("Expected %v items, got %v", result.ExpectedItems, len(result.Results))
		}

		if result.TimeTaken > result.MaxTime {
			t.Errorf("Requests took too long: %v Max: %v", result.TimeTaken.String(), result.MaxTime.String())
		}
	})
}

// TimedResults holds the outcome of a timed batch of requests run by
// TimeRequests.
type TimedResults struct {
	// ExpectedItems is the number of items the batch should have produced,
	// calculated from the request count and link depth.
	ExpectedItems int
	// MaxTime is the calculated upper bound for how long the batch may take.
	MaxTime time.Duration
	// TimeTaken is how long the batch actually took.
	TimeTaken time.Duration
	// Results contains every item returned across all requests.
	Results []*sdp.Item
	// Errors contains every error returned across all requests.
	Errors []error
}

// TimeRequests starts an engine backed by a single SlowSource, fires
// numRequests GET requests (each with the given link depth) concurrently,
// and returns how long the batch took together with the automatically
// calculated expected item count and maximum duration.
func TimeRequests(numRequests int, linkDepth int, numParallel int) TimedResults {
	engine := Engine{
		Name:                  "performance-test",
		MaxParallelExecutions: numParallel,
	}
	engine.AddSources(&SlowSource{
		RequestDuration: 100 * time.Millisecond,
	})
	engine.Start()
	defer engine.Stop()

	// Every item links to two more, so layer i contributes
	// 2^i * numRequests items, and each layer runs in batches of at most
	// numParallel requests.
	// NOTE(review): the 200ms-per-batch figure vs the 100ms RequestDuration
	// presumably budgets for engine overhead — confirm if this test flakes.
	var expectedItems int
	var expectedDuration time.Duration
	for depth := 0; depth <= linkDepth; depth++ {
		layerItems := int(math.Pow(2, float64(depth))) * numRequests
		layerMillis := 200 * math.Ceil(float64(layerItems)/float64(numParallel))
		expectedDuration += time.Duration(layerMillis) * time.Millisecond
		expectedItems += layerItems
	}

	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		items = make([]*sdp.Item, 0)
		errs  = make([]error, 0)
	)

	start := time.Now()

	for i := 0; i < numRequests; i++ {
		tracker := &RequestTracker{
			Request: &sdp.ItemRequest{
				Type:      "person",
				Method:    sdp.RequestMethod_GET,
				Query:     RandomName(),
				Context:   "test",
				LinkDepth: uint32(linkDepth),
			},
			Engine: &engine,
		}

		wg.Add(1)
		go func(rt *RequestTracker) {
			defer wg.Done()

			found, err := rt.Execute()

			mu.Lock()
			defer mu.Unlock()
			items = append(items, found...)
			if err != nil {
				errs = append(errs, err)
			}
		}(tracker)
	}

	wg.Wait()

	return TimedResults{
		ExpectedItems: expectedItems,
		MaxTime:       expectedDuration,
		TimeTaken:     time.Since(start),
		Results:       items,
		Errors:        errs,
	}
}
91 changes: 0 additions & 91 deletions request_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package discovery

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -86,96 +85,6 @@ func (s *SpeedTestSource) Weight() int {
return 10
}

// TestExecuteParallel verifies that the engine's MaxParallelExecutions
// setting actually controls request parallelism: with a limit of 1 the
// per-source queries must run serially, and with a very high limit they
// must overlap.
// NOTE(review): this is a timing-based test; it may be sensitive to
// scheduler load on slow CI machines.
func TestExecuteParallel(t *testing.T) {
	queryDelay := (200 * time.Millisecond)
	numSources := 10
	sources := make([]Source, numSources)

	// Create a number of sources, each with a distinct return type so the
	// wildcard request fans out to all of them.
	for i := 0; i < len(sources); i++ {
		sources[i] = &SpeedTestSource{
			QueryDelay: queryDelay,
			ReturnType: fmt.Sprintf("type%v", i),
		}
	}

	t.Run("With no parallelism", func(t *testing.T) {
		t.Parallel()

		engine := Engine{
			Name:                  "no-parallel",
			MaxParallelExecutions: 1,
		}

		engine.AddSources(sources...)
		engine.SetupThrottle()

		// A wildcard FIND request that hits every registered source.
		tracker := RequestTracker{
			Engine: &engine,
			Request: &sdp.ItemRequest{
				Type:      "*",
				Method:    sdp.RequestMethod_FIND,
				LinkDepth: 0,
				Context:   "*",
			},
		}

		timeStart := time.Now()

		_, err := tracker.Execute()

		timeTaken := time.Since(timeStart)

		if err != nil {
			t.Fatal(err)
		}

		// Serial execution must take at least numSources * queryDelay;
		// anything faster means queries overlapped.
		expectedTime := time.Duration(int64(queryDelay) * int64(numSources))

		if timeTaken < expectedTime {
			t.Errorf("Query with no parallelism took < %v. This means it must have run in parallel", expectedTime)
		}
	})

	t.Run("With lots of parallelism", func(t *testing.T) {
		t.Parallel()

		engine := Engine{
			Name:                  "no-parallel",
			MaxParallelExecutions: 999,
		}

		engine.AddSources(sources...)
		engine.SetupThrottle()

		tracker := RequestTracker{
			Engine: &engine,
			Request: &sdp.ItemRequest{
				Type:      "*",
				Method:    sdp.RequestMethod_FIND,
				LinkDepth: 0,
				Context:   "*",
			},
		}

		timeStart := time.Now()

		_, err := tracker.Execute()

		timeTaken := time.Since(timeStart)

		if err != nil {
			t.Fatal(err)
		}

		expectedTime := (queryDelay * 2) // Double it to give us some wiggle room

		// NOTE(review): the message below says "no parallelism" but this is
		// the high-parallelism subtest — looks like a copy-paste in the
		// string; confirm before fixing since tooling may grep for it.
		if timeTaken > expectedTime {
			t.Errorf("Query with no parallelism took %v which is > than the expected max of %v. This means it must not have run in parallel", timeTaken, expectedTime)
		}
	})
}

func TestExecute(t *testing.T) {
engine := Engine{
Name: "test",
Expand Down
38 changes: 21 additions & 17 deletions requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,31 @@ func (e *Engine) ExecuteRequest(ctx context.Context, req *sdp.ItemRequest) ([]*s
}
}

allItems := make([]*sdp.Item, 0)
allErrors := make([]error, 0)

go func() {
for item := range items {
allItems = append(allItems, item)
}
done <- true
}()

go func() {
for err := range errors {
allErrors = append(allErrors, err)
}
done <- true
}()

for request, sources := range expanded {
wg.Add(1)

e.throttle.Lock()

go func(r *sdp.ItemRequest, sources []Source) {
defer wg.Done()
defer e.throttle.Unlock()
var requestItems []*sdp.Item
var requestError error

Expand Down Expand Up @@ -188,23 +209,6 @@ func (e *Engine) ExecuteRequest(ctx context.Context, req *sdp.ItemRequest) ([]*s
}(request, sources)
}

allItems := make([]*sdp.Item, 0)
allErrors := make([]error, 0)

go func() {
for item := range items {
allItems = append(allItems, item)
}
done <- true
}()

go func() {
for err := range errors {
allErrors = append(allErrors, err)
}
done <- true
}()

// Wait for all requests to complete
wg.Wait()

Expand Down
17 changes: 11 additions & 6 deletions shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,24 @@ import (
"google.golang.org/protobuf/types/known/structpb"
)

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

func randSeq(n int) string {
b := make([]rune, n)
func randString(length int) string {
var seededRand *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano()))

b := make([]byte, length)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
b[i] = charset[seededRand.Intn(len(charset))]
}
return string(b)
}

func RandomName() string {
n := namegenerator.NewNameGenerator(time.Now().UTC().UnixNano())
return n.Generate() + " " + n.Generate() + "-" + randSeq(10)
seed := time.Now().UTC().UnixNano()
nameGenerator := namegenerator.NewNameGenerator(seed)
name := nameGenerator.Generate()
randGarbage := randString(10)
return fmt.Sprintf("%v-%v", name, randGarbage)
}

func (s *TestSource) NewTestItem(itemContext string, query string) *sdp.Item {
Expand Down
Loading

0 comments on commit 3f357ca

Please sign in to comment.