Skip to content

Commit

Permalink
Merge branch 'master' into UlianaAndrukhiv/6874-update-rest-block-res…
Browse files Browse the repository at this point in the history
…ponse
  • Loading branch information
Guitarheroua authored Jan 24, 2025
2 parents 28e9141 + 43e132e commit 2a86b08
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 121 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ docker-native-build-ghost-debug:
PHONY: docker-build-bootstrap
docker-build-bootstrap:
docker build -f cmd/Dockerfile --build-arg TARGET=./cmd/bootstrap --build-arg GOARCH=$(GOARCH) --build-arg VERSION=$(IMAGE_TAG) --build-arg CGO_FLAG=$(CRYPTO_FLAG) --target production \
--secret id=cadence_deploy_key,env=CADENCE_DEPLOY_KEY \
--label "git_commit=${COMMIT}" --label "git_tag=${IMAGE_TAG}" \
-t "$(CONTAINER_REGISTRY)/bootstrap:latest" \
-t "$(CONTAINER_REGISTRY)/bootstrap:$(IMAGE_TAG)" .
Expand All @@ -617,6 +618,7 @@ tool-bootstrap: docker-build-bootstrap
docker-build-bootstrap-transit:
docker build -f cmd/Dockerfile --build-arg TARGET=./cmd/bootstrap/transit --build-arg COMMIT=$(COMMIT) --build-arg VERSION=$(VERSION) --build-arg GOARCH=$(GOARCH) --build-arg CGO_FLAG=$(CRYPTO_FLAG) --no-cache \
--target production \
--secret id=cadence_deploy_key,env=CADENCE_DEPLOY_KEY \
-t "$(CONTAINER_REGISTRY)/bootstrap-transit:latest" \
-t "$(CONTAINER_REGISTRY)/bootstrap-transit:$(IMAGE_TAG)" .

Expand Down
3 changes: 3 additions & 0 deletions engine/access/rest/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ func (b *RouterBuilder) AddRestRoutes(
}

// AddLegacyWebsocketsRoutes adds WebSocket routes to the router.
//
// Deprecated: Use AddWebsocketsRoute instead, which allows managing multiple streams with
// a single endpoint.
func (b *RouterBuilder) AddLegacyWebsocketsRoutes(
stateStreamApi state_stream.API,
chain flow.Chain,
Expand Down
3 changes: 1 addition & 2 deletions engine/access/rest/websockets/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func NewWebSocketHandler(
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//TODO: change to accept topic instead of URL
logger := h.HttpHandler.Logger.With().Str("websocket_subscribe_url", r.URL.String()).Logger()
logger := h.HttpHandler.Logger.With().Str("component", "websocket-handler").Logger()

err := h.HttpHandler.VerifyRequest(w, r)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions engine/access/rest/websockets/legacy/websocket_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

// WebsocketController holds the necessary components and parameters for handling a WebSocket subscription.
// It manages the communication between the server and the WebSocket client for subscribing.
//
// Deprecated: Use websockets.Controller which allows managing multiple subscriptions with a single connection.
type WebsocketController struct {
logger zerolog.Logger
conn *websocket.Conn // the WebSocket connection for communication with the client
Expand Down
147 changes: 53 additions & 94 deletions engine/consensus/matching/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
sealing "github.com/onflow/flow-go/engine/consensus"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
Expand All @@ -27,7 +29,7 @@ const defaultIncorporatedBlockQueueCapacity = 10
// Engine is a wrapper struct for `Core` which implements consensus algorithm.
// Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals.
type Engine struct {
unit *engine.Unit
component.Component
log zerolog.Logger
me module.Local
core sealing.MatchingCore
Expand Down Expand Up @@ -69,7 +71,6 @@ func NewEngine(

e := &Engine{
log: log.With().Str("engine", "matching.Engine").Logger(),
unit: engine.NewUnit(),
me: me,
core: core,
state: state,
Expand All @@ -83,6 +84,12 @@ func NewEngine(
pendingIncorporatedBlocks: pendingIncorporatedBlocks,
}

e.Component = component.NewComponentManagerBuilder().
AddWorker(e.inboundEventsProcessingLoop).
AddWorker(e.finalizationProcessingLoop).
AddWorker(e.blockIncorporatedEventsProcessingLoop).
Build()

// register engine with the receipt provider
_, err = net.Register(channels.ReceiveReceipts, e)
if err != nil {
Expand All @@ -92,79 +99,34 @@ func NewEngine(
return e, nil
}

// Ready returns a ready channel that is closed once the engine has fully
// started. For consensus engine, this is true once the underlying consensus
// algorithm has started.
func (e *Engine) Ready() <-chan struct{} {
e.unit.Launch(e.inboundEventsProcessingLoop)
e.unit.Launch(e.finalizationProcessingLoop)
e.unit.Launch(e.blockIncorporatedEventsProcessingLoop)
return e.unit.Ready()
}

// Done returns a done channel that is closed once the engine has fully stopped.
// For the consensus engine, we wait for hotstuff to finish.
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done()
}

// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
err := e.ProcessLocal(event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}

// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
err := e.Process(channel, originID, event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}

// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.process(e.me.NodeID(), event)
}

// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
// Process receives events from the network and checks their type,
// before enqueuing them to be processed by a worker in a non-blocking manner.
// No errors expected during normal operation (errors are logged instead).
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
err := e.process(originID, event)
if err != nil {
if engine.IsIncompatibleInputTypeError(err) {
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
return nil
}
return fmt.Errorf("unexpected error while processing engine message: %w", err)
receipt, ok := event.(*flow.ExecutionReceipt)
if !ok {
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
return nil
}
e.addReceiptToQueue(receipt)
return nil
}

// process events for the matching engine on the consensus node.
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
receipt, ok := event.(*flow.ExecutionReceipt)
if !ok {
return fmt.Errorf("no matching processor for message of type %T from origin %x: %w", event, originID[:],
engine.IncompatibleInputTypeError)
}
// addReceiptToQueue adds an execution receipt to the queue of the matching engine, to be processed by a worker
func (e *Engine) addReceiptToQueue(receipt *flow.ExecutionReceipt) {
e.metrics.MessageReceived(metrics.EngineSealing, metrics.MessageExecutionReceipt)
e.pendingReceipts.Push(receipt)
e.inboundEventsNotifier.Notify()
return nil
}

// HandleReceipt ingests receipts from the Requester module.
// HandleReceipt ingests receipts from the Requester module, adding them to the queue.
func (e *Engine) HandleReceipt(originID flow.Identifier, receipt flow.Entity) {
e.log.Debug().Msg("received receipt from requester engine")
err := e.process(originID, receipt)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event from requester module")
r, ok := receipt.(*flow.ExecutionReceipt)
if !ok {
e.log.Fatal().Err(engine.IncompatibleInputTypeError).Msg("internal error processing event from requester module")
}
e.addReceiptToQueue(r)
}

// OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer`
Expand All @@ -183,10 +145,10 @@ func (e *Engine) OnBlockIncorporated(incorporatedBlock *model.Block) {
}

// processIncorporatedBlock selects receipts that were included into incorporated block and submits them
// for further processing by matching core.
// to the matching core for further processing.
// Without the logic below, the sealing engine would produce IncorporatedResults
// only from receipts received directly from ENs. sealing Core would not know about
// Receipts that are incorporated by other nodes in their blocks blocks (but never
// Receipts that are incorporated by other nodes in their blocks (but never
// received directly from the EN).
// No errors expected during normal operations.
func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error {
Expand All @@ -205,61 +167,67 @@ func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error {
return nil
}

// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events
func (e *Engine) finalizationProcessingLoop() {
// finalizationProcessingLoop contains the logic for processing of finalization events.
// This method is intended to be executed by a dedicated worker / goroutine.
func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
finalizationNotifier := e.finalizationEventsNotifier.Channel()
ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-finalizationNotifier:
err := e.core.OnBlockFinalization()
if err != nil {
e.log.Fatal().Err(err).Msg("could not process last finalized event")
ctx.Throw(fmt.Errorf("could not process last finalized event: %w", err))
}
}
}
}

// blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events.
func (e *Engine) blockIncorporatedEventsProcessingLoop() {
// blockIncorporatedEventsProcessingLoop contains the logic for processing block incorporated events.
// This method is intended to be executed by a dedicated worker / goroutine.
func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
c := e.blockIncorporatedNotifier.Channel()

ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-c:
err := e.processBlockIncorporatedEvents()
err := e.processBlockIncorporatedEvents(ctx)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message")
ctx.Throw(fmt.Errorf("internal error processing block incorporated queued message: %w", err))
}
}
}
}

func (e *Engine) inboundEventsProcessingLoop() {
// inboundEventsProcessingLoop contains the logic for processing execution receipts, received
// from the network via Process, from the Requester module via HandleReceipt, or from incorporated blocks.
// This method is intended to be executed by a dedicated worker / goroutine.
func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
c := e.inboundEventsNotifier.Channel()

ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-c:
err := e.processAvailableEvents()
err := e.processExecutionReceipts(ctx)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing queued message")
ctx.Throw(fmt.Errorf("internal error processing queued execution receipt: %w", err))
}
}
}
}

// processBlockIncorporatedEvents performs processing of block incorporated hot stuff events.
// No errors expected during normal operations.
func (e *Engine) processBlockIncorporatedEvents() error {
func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContext) error {
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return nil
default:
}
Expand All @@ -279,27 +247,18 @@ func (e *Engine) processBlockIncorporatedEvents() error {
}
}

// processAvailableEvents processes _all_ available events (untrusted messages
// processExecutionReceipts processes execution receipts
// from other nodes as well as internally trusted.
// No errors expected during normal operations.
func (e *Engine) processAvailableEvents() error {
func (e *Engine) processExecutionReceipts(ctx irrecoverable.SignalerContext) error {
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return nil
default:
}

msg, ok := e.pendingIncorporatedBlocks.Pop()
if ok {
err := e.processIncorporatedBlock(msg.(flow.Identifier))
if err != nil {
return fmt.Errorf("could not process incorporated block: %w", err)
}
continue
}

msg, ok = e.pendingReceipts.Pop()
msg, ok := e.pendingReceipts.Pop()
if ok {
err := e.core.ProcessReceipt(msg.(*flow.ExecutionReceipt))
if err != nil {
Expand Down
25 changes: 17 additions & 8 deletions engine/consensus/matching/engine_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package matching

import (
"context"
"sync"
"testing"
"time"
Expand All @@ -10,9 +11,9 @@ import (
"github.com/stretchr/testify/suite"

"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine"
mockconsensus "github.com/onflow/flow-go/engine/consensus/mock"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
mockmodule "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/network/channels"
Expand All @@ -36,6 +37,7 @@ type MatchingEngineSuite struct {

// Matching Engine
engine *Engine
cancel context.CancelFunc
}

func (s *MatchingEngineSuite) SetupTest() {
Expand All @@ -57,7 +59,17 @@ func (s *MatchingEngineSuite) SetupTest() {
s.engine, err = NewEngine(unittest.Logger(), net, me, metrics, metrics, s.state, s.receipts, s.index, s.core)
require.NoError(s.T(), err)

<-s.engine.Ready()
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background())
s.cancel = cancel
s.engine.Start(ctx)
unittest.AssertClosesBefore(s.T(), s.engine.Ready(), 10*time.Millisecond)
}

func (s *MatchingEngineSuite) TearDownTest() {
if s.cancel != nil {
s.cancel()
unittest.AssertClosesBefore(s.T(), s.engine.Done(), 10*time.Millisecond)
}
}

// TestOnFinalizedBlock tests if finalized block gets processed when send through `Engine`.
Expand Down Expand Up @@ -135,15 +147,12 @@ func (s *MatchingEngineSuite) TestMultipleProcessingItems() {
s.core.AssertExpectations(s.T())
}

// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type
// was submitted from network layer.
// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type
// (byzantine message) was submitted from network layer.
func (s *MatchingEngineSuite) TestProcessUnsupportedMessageType() {
invalidEvent := uint64(42)
err := s.engine.Process("ch", unittest.IdentifierFixture(), invalidEvent)
// shouldn't result in error since byzantine inputs are expected
require.NoError(s.T(), err)
// in case of local processing error cannot be consumed since all inputs are trusted
err = s.engine.ProcessLocal(invalidEvent)
require.Error(s.T(), err)
require.True(s.T(), engine.IsIncompatibleInputTypeError(err))
// Local processing happens only via HandleReceipt, which will log.Fatal on invalid input
}
2 changes: 1 addition & 1 deletion fvm/environment/derived_data_invalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestMeterParamOverridesUpdated(t *testing.T) {

snapshotTree := snapshot.NewSnapshotTree(nil)

ctx := fvm.NewContext(fvm.WithChain(flow.Testnet.Chain()))
ctx := fvm.NewContext(fvm.WithChain(flow.Emulator.Chain()))

vm := fvm.NewVirtualMachine()
executionSnapshot, _, err := vm.Run(
Expand Down
Loading

0 comments on commit 2a86b08

Please sign in to comment.