Skip to content

Commit

Permalink
chore: parallel stop, use slog from the log/slog package
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Aug 16, 2023
1 parent bfa6cca commit 60a493e
Show file tree
Hide file tree
Showing 22 changed files with 58 additions and 40 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ First, we initialize the `endure` container:

```go
import (
"golang.org/x/exp/slog"
"log/slog"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion edges.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package endure

import (
"log/slog"
"reflect"

"github.com/roadrunner-server/endure/v2/graph"
"github.com/roadrunner-server/errors"
"golang.org/x/exp/slog"
)

func (e *Endure) resolveCollectorEdges(plugin any) error {
Expand Down
3 changes: 2 additions & 1 deletion endure.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package endure

import (
"log/slog"
"net/http"

// pprof will be enabled in debug mode
"net/http/pprof"
"os"
Expand All @@ -12,7 +14,6 @@ import (
"github.com/roadrunner-server/endure/v2/graph"
"github.com/roadrunner-server/endure/v2/registar"
"github.com/roadrunner-server/errors"
"golang.org/x/exp/slog"
)

// Endure struct represent main endure repr
Expand Down
2 changes: 1 addition & 1 deletion examples/sample_1/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"log/slog"
"os"
"os/signal"
"syscall"
Expand All @@ -12,7 +13,6 @@ import (
"samples/modules/logger"

"github.com/roadrunner-server/endure/v2"
"golang.org/x/exp/slog"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.21.0
require (
github.com/roadrunner-server/errors v1.3.0
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb
golang.org/x/sync v0.3.0
)

require (
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ github.com/roadrunner-server/errors v1.3.0 h1:kLVXpXne0jMReN7pj8KIhyYyjqKjsPC5DR
github.com/roadrunner-server/errors v1.3.0/go.mod h1:XYVuhXvxi3yQaP/zCLB6QRZ0JvQIRaBa0SKFHL4WLKg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU=
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA=
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
2 changes: 1 addition & 1 deletion init.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package endure

import (
"log/slog"
"reflect"

"github.com/roadrunner-server/errors"
"golang.org/x/exp/slog"
)

func (e *Endure) init() error {
Expand Down
3 changes: 1 addition & 2 deletions options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package endure

import (
"log/slog"
"time"

"golang.org/x/exp/slog"
)

// GracefulShutdownTimeout sets the timeout to kill the vertices is one or more of them are frozen
Expand Down
2 changes: 1 addition & 1 deletion poller.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package endure

import (
"golang.org/x/exp/slog"
"log/slog"
)

// poll is used to poll the errors from the vertex
Expand Down
2 changes: 1 addition & 1 deletion serve.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package endure

import (
"log/slog"
"reflect"
"sort"

"github.com/roadrunner-server/endure/v2/graph"
"github.com/roadrunner-server/errors"
"golang.org/x/exp/slog"
)

func (e *Endure) serve() error {
Expand Down
50 changes: 35 additions & 15 deletions stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package endure

import (
"context"
stderr "errors"
"log/slog"
"reflect"
"sync"

"golang.org/x/sync/semaphore"

"github.com/roadrunner-server/errors"
"golang.org/x/exp/slog"
)

func (e *Endure) stop() error {
Expand All @@ -18,6 +22,10 @@ func (e *Endure) stop() error {
return errors.E(errors.Str("error occurred, nothing to run"))
}

mu := new(sync.Mutex)
errs := make([]error, 0, 2)
sema := semaphore.NewWeighted(int64(len(vertices)))

// reverse order
for i := len(vertices) - 1; i >= 0; i-- {
if !vertices[i].IsActive() {
Expand All @@ -28,26 +36,38 @@ func (e *Endure) stop() error {
continue
}

stopMethod, _ := reflect.TypeOf(vertices[i].Plugin()).MethodByName(StopMethodName)
_ = sema.Acquire(context.Background(), 1)
go func(i int) {
stopMethod, _ := reflect.TypeOf(vertices[i].Plugin()).MethodByName(StopMethodName)

var inVals []reflect.Value
inVals = append(inVals, reflect.ValueOf(vertices[i].Plugin()))

var inVals []reflect.Value
inVals = append(inVals, reflect.ValueOf(vertices[i].Plugin()))
e.log.Debug(
"calling stop function",
slog.String("plugin", vertices[i].ID().String()),
)

e.log.Debug(
"calling stop function",
slog.String("plugin", vertices[i].ID().String()),
)
ctx, cancel := context.WithTimeout(context.Background(), e.stopTimeout)
inVals = append(inVals, reflect.ValueOf(ctx))

ctx, cancel := context.WithTimeout(context.Background(), e.stopTimeout)
inVals = append(inVals, reflect.ValueOf(ctx))
ret := stopMethod.Func.Call(inVals)[0].Interface()
if ret != nil {
e.log.Error("failed to stop the plugin", slog.String("error", ret.(error).Error()))
mu.Lock()
errs = append(errs, ret.(error))
mu.Unlock()
}

ret := stopMethod.Func.Call(inVals)[0].Interface()
if ret != nil {
sema.Release(1)
cancel()
return ret.(error)
}
}(i)
}

_ = sema.Acquire(context.Background(), int64(len(vertices)))

cancel()
if len(errs) > 0 {
return stderr.Join(errs...)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion tests/disabled_vertices/disabled_vertices_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package disabled_vertices

import (
"log/slog"
"testing"

"github.com/roadrunner-server/endure/v2"
Expand All @@ -15,7 +16,6 @@ import (
"github.com/roadrunner-server/endure/v2/tests/disabled_vertices/plugin9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slog"
)

func TestVertexDisabled(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tests/general/test1/test1_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package test1

import (
"log/slog"
"testing"

"github.com/roadrunner-server/endure/v2"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/roadrunner-server/endure/v2/tests/general/test1/p4"
"github.com/roadrunner-server/endure/v2/tests/general/test1/p5"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slog"
)

func Test1(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tests/general/test2/test2_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package test1

import (
"log/slog"
"testing"

"github.com/roadrunner-server/endure/v2"
Expand All @@ -11,7 +12,6 @@ import (
"github.com/roadrunner-server/endure/v2/tests/general/test2/p5"
"github.com/roadrunner-server/endure/v2/tests/general/test2/p6"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slog"
)

func Test1(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tests/general/test3/test3_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package test3

import (
"log/slog"
"testing"

"github.com/roadrunner-server/endure/v2"
"github.com/roadrunner-server/endure/v2/tests/general/test3/p1"
"github.com/roadrunner-server/endure/v2/tests/general/test3/p2"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slog"
)

func Test1(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tests/general/test4/test4_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package test4

import (
"log/slog"
"testing"

"github.com/roadrunner-server/endure/v2"
"github.com/roadrunner-server/endure/v2/tests/general/test4/p1"
"github.com/roadrunner-server/endure/v2/tests/general/test4/p2"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slog"
)

func Test1(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tests/general/test5/test5_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package test5

import (
"log/slog"
"testing"

"github.com/roadrunner-server/endure/v2"
"github.com/roadrunner-server/endure/v2/tests/general/test5/p1"
"github.com/roadrunner-server/endure/v2/tests/general/test5/p2"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slog"
)

func Test1(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tests/happy_scenarios/happyScenario_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package happy_scenarios

import (
"log/slog"
"testing"
"time"

Expand All @@ -16,7 +17,6 @@ import (
plugin12 "github.com/roadrunner-server/endure/v2/tests/happy_scenarios/provided_value_but_need_pointer/plugin1"
plugin22 "github.com/roadrunner-server/endure/v2/tests/happy_scenarios/provided_value_but_need_pointer/plugin2"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slog"
)

func TestEndure_DifferentLogLevels(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tests/init/init_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package init

import (
"log/slog"
"sync"
"testing"
"time"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/roadrunner-server/endure/v2/tests/init/plugins/plugin3"
"github.com/roadrunner-server/endure/v2/tests/init/plugins/plugin4"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slog"
)

func TestEndure_MainThread_Serve(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tests/interfaces/interfaces_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package interfaces

import (
"log/slog"
"testing"
"time"

Expand All @@ -19,7 +20,6 @@ import (
"github.com/roadrunner-server/endure/v2/tests/interfaces/plugins/plugin9"
notImplPlugin1 "github.com/roadrunner-server/endure/v2/tests/interfaces/service/not_implemented_service/plugin1"
notImplPlugin2 "github.com/roadrunner-server/endure/v2/tests/interfaces/service/not_implemented_service/plugin2"
"golang.org/x/exp/slog"

"github.com/roadrunner-server/endure/v2/tests/interfaces/collects/collects_get_all_deps"
"github.com/stretchr/testify/assert"
Expand Down
2 changes: 1 addition & 1 deletion tests/issues/issues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package issues

import (
"fmt"
"log/slog"
"testing"
"time"

Expand All @@ -10,7 +11,6 @@ import (
issue55p1 "github.com/roadrunner-server/endure/v2/tests/issues/issue55/plugin1"
issue55p2 "github.com/roadrunner-server/endure/v2/tests/issues/issue55/plugin2"
issue55p3 "github.com/roadrunner-server/endure/v2/tests/issues/issue55/plugin3"
"golang.org/x/exp/slog"

issue66p1 "github.com/roadrunner-server/endure/v2/tests/issues/issue66/plugin1"
issue66p2 "github.com/roadrunner-server/endure/v2/tests/issues/issue66/plugin2"
Expand Down
2 changes: 1 addition & 1 deletion tests/stress/stress_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stress

import (
"log/slog"
"testing"
"time"

Expand All @@ -14,7 +15,6 @@ import (
"github.com/roadrunner-server/endure/v2/tests/stress/ServeErr"
"github.com/roadrunner-server/endure/v2/tests/stress/mixed"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slog"
)

func TestEndure_Init_Err(t *testing.T) {
Expand Down

0 comments on commit 60a493e

Please sign in to comment.