Skip to content

Commit

Permalink
Add minor PR changes
Browse files Browse the repository at this point in the history
  • Loading branch information
islamaliev committed Aug 13, 2024
1 parent c70571d commit 21c98e7
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 25 deletions.
1 change: 1 addition & 0 deletions internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ const (
errReplicatorCollections string = "failed to get collections for replicator"
errReplicatorNotFound string = "replicator not found"
errCanNotEncryptBuiltinField string = "can not encrypt build-in field"
errFailedToHandleEncKeysReceivedEvent string = "failed to handle encryption-keys-received event"
)

var (
Expand Down
37 changes: 16 additions & 21 deletions internal/db/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,25 @@ func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) {

case encryption.KeyRetrievedEvent:
go func() {
ctx = encryption.ContextWithStore(ctx, db.Encstore())
for encStoreKey, encKey := range evt.Keys {
err := encryption.SaveKey(ctx, encStoreKey, encKey)

if err != nil {
log.ErrorContextE(
ctx,
"Failed to save doc encryption key",
err,
corelog.Any("Event", evt))
}
}

err := db.mergeEncryptedBlocks(ctx, evt)

if err != nil {
log.ErrorContextE(
ctx,
"Failed to merge encrypted block",
err,
corelog.Any("Event", evt))
if err := db.handleEncryptionKeysRetrievedEvent(ctx, evt); err != nil {
log.ErrorContextE(ctx, errFailedToHandleEncKeysReceivedEvent, err, corelog.Any("Event", evt))
}
}()
}
}
}
}

// handleEncryptionKeysRetrievedEvent handles the event when requested encryption keys are retrieved from other peers.
func (db *db) handleEncryptionKeysRetrievedEvent(ctx context.Context, evt encryption.KeyRetrievedEvent) error {
ctx = encryption.ContextWithStore(ctx, db.Encstore())
for encStoreKey, encKey := range evt.Keys {
err := encryption.SaveKey(ctx, encStoreKey, encKey)

if err != nil {
return err
}
}

return db.mergeEncryptedBlocks(ctx, evt)
}
4 changes: 3 additions & 1 deletion net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) {
return s, nil
}

func (s *server) extractSessionAndRemoveOldOnes(id string) *session {
// extractSessionAndRemoveOld extracts a session with the given id from the server's session list
// and removes any old sessions by comparing their timestamps.
func (s *server) extractSessionAndRemoveOld(id string) *session {
var result *session
swapLast := func(i int) {
s.sessions[i] = s.sessions[len(s.sessions)-1]
Expand Down
2 changes: 1 addition & 1 deletion net/server_encryption_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (s *server) handleFetchEncryptionKeyResponse(resp rpc.Response, req *pb.Fet
return
}

session := s.extractSessionAndRemoveOldOnes(string(keyResp.ReqEphemeralPublicKey))
session := s.extractSessionAndRemoveOld(string(keyResp.ReqEphemeralPublicKey))
if session == nil {
log.ErrorContext(s.peer.ctx, "Failed to find session for ephemeral public key")
return
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/assert_stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"strings"
)

// assertStack keeps track of the current assertion path.
// GraphQL response can be traversed by a key of a map and/or an index of an array.
// So whenever we have a mismatch in a large response, we can use this stack to find the exact path.
// Example output: "commits[2].links[1].cid"
type assertStack struct {
stack []string
isMap []bool
Expand Down
8 changes: 6 additions & 2 deletions tests/integration/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,13 @@ func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
return expect
}

func waitForKeyRetrievedEvent(s *state, nodeIDs []int) {
// waitForKeyRetrievedEvent waits for nodes to receive a key retrieved event.
// If targetNodeIDs is empty, all nodes will be waiting on the event sync.
// Otherwise, only the nodes with the specified IDs will be checked.
func waitForKeyRetrievedEvent(s *state, targetNodeIDs []int) {
for nodeID := 0; nodeID < len(s.nodes); nodeID++ {
if len(nodeIDs) > 0 && !slices.Contains(nodeIDs, nodeID) {
// if we have target nodes and the current node is not in the list, skip it
if len(targetNodeIDs) > 0 && !slices.Contains(targetNodeIDs, nodeID) {
continue
}

Expand Down

0 comments on commit 21c98e7

Please sign in to comment.