From 0d940109f787fdb4e43c8d9e86abbab66d6e3d0f Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Thu, 12 Dec 2024 17:43:15 -0500 Subject: [PATCH] apply feedback --- acp/identity/identity.go | 20 ++++++ http/auth.go | 22 +------ http/auth_test.go | 6 +- internal/db/db.go | 2 +- internal/db/permission/check.go | 2 +- net/acp.go | 31 --------- net/dialer_test.go | 25 +++++--- net/grpc.go | 23 +++---- net/peer.go | 19 +++++- net/peer_test.go | 33 ++++++---- net/server.go | 82 +++++------------------- net/server_test.go | 42 +----------- node/acp.go | 40 ------------ node/node.go | 3 +- tests/integration/acp/p2p/create_test.go | 4 +- 15 files changed, 110 insertions(+), 244 deletions(-) delete mode 100644 net/acp.go diff --git a/acp/identity/identity.go b/acp/identity/identity.go index f764f17cc5..845a65577b 100644 --- a/acp/identity/identity.go +++ b/acp/identity/identity.go @@ -18,6 +18,7 @@ import ( "github.com/cyware/ssi-sdk/did/key" "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/lestrrat-go/jwx/v2/jwa" + "github.com/lestrrat-go/jwx/v2/jws" "github.com/lestrrat-go/jwx/v2/jwt" "github.com/sourcenetwork/immutable" acptypes "github.com/sourcenetwork/sourcehub/x/acp/bearer_token" @@ -183,3 +184,22 @@ func (identity Identity) NewToken( return signedToken, nil } + +// VerifyAuthToken verifies that the jwt auth token is valid and that the signature +// matches the identity of the subject. +func VerifyAuthToken(ident Identity, audience string) error { + _, err := jwt.Parse([]byte(ident.BearerToken), jwt.WithVerify(false), jwt.WithAudience(audience)) + if err != nil { + return err + } + + _, err = jws.Verify( + []byte(ident.BearerToken), + jws.WithKey(BearerTokenSignatureScheme, ident.PublicKey.ToECDSA()), + ) + if err != nil { + return err + } + + return nil +} diff --git a/http/auth.go b/http/auth.go index 79f4262252..544b2fbe96 100644 --- a/http/auth.go +++ b/http/auth.go @@ -14,8 +14,6 @@ import ( "net/http" "strings" - "github.com/lestrrat-go/jwx/v2/jws" - "github.com/lestrrat-go/jwx/v2/jwt" "github.com/sourcenetwork/immutable" acpIdentity "github.com/sourcenetwork/defradb/acp/identity" @@ -30,24 +28,6 @@ const ( authSchemaPrefix = "Bearer " ) -// verifyAuthToken verifies that the jwt auth token is valid and that the signature -// matches the identity of the subject. -func verifyAuthToken(identity acpIdentity.Identity, audience string) error { - _, err := jwt.Parse([]byte(identity.BearerToken), jwt.WithVerify(false), jwt.WithAudience(audience)) - if err != nil { - return err - } - - _, err = jws.Verify( - []byte(identity.BearerToken), - jws.WithKey(acpIdentity.BearerTokenSignatureScheme, identity.PublicKey.ToECDSA()), - ) - if err != nil { - return err - } - return nil -} - // AuthMiddleware authenticates an actor and sets their identity for all subsequent actions. func AuthMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { @@ -63,7 +43,7 @@ func AuthMiddleware(next http.Handler) http.Handler { return } - err = verifyAuthToken(ident, strings.ToLower(req.Host)) + err = acpIdentity.VerifyAuthToken(ident, strings.ToLower(req.Host)) if err != nil { http.Error(rw, "forbidden", http.StatusForbidden) return diff --git a/http/auth_test.go b/http/auth_test.go index 365ed98c89..0fa46e5c81 100644 --- a/http/auth_test.go +++ b/http/auth_test.go @@ -34,7 +34,7 @@ func TestVerifyAuthToken(t *testing.T) { err = identity.UpdateToken(time.Hour, immutable.Some(audience), immutable.None[string]()) require.NoError(t, err) - err = verifyAuthToken(identity, audience) + err = acpIdentity.VerifyAuthToken(identity, audience) require.NoError(t, err) } @@ -48,7 +48,7 @@ func TestVerifyAuthTokenErrorsWithNonMatchingAudience(t *testing.T) { err = identity.UpdateToken(time.Hour, immutable.Some("valid"), immutable.None[string]()) require.NoError(t, err) - err = verifyAuthToken(identity, "invalid") + err = acpIdentity.VerifyAuthToken(identity, "invalid") assert.Error(t, err) } @@ -65,6 +65,6 @@ func TestVerifyAuthTokenErrorsWithExpired(t *testing.T) { err = identity.UpdateToken(-time.Hour, immutable.Some(audience), immutable.None[string]()) require.NoError(t, err) - err = verifyAuthToken(identity, "123abc") + err = acpIdentity.VerifyAuthToken(identity, "123abc") assert.Error(t, err) } diff --git a/internal/db/db.go b/internal/db/db.go index 5ad246092c..13f29e2ad9 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -346,7 +346,7 @@ func (db *db) GetNodeIdentity(_ context.Context) (immutable.Option[identity.Publ return immutable.None[identity.PublicRawIdentity](), nil } -func (db *db) GetIdentityToken(_ context.Context, audience immutable.Option[string]) ([]byte, error) { +func (db *db) GetNodeIdentityToken(_ context.Context, audience immutable.Option[string]) ([]byte, error) { if db.nodeIdentity.HasValue() { return db.nodeIdentity.Value().NewToken(time.Hour*24, audience, immutable.None[string]()) } diff --git a/internal/db/permission/check.go b/internal/db/permission/check.go index 550f9b1ef0..197bc2a302 100644 --- a/internal/db/permission/check.go +++ b/internal/db/permission/check.go @@ -69,7 +69,7 @@ func CheckAccessOfDocOnCollectionWithACP( // Unrestricted Access to document if: // - (2) is false. // - Document is public (unregistered), whether signatured request or not doesn't matter. -func CheckAccessDocAccessWithDID( +func CheckDocAccessWithDID( ctx context.Context, did string, acpSystem acp.ACP, diff --git a/net/acp.go b/net/acp.go deleted file mode 100644 index d7155b7b11..0000000000 --- a/net/acp.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2024 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package net - -import ( - "context" - - "github.com/sourcenetwork/immutable" - - "github.com/sourcenetwork/defradb/acp" - "github.com/sourcenetwork/defradb/acp/identity" - "github.com/sourcenetwork/defradb/client" -) - -type ACP interface { - acp.ACP - // GetCollections returns the list of collections according to the given options. - GetCollections(ctx context.Context, opts client.CollectionFetchOptions) ([]client.Collection, error) - // GetIndentityToken returns an identity token for the given audience. - GetIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error) - // GetNodeIdentity returns the node's public raw identity. - GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error) -} diff --git a/net/dialer_test.go b/net/dialer_test.go index 0346ac330b..81347990d3 100644 --- a/net/dialer_test.go +++ b/net/dialer_test.go @@ -14,6 +14,7 @@ import ( "context" "testing" + "github.com/sourcenetwork/defradb/acp" "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -30,7 +31,8 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) { db1.Blockstore(), db1.Encstore(), db1.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + nil, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -38,9 +40,10 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) { n2, err := NewPeer( ctx, db2.Blockstore(), - db1.Encstore(), + db2.Encstore(), db2.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + nil, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -64,7 +67,8 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) { db1.Blockstore(), db1.Encstore(), db1.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + nil, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -72,9 +76,10 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) { n2, err := NewPeer( ctx, db2.Blockstore(), - db1.Encstore(), + db2.Encstore(), db2.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + nil, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -101,7 +106,8 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing db1.Blockstore(), db1.Encstore(), db1.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + nil, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) @@ -109,9 +115,10 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing n2, err := NewPeer( ctx, db2.Blockstore(), - db1.Encstore(), + db2.Encstore(), db2.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + nil, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) diff --git a/net/grpc.go b/net/grpc.go index f11faa9966..4f7bd0558a 100644 --- a/net/grpc.go +++ b/net/grpc.go @@ -65,17 +65,10 @@ type getIdentityReply struct { } type serviceServer interface { - // GetDocGraph from this peer. - GetDocGraph(context.Context, *getDocGraphRequest) (*getDocGraphReply, error) - // PushDocGraph to this peer. - PushDocGraph(context.Context, *pushDocGraphRequest) (*pushDocGraphReply, error) - // GetLog from this peer. - GetLog(context.Context, *getLogRequest) (*getLogReply, error) - // PushLog to this peer. - PushLog(context.Context, *pushLogRequest) (*pushLogReply, error) - // GetHeadLog from this peer - GetHeadLog(context.Context, *getHeadLogRequest) (*getHeadLogReply, error) - GetIdentity(context.Context, *getIdentityRequest) (*getIdentityReply, error) + // pushLogHandler handles a push log request to sync blocks. + pushLogHandler(context.Context, *pushLogRequest) (*pushLogReply, error) + // getIdentityHandler handles an indentity request and returns the local node's identity. + getIdentityHandler(context.Context, *getIdentityRequest) (*getIdentityReply, error) } func getIdentityHandler( @@ -89,14 +82,14 @@ func getIdentityHandler( return nil, err } if interceptor == nil { - return srv.(serviceServer).GetIdentity(ctx, in) + return srv.(serviceServer).getIdentityHandler(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: serviceGetIdentityName, } handler := func(ctx context.Context, req any) (any, error) { - return srv.(serviceServer).GetIdentity(ctx, req.(*getIdentityRequest)) + return srv.(serviceServer).getIdentityHandler(ctx, req.(*getIdentityRequest)) } return interceptor(ctx, in, info, handler) } @@ -112,14 +105,14 @@ func pushLogHandler( return nil, err } if interceptor == nil { - return srv.(serviceServer).PushLog(ctx, in) + return srv.(serviceServer).pushLogHandler(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: servicePushLogName, } handler := func(ctx context.Context, req any) (any, error) { - return srv.(serviceServer).PushLog(ctx, req.(*pushLogRequest)) + return srv.(serviceServer).pushLogHandler(ctx, req.(*pushLogRequest)) } return interceptor(ctx, in, info, handler) } diff --git a/net/peer.go b/net/peer.go index d9186a10ed..247d6c72ff 100644 --- a/net/peer.go +++ b/net/peer.go @@ -33,6 +33,8 @@ import ( "github.com/sourcenetwork/immutable" "google.golang.org/grpc" + "github.com/sourcenetwork/defradb/acp" + "github.com/sourcenetwork/defradb/acp/identity" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/errors" @@ -40,6 +42,16 @@ import ( corenet "github.com/sourcenetwork/defradb/internal/core/net" ) +// DB hold the database related methods that are required by Peer. +type DB interface { + // GetCollections returns the list of collections according to the given options. + GetCollections(ctx context.Context, opts client.CollectionFetchOptions) ([]client.Collection, error) + // GetNodeIndentityToken returns an identity token for the given audience. + GetNodeIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error) + // GetNodeIdentity returns the node's public raw identity. + GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error) +} + // Peer is a DefraDB Peer node which exposes all the LibP2P host/peer functionality // to the underlying DefraDB instance. type Peer struct { @@ -62,7 +74,8 @@ type Peer struct { // peer DAG service bserv blockservice.BlockService - acp immutable.Option[ACP] + acp immutable.Option[acp.ACP] + db DB bootCloser io.Closer } @@ -73,7 +86,8 @@ func NewPeer( blockstore datastore.Blockstore, encstore datastore.Blockstore, bus *event.Bus, - acp immutable.Option[ACP], + acp immutable.Option[acp.ACP], + db DB, opts ...NodeOpt, ) (p *Peer, err error) { ctx, cancel := context.WithCancel(ctx) @@ -124,6 +138,7 @@ func NewPeer( cancel: cancel, bus: bus, acp: acp, + db: db, p2pRPC: grpc.NewServer(options.GRPCServerOptions...), } diff --git a/net/peer_test.go b/net/peer_test.go index e2150a6c52..529414716c 100644 --- a/net/peer_test.go +++ b/net/peer_test.go @@ -82,7 +82,8 @@ func newTestPeer(ctx context.Context, t *testing.T) (client.DB, *Peer) { db.Blockstore(), db.Encstore(), db.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db, WithListenAddresses(randomMultiaddr), ) require.NoError(t, err) @@ -96,14 +97,14 @@ func TestNewPeer_NoError(t *testing.T) { db, err := db.NewDB(ctx, store, acp.NoACP, nil) require.NoError(t, err) defer db.Close() - p, err := NewPeer(ctx, db.Blockstore(), db.Encstore(), db.Events(), immutable.None[ACP]()) + p, err := NewPeer(ctx, db.Blockstore(), db.Encstore(), db.Events(), immutable.None[acp.ACP](), db) require.NoError(t, err) p.Close() } func TestNewPeer_NoDB_NilDBError(t *testing.T) { ctx := context.Background() - _, err := NewPeer(ctx, nil, nil, nil, immutable.None[ACP]()) + _, err := NewPeer(ctx, nil, nil, nil, immutable.None[acp.ACP](), nil) require.ErrorIs(t, err, ErrNilDB) } @@ -124,7 +125,8 @@ func TestStart_WithKnownPeer_NoError(t *testing.T) { db1.Blockstore(), db1.Encstore(), db1.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db1, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) @@ -132,9 +134,10 @@ func TestStart_WithKnownPeer_NoError(t *testing.T) { n2, err := NewPeer( ctx, db2.Blockstore(), - db1.Encstore(), + db2.Encstore(), db2.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db2, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) @@ -274,7 +277,8 @@ func TestNewPeer_WithEnableRelay_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db, WithEnableRelay(true), ) require.NoError(t, err) @@ -293,7 +297,8 @@ func TestNewPeer_NoPubSub_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db, WithEnablePubSub(false), ) require.NoError(t, err) @@ -313,7 +318,8 @@ func TestNewPeer_WithEnablePubSub_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db, WithEnablePubSub(true), ) @@ -334,7 +340,8 @@ func TestNodeClose_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db, ) require.NoError(t, err) n.Close() @@ -352,7 +359,8 @@ func TestListenAddrs_WithListenAddresses_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db, WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) @@ -372,7 +380,8 @@ func TestPeer_WithBootstrapPeers_NoError(t *testing.T) { db.Blockstore(), db.Encstore(), db.Events(), - immutable.None[ACP](), + immutable.None[acp.ACP](), + db, WithBootstrapPeers("/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"), ) require.NoError(t, err) diff --git a/net/server.go b/net/server.go index 9377b09227..1c4cebb0b7 100644 --- a/net/server.go +++ b/net/server.go @@ -19,8 +19,6 @@ import ( "github.com/fxamacker/cbor/v2" cid "github.com/ipfs/go-cid" - "github.com/lestrrat-go/jwx/v2/jws" - "github.com/lestrrat-go/jwx/v2/jwt" "github.com/libp2p/go-libp2p/core/peer" libpeer "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -90,34 +88,13 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) { return s, nil } -// GetDocGraph receives a get graph request -func (s *server) GetDocGraph( - ctx context.Context, - req *getDocGraphRequest, -) (*getDocGraphReply, error) { - return nil, nil +// pushLogHandler receives a push log request from the grpc server (replicator) +func (s *server) pushLogHandler(ctx context.Context, req *pushLogRequest) (*pushLogReply, error) { + return s.processPushlog(ctx, req, true) } -// PushDocGraph receives a push graph request -func (s *server) PushDocGraph( - ctx context.Context, - req *pushDocGraphRequest, -) (*pushDocGraphReply, error) { - return nil, nil -} - -// GetLog receives a get log request -func (s *server) GetLog(ctx context.Context, req *getLogRequest) (*getLogReply, error) { - return nil, nil -} - -// PushLog receives a push log request from the grpc server (replicator) -func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogReply, error) { - return s.handlePushLog(ctx, req, true) -} - -// handlePushLog handles a push log request -func (s *server) handlePushLog( +// processPushlog processes a push log request +func (s *server) processPushlog( ctx context.Context, req *pushLogRequest, isReplicator bool, @@ -194,24 +171,16 @@ func (s *server) handlePushLog( return &pushLogReply{}, nil } -// GetHeadLog receives a get head log request -func (s *server) GetHeadLog( - ctx context.Context, - req *getHeadLogRequest, -) (*getHeadLogReply, error) { - return nil, nil -} - -// GetIdentity receives a get identity request and returns the identity token +// getIdentityHandler receives a get identity request and returns the identity token // with the requesting peer as the audience. -func (s *server) GetIdentity( +func (s *server) getIdentityHandler( ctx context.Context, req *getIdentityRequest, ) (*getIdentityReply, error) { if !s.peer.acp.HasValue() { return &getIdentityReply{}, nil } - token, err := s.peer.acp.Value().GetIdentityToken(ctx, immutable.Some(req.PeerID)) + token, err := s.peer.db.GetNodeIdentityToken(ctx, immutable.Some(req.PeerID)) if err != nil { return nil, err } @@ -373,7 +342,7 @@ func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) ctx := grpcpeer.NewContext(s.peer.ctx, &grpcpeer.Peer{ Addr: addr{from}, }) - if _, err := s.handlePushLog(ctx, req, false); err != nil { + if _, err := s.processPushlog(ctx, req, false); err != nil { return nil, errors.Wrap(fmt.Sprintf("Failed pushing log for doc %s", topic), err) } return nil, nil @@ -494,7 +463,9 @@ func (s *server) SendPubSubMessage( return t.Publish(ctx, data) } -// hasAccess checks if the requesting peer has access to the given cid. +// hasAccess checks if the requesting peer has access to the given ci. +// +// This is used as a filter in bitswap to determine if we should send the block to the requesting peer. func (s *server) hasAccess(p peer.ID, c cid.Cid) bool { if !s.peer.acp.HasValue() { return true @@ -511,7 +482,7 @@ func (s *server) hasAccess(p peer.ID, c cid.Cid) bool { return false } - cols, err := s.peer.acp.Value().GetCollections( + cols, err := s.peer.db.GetCollections( s.peer.ctx, client.CollectionFetchOptions{ SchemaVersionID: immutable.Some(block.Delta.GetSchemaVersionID()), @@ -549,7 +520,7 @@ func (s *server) hasAccess(p peer.ID, c cid.Cid) bool { log.ErrorE("Failed to parse identity token", err) return immutable.None[identity.Identity]() } - err = verifyAuthToken(ident, s.peer.PeerID().String()) + err = identity.VerifyAuthToken(ident, s.peer.PeerID().String()) if err != nil { log.ErrorE("Failed to verify auth token", err) return immutable.None[identity.Identity]() @@ -585,7 +556,7 @@ func (s *server) trySelfHasAccess(block *coreblock.Block) bool { return true } - cols, err := s.peer.acp.Value().GetCollections( + cols, err := s.peer.db.GetCollections( s.peer.ctx, client.CollectionFetchOptions{ SchemaVersionID: immutable.Some(block.Delta.GetSchemaVersionID()), @@ -599,7 +570,7 @@ func (s *server) trySelfHasAccess(block *coreblock.Block) bool { log.Info("No collections found", corelog.Any("Schema Version ID", block.Delta.GetSchemaVersionID())) return true } - ident, err := s.peer.acp.Value().GetNodeIdentity(s.peer.ctx) + ident, err := s.peer.db.GetNodeIdentity(s.peer.ctx) if err != nil { log.ErrorE("Failed to get node identity", err) return true @@ -609,7 +580,7 @@ func (s *server) trySelfHasAccess(block *coreblock.Block) bool { return true } - peerHasAccess, err := permission.CheckAccessDocAccessWithDID( + peerHasAccess, err := permission.CheckDocAccessWithDID( s.peer.ctx, ident.Value().DID, s.peer.acp.Value(), @@ -624,22 +595,3 @@ func (s *server) trySelfHasAccess(block *coreblock.Block) bool { return peerHasAccess } - -// verifyAuthToken verifies that the jwt auth token is valid and that the signature -// matches the identity of the subject. -func verifyAuthToken(ident identity.Identity, audience string) error { - _, err := jwt.Parse([]byte(ident.BearerToken), jwt.WithVerify(false), jwt.WithAudience(audience)) - if err != nil { - return err - } - - _, err = jws.Verify( - []byte(ident.BearerToken), - jws.WithKey(identity.BearerTokenSignatureScheme, ident.PublicKey.ToECDSA()), - ) - if err != nil { - return err - } - - return nil -} diff --git a/net/server_test.go b/net/server_test.go index a29952d2b8..49b8ae1e91 100644 --- a/net/server_test.go +++ b/net/server_test.go @@ -34,46 +34,6 @@ func TestNewServerSimple(t *testing.T) { require.NoError(t, err) } -func TestGetDocGraph(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - r, err := p.server.GetDocGraph(ctx, &getDocGraphRequest{}) - require.Nil(t, r) - require.Nil(t, err) -} - -func TestPushDocGraph(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - r, err := p.server.PushDocGraph(ctx, &pushDocGraphRequest{}) - require.Nil(t, r) - require.Nil(t, err) -} - -func TestGetLog(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - r, err := p.server.GetLog(ctx, &getLogRequest{}) - require.Nil(t, r) - require.Nil(t, err) -} - -func TestGetHeadLog(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - r, err := p.server.GetHeadLog(ctx, &getHeadLogRequest{}) - require.Nil(t, r) - require.Nil(t, err) -} - func getHead(ctx context.Context, db client.DB, docID client.DocID) (cid.Cid, error) { prefix := keys.DataStoreKeyFromDocID(docID).ToHeadStoreKey().WithFieldID(core.COMPOSITE_NAMESPACE).ToString() results, err := db.Headstore().Query(ctx, query.Query{Prefix: prefix}) @@ -126,7 +86,7 @@ func TestPushLog(t *testing.T) { b, err := db.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString()) require.NoError(t, err) - _, err = p.server.PushLog(ctx, &pushLogRequest{ + _, err = p.server.pushLogHandler(ctx, &pushLogRequest{ DocID: doc.ID().String(), CID: headCID.Bytes(), SchemaRoot: col.SchemaRoot(), diff --git a/node/acp.go b/node/acp.go index d1365bfa22..4092aa3532 100644 --- a/node/acp.go +++ b/node/acp.go @@ -17,9 +17,6 @@ import ( "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/defradb/acp" - "github.com/sourcenetwork/defradb/acp/identity" - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/net" ) type ACPType string @@ -152,40 +149,3 @@ func NewACP(ctx context.Context, opts ...ACPOpt) (immutable.Option[acp.ACP], err return immutable.Some[acp.ACP](acpLocal), nil } } - -// acpDB is an interface for ACP related DB operations. -type acpDB interface { - GetCollections(ctx context.Context, options client.CollectionFetchOptions) ([]client.Collection, error) - GetIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error) - GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error) -} - -type netACP struct { - acp.ACP - db acpDB -} - -var _ net.ACP = (*netACP)(nil) - -// NewNetACP returns a new net.ACP instance. -func NewNetACP(a immutable.Option[acp.ACP], db acpDB) immutable.Option[net.ACP] { - if !a.HasValue() { - return immutable.None[net.ACP]() - } - return immutable.Some[net.ACP](&netACP{a.Value(), db}) -} - -func (n *netACP) GetCollections( - ctx context.Context, - options client.CollectionFetchOptions, -) ([]client.Collection, error) { - return n.db.GetCollections(ctx, options) -} - -func (n *netACP) GetIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error) { - return n.db.GetIdentityToken(ctx, audience) -} - -func (n *netACP) GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error) { - return n.db.GetNodeIdentity(ctx) -} diff --git a/node/node.go b/node/node.go index 15be7cf8fd..a95a9538fe 100644 --- a/node/node.go +++ b/node/node.go @@ -160,7 +160,8 @@ func (n *Node) Start(ctx context.Context) error { coreDB.Blockstore(), coreDB.Encstore(), coreDB.Events(), - NewNetACP(acp, coreDB), + acp, + coreDB, n.netOpts..., ) if err != nil { diff --git a/tests/integration/acp/p2p/create_test.go b/tests/integration/acp/p2p/create_test.go index ab248fff9b..b4561962c3 100644 --- a/tests/integration/acp/p2p/create_test.go +++ b/tests/integration/acp/p2p/create_test.go @@ -131,11 +131,11 @@ func TestACP_P2PCreatePrivateDocumentsOnDifferentNodes_SourceHubACP(t *testing.T testUtils.ExecuteTestCase(t, test) } -func TestACP_P2PCreatePrivateDocumentsOnDifferentNodes_SourceHubACP1(t *testing.T) { +func TestACP_P2PCreatePrivateDocumentAndSyncAfterAddingRelationship_SourceHubACP(t *testing.T) { expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" test := testUtils.TestCase{ - Description: "Test acp, p2p create private documents on different nodes, with source-hub", + Description: "Test acp, p2p create a private documents and sync after adding actor relationship, with source-hub", SupportedACPTypes: immutable.Some( []testUtils.ACPType{ testUtils.SourceHubACPType,