Skip to content

Commit

Permalink
PR fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
islamaliev committed Sep 23, 2024
1 parent 90866e1 commit a492a1d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 46 deletions.
18 changes: 16 additions & 2 deletions internal/kms/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/encryption"
"github.com/sourcenetwork/defradb/net"
pb "github.com/sourcenetwork/defradb/net/pb"
)

Expand Down Expand Up @@ -136,7 +135,7 @@ func (s *pubSubService) handleRequestFromPeer(peerID libpeer.ID, topic string, m
return nil, err

Check warning on line 135 in internal/kms/pubsub.go

View check run for this annotation

Codecov / codecov/patch

internal/kms/pubsub.go#L134-L135

Added lines #L134 - L135 were not covered by tests
}

ctx := grpcpeer.NewContext(s.ctx, net.NewGRPCPeer(peerID))
ctx := grpcpeer.NewContext(s.ctx, newGRPCPeer(peerID))
res, err := s.tryGenEncryptionKeyLocally(ctx, req)
if err != nil {
log.ErrorContextE(s.ctx, "failed attempt to get encryption key", err)
Expand Down Expand Up @@ -323,3 +322,18 @@ func encodeToBase64(data []byte) []byte {
base64.StdEncoding.Encode(encoded, data)
return encoded
}

func newGRPCPeer(peerID libpeer.ID) *grpcpeer.Peer {
return &grpcpeer.Peer{
Addr: addr{peerID},
}
}

// addr implements net.Addr and holds a libp2p peer ID.
type addr struct{ id libpeer.ID }

// Network returns the name of the network that this address belongs to (libp2p).
func (a addr) Network() string { return "libp2p" }

Check warning on line 336 in internal/kms/pubsub.go

View check run for this annotation

Codecov / codecov/patch

internal/kms/pubsub.go#L336

Added line #L336 was not covered by tests

// String returns the peer ID of this address in string form (B58-encoded).
func (a addr) String() string { return a.id.String() }

Check warning on line 339 in internal/kms/pubsub.go

View check run for this annotation

Codecov / codecov/patch

internal/kms/pubsub.go#L339

Added line #L339 was not covered by tests
10 changes: 1 addition & 9 deletions net/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
libpeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"

"github.com/multiformats/go-multiaddr"
"github.com/sourcenetwork/corelog"
"google.golang.org/grpc"
grpcpeer "google.golang.org/grpc/peer"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore"
Expand Down Expand Up @@ -229,12 +227,6 @@ func (p *Peer) Close() {
}
}

func NewGRPCPeer(peerID libpeer.ID) *grpcpeer.Peer {
return &grpcpeer.Peer{
Addr: addr{peerID},
}
}

// handleMessage loop manages the transition of messages
// from the internal broadcaster to the external pubsub network
func (p *Peer) handleMessageLoop() {
Expand Down Expand Up @@ -278,7 +270,7 @@ func (p *Peer) RegisterNewDocument(
schemaRoot string,
) error {
// register topic
err := p.server.addPubSubTopic(docID.String(), !p.server.hasPubSubTopic(schemaRoot))
err := p.server.addPubSubTopic(docID.String(), !p.server.hasPubSubTopic(schemaRoot), nil)
if err != nil {
log.ErrorE(
"Failed to create new pubsub topic",
Expand Down
50 changes: 15 additions & 35 deletions net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
// Once processed, subscribe to the DocID topic on the pubsub network unless we already
// subscribed to the collection.
if !s.hasPubSubTopic(string(req.Body.SchemaRoot)) {
err = s.addPubSubTopic(docID.String(), true)
err = s.addPubSubTopic(docID.String(), true, nil)
if err != nil {
return nil, err
}
Expand All @@ -172,7 +172,9 @@ func (s *server) GetHeadLog(
}

// addPubSubTopic subscribes to a topic on the pubsub network
func (s *server) addPubSubTopic(topic string, subscribe bool) error {
// A custom message handler can be provided to handle incoming messages. If not provided,
// the default message handler will be used.
func (s *server) addPubSubTopic(topic string, subscribe bool, handler rpc.MessageHandler) error {
if s.peer.ps == nil {
return nil
}
Expand Down Expand Up @@ -200,15 +202,23 @@ func (s *server) addPubSubTopic(topic string, subscribe bool) error {
return err
}

if handler == nil {
handler = s.pubSubMessageHandler
}

t.SetEventHandler(s.pubSubEventHandler)
t.SetMessageHandler(s.pubSubMessageHandler)
t.SetMessageHandler(handler)
s.topics[topic] = pubsubTopic{
Topic: t,
subscribed: subscribe,
}
return nil
}

func (s *server) AddPubSubTopic(topicName string, handler rpc.MessageHandler) error {
return s.addPubSubTopic(topicName, true, handler)
}

// hasPubSubTopic checks if we are subscribed to a topic.
func (s *server) hasPubSubTopic(topic string) bool {
s.mu.Lock()
Expand Down Expand Up @@ -269,7 +279,7 @@ func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRe
t, ok := s.topics[topic]
s.mu.Unlock()
if !ok {
err := s.addPubSubTopic(topic, false)
err := s.addPubSubTopic(topic, false, nil)
if err != nil {
return errors.Wrap(fmt.Sprintf("failed to created single use topic %s", topic), err)
}
Expand Down Expand Up @@ -347,7 +357,7 @@ func peerIDFromContext(ctx context.Context) (libpeer.ID, error) {

func (s *server) updatePubSubTopics(evt event.P2PTopic) {
for _, topic := range evt.ToAdd {
err := s.addPubSubTopic(topic, true)
err := s.addPubSubTopic(topic, true, nil)
if err != nil {
log.ErrorE("Failed to add pubsub topic.", err)
}
Expand Down Expand Up @@ -410,36 +420,6 @@ func (s *server) updateReplicators(evt event.Replicator) {
s.peer.bus.Publish(event.NewMessage(event.ReplicatorCompletedName, nil))
}

func (s *server) AddPubSubTopic(topicName string, handler rpc.MessageHandler) error {
if s.peer.ps == nil {
return nil
}

s.mu.Lock()
_, ok := s.topics[topicName]
s.mu.Unlock()
if ok {
return NewErrTopicAlreadyExist(topicName)
}

t, err := rpc.NewTopic(s.peer.ctx, s.peer.ps, s.peer.host.ID(), topicName, true)
if err != nil {
return err
}

t.SetEventHandler(s.pubSubEventHandler)
t.SetMessageHandler(handler)

s.mu.Lock()
defer s.mu.Unlock()

s.topics[topicName] = pubsubTopic{
Topic: t,
subscribed: true,
}
return nil
}

func (s *server) SendPubSubMessage(
ctx context.Context,
topic string,
Expand Down

0 comments on commit a492a1d

Please sign in to comment.