Skip to content

Commit

Permalink
Merge pull request #49 from nitrictech/feature/membrane-stop
Browse files Browse the repository at this point in the history
fix: use working interrupt code
  • Loading branch information
tjholm authored May 26, 2021
2 parents da1cdd7 + 5af785a commit 55d8c56
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 50 deletions.
5 changes: 2 additions & 3 deletions membrane/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,8 @@ func (s *Membrane) Start() error {
hndlr = handler.NewHttpHandler(s.childAddress)
break
}

err = s.gatewayPlugin.Start(hndlr)
return err

return s.gatewayPlugin.Start(hndlr)
}

func (s *Membrane) Stop() {
Expand Down
14 changes: 5 additions & 9 deletions plugins/gateway/app_platform/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,14 @@ func (s *HttpGateway) Start(handler handler.TriggerHandler) error {
Handler: httpHandler(handler),
}

go (func() {
err := s.server.ListenAndServe(s.address)
if err != nil {
panic(err)
}
})()

return nil
return s.server.ListenAndServe(s.address)
}

func (s *HttpGateway) Stop() error {
return s.server.Shutdown()
if s.server != nil {
return s.server.Shutdown()
}
return nil
}

// Create new HTTP gateway
Expand Down
13 changes: 5 additions & 8 deletions plugins/gateway/appservice/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,14 @@ func (s *HttpService) Start(handler handler.TriggerHandler) error {
Handler: httpHandler(handler),
}

go (func() {
err := s.server.ListenAndServe(s.address)
if err != nil {
panic(err)
}
})()
return nil
return s.server.ListenAndServe(s.address)
}

func (s *HttpService) Stop() error {
return s.server.Shutdown()
if s.server != nil {
return s.server.Shutdown()
}
return nil
}

// Create a new HTTP Gateway plugin
Expand Down
13 changes: 5 additions & 8 deletions plugins/gateway/cloudrun/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,14 @@ func (s *HttpProxyGateway) Start(handler handler.TriggerHandler) error {
Handler: httpHandler(handler),
}

go (func() {
err := s.server.ListenAndServe(s.address)
if err != nil {
panic(err)
}
})()
return nil
return s.server.ListenAndServe(s.address)
}

func (s *HttpProxyGateway) Stop() error {
return s.server.Shutdown()
if s.server != nil {
return s.server.Shutdown()
}
return nil
}

// Create new DynamoDB documents server
Expand Down
13 changes: 5 additions & 8 deletions plugins/gateway/dev/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,14 @@ func (s *HttpGateway) Start(handler handler.TriggerHandler) error {
Handler: httpHandler(handler),
}

go (func() {
err := s.server.ListenAndServe(s.address)
if err != nil {
panic(err)
}
})()
return nil
return s.server.ListenAndServe(s.address)
}

func (s *HttpGateway) Stop() error {
return s.server.Shutdown()
if s.server != nil {
return s.server.Shutdown()
}
return nil
}

// Create new HTTP gateway
Expand Down
15 changes: 9 additions & 6 deletions plugins/gateway/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/nitric-dev/membrane/sdk"
"github.com/nitric-dev/membrane/triggers"
"strings"
"syscall"
)

type eventType int
Expand Down Expand Up @@ -162,6 +161,7 @@ type LambdaGateway struct {
handler handler.TriggerHandler
runtime LambdaRuntimeHandler
sdk.UnimplementedGatewayPlugin
finished chan int
}

func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, error) {
Expand Down Expand Up @@ -222,31 +222,34 @@ func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, e

// Start the lambda gateway handler
func (s *LambdaGateway) Start(handler handler.TriggerHandler) error {
//s.finished = make(chan int)
s.handler = handler
// Here we want to begin polling lambda for incoming requests...
// Assuming that this is blocking
s.runtime(s.handle)
// Signal process to terminate if no more lambda requests to handle
syscall.SIGTERM.Signal()
// Unblock the 'Stop' function if it's waiting.
go func(){s.finished <- 1}()
return nil
}

func (s *LambdaGateway) Stop() error {
// XXX: This is a NO_OP Process, as this is a pull based system
// We don't need to stop listening to anything
fmt.Println("Shutting down lambda gateway")

fmt.Println("gateway 'Stop' called, waiting for lambda runtime to finish")
// Lambda can't be stopped, need to wait for it to finish
<- s.finished
return nil
}

func New() (sdk.GatewayService, error) {
return &LambdaGateway{
runtime: lambda.Start,
finished: make(chan int),
}, nil
}

func NewWithRuntime(runtime LambdaRuntimeHandler) (sdk.GatewayService, error) {
return &LambdaGateway{
runtime: runtime,
finished: make(chan int),
}, nil
}
10 changes: 6 additions & 4 deletions providers/aws/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ func main() {
log.Fatalf("There was an error initialising the membrane server: %v", err)
}

go (m.Start)()
// Wait for a terminate interrupt
<-term
m.Stop()
go(func(){
<-term
m.Stop()
})()

m.Start()
}
10 changes: 6 additions & 4 deletions providers/azure/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func main() {
log.Fatalf("There was an error initialising the membrane server: %v", err)
}

go (m.Start)()
// Wait for a terminate interrupt
<-term
m.Stop()
go(func(){
<-term
m.Stop()
})()

m.Start()
}

0 comments on commit 55d8c56

Please sign in to comment.