Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Commit

Permalink
wip: we can push to any gateway providing the url
Browse files Browse the repository at this point in the history
  • Loading branch information
juligasa committed Jan 9, 2024
1 parent 1671e76 commit d4bc3f5
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 198 deletions.
4 changes: 2 additions & 2 deletions backend/daemon/api/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ type lazyGwClient struct {

// Connect connects to a remote gateway. Necessary here for the grpc server to add a site
// that needs to connect to the site under the hood.
func (ld *lazyGwClient) GatewayClient(ctx context.Context, testnet bool) (mttnet.GatewayClient, error) {
func (ld *lazyGwClient) GatewayClient(ctx context.Context, url string) (mttnet.GatewayClient, error) {
node, ok := ld.net.Get()
if !ok {
return nil, fmt.Errorf("p2p node is not yet initialized")
}
return node.GatewayClient(ctx, testnet)
return node.GatewayClient(ctx, url)
}

type lazyDiscoverer struct {
Expand Down
4 changes: 2 additions & 2 deletions backend/daemon/api/documents/v1alpha/documents.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Discoverer interface {
// GatewayClient used to connect to the gateway and push content.
type GatewayClient interface {
// GatewayClient used to connect to the gateway and push content.
GatewayClient(context.Context, bool) (mttnet.GatewayClient, error)
GatewayClient(context.Context, string) (mttnet.GatewayClient, error)
}

// Server implements DocumentsServer gRPC API.
Expand Down Expand Up @@ -521,7 +521,7 @@ func (api *Server) PushPublication(ctx context.Context, in *documents.PushPublic
return nil, status.Errorf(codes.NotFound, "couldn't find referenced materials for document %s: %v", entity.ID().String(), err)
}

gc, err := api.gwClient.GatewayClient(ctx, api.testnet)
gc, err := api.gwClient.GatewayClient(ctx, in.Url)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get site client: %v", err)
}
Expand Down
8 changes: 8 additions & 0 deletions backend/daemon/daemon_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
networking "mintter/backend/genproto/networking/v1alpha"
p2p "mintter/backend/genproto/p2p/v1alpha"
"mintter/backend/hyper"
"mintter/backend/ipfs"
"mintter/backend/mttnet"
"mintter/backend/pkg/must"
"mintter/backend/testutil"
Expand Down Expand Up @@ -130,8 +131,15 @@ func TestDaemonPushPublication(t *testing.T) {
_, err := alice.RPC.Documents.PushPublication(ctx, &documents.PushPublicationRequest{
DocumentId: pub.Document.Id,
Version: pub.Version,
Url: ipfs.TestGateway,
})
require.NoError(t, err)
_, err = alice.RPC.Documents.PushPublication(ctx, &documents.PushPublicationRequest{
DocumentId: pub.Document.Id,
Version: pub.Version,
Url: "https://gabo.es/",
})
require.Error(t, err)
}

func TestAPIGetRemotePublication(t *testing.T) {
Expand Down
358 changes: 184 additions & 174 deletions backend/genproto/documents/v1alpha/documents.pb.go

Large diffs are not rendered by default.

105 changes: 89 additions & 16 deletions backend/mttnet/mttnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"io"
"mintter/backend/config"
"mintter/backend/core"
groups "mintter/backend/genproto/groups/v1alpha"
groups_proto "mintter/backend/genproto/groups/v1alpha"
p2p "mintter/backend/genproto/p2p/v1alpha"
"mintter/backend/hyper"
"mintter/backend/hyper/hypersql"
"mintter/backend/ipfs"
"mintter/backend/pkg/cleanup"
"mintter/backend/pkg/libp2px"
"mintter/backend/pkg/must"
"net/http"
"strings"
"time"

"crawshaw.io/sqlite"
Expand All @@ -39,6 +41,7 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
)

const protocolSupportKey = "mintter-support" // This is what we use as a key to protect the connection in ConnManager.
Expand All @@ -48,19 +51,19 @@ var userAgent = "mintter/<dev>"
// GatewayClient is the bridge to talk to the gateway.
type GatewayClient interface {
// PublishBlobs pushes given blobs to the gateway.
PublishBlobs(context.Context, *groups.PublishBlobsRequest, ...grpc.CallOption) (*groups.PublishBlobsResponse, error)
PublishBlobs(context.Context, *groups_proto.PublishBlobsRequest, ...grpc.CallOption) (*groups_proto.PublishBlobsResponse, error)
}

// WebsiteClient is the bridge to talk to remote sites.
type WebsiteClient interface {
// InitializeServer instruct the website that starts serving a given group.
InitializeServer(context.Context, *groups.InitializeServerRequest, ...grpc.CallOption) (*groups.InitializeServerResponse, error)
InitializeServer(context.Context, *groups_proto.InitializeServerRequest, ...grpc.CallOption) (*groups_proto.InitializeServerResponse, error)

// GetSiteInfo gets public site information, to be also found in /.well-known/hypermedia-site
GetSiteInfo(context.Context, *groups.GetSiteInfoRequest, ...grpc.CallOption) (*groups.PublicSiteInfo, error)
GetSiteInfo(context.Context, *groups_proto.GetSiteInfoRequest, ...grpc.CallOption) (*groups_proto.PublicSiteInfo, error)

// PublishBlobs pushes given blobs to the site.
PublishBlobs(context.Context, *groups.PublishBlobsRequest, ...grpc.CallOption) (*groups.PublishBlobsResponse, error)
PublishBlobs(context.Context, *groups_proto.PublishBlobsRequest, ...grpc.CallOption) (*groups_proto.PublishBlobsResponse, error)
}

// DefaultRelays bootstrap mintter-owned relays so they can reserve slots to do holepunch.
Expand Down Expand Up @@ -188,8 +191,8 @@ func New(cfg config.P2P, db *sqlitex.Pool, blobs *hyper.Storage, me core.Identit
p2p.RegisterP2PServer(n.grpc, rpc)

for _, extra := range extraServers {
if extraServer, ok := extra.(groups.WebsiteServer); ok {
groups.RegisterWebsiteServer(n.grpc, extraServer)
if extraServer, ok := extra.(groups_proto.WebsiteServer); ok {
groups_proto.RegisterWebsiteServer(n.grpc, extraServer)
break
}
}
Expand Down Expand Up @@ -248,32 +251,64 @@ func (n *Node) SiteClient(ctx context.Context, pid peer.ID) (WebsiteClient, erro
if err != nil {
return nil, err
}
return groups.NewWebsiteClient(conn), nil
return groups_proto.NewWebsiteClient(conn), nil
}

// GatewayClient opens a connection with the remote production gateway.
func (n *Node) GatewayClient(ctx context.Context, testnet bool) (GatewayClient, error) {
maStr := []string{ipfs.ProductionGateway}
if testnet {
func (n *Node) GatewayClient(ctx context.Context, url string) (cli GatewayClient, err error) {
var maStr []string
var ma []multiaddr.Multiaddr
if url == "" {
return nil, fmt.Errorf("URL cannot be empty")
}
if strings.Contains(ipfs.TestGateway, url) {
maStr = []string{ipfs.TestGateway}
ma, err = ipfs.ParseMultiaddrs(maStr)
} else if strings.Contains(ipfs.ProductionGateway, url) {
maStr = []string{ipfs.ProductionGateway}
ma, err = ipfs.ParseMultiaddrs(maStr)
} else {
noSpaces := strings.Replace(url, " ", "", -1)
maStr = strings.Split(noSpaces, ",")
ma, err = ipfs.ParseMultiaddrs(maStr)
if err != nil { // The user did not enter valid multiaddress, so we have to get the address by the well known url
siteURL := url
if url[len(url)-1] == '/' {
siteURL = url[0 : len(url)-1]
}
var info *groups_proto.PublicSiteInfo
info, err = getSiteInfoHTTP(ctx, nil, siteURL)
if err != nil {
return nil, fmt.Errorf("failed to get site info: %w", err)
}
maStr = []string{}
for _, addr := range info.PeerInfo.Addrs {
maStr = append(maStr, addr+"/p2p/"+info.PeerInfo.PeerId)
}
ma, err = ipfs.ParseMultiaddrs(maStr)
}
}
ma, err := ipfs.ParseMultiaddrs(maStr)
if err != nil {
return nil, fmt.Errorf("Could not parse gateway address: %w", err)
}
gwInfo, err := peer.AddrInfoFromP2pAddr(ma[0])
info, err := peer.AddrInfosFromP2pAddrs(ma...)
if err != nil {
return nil, fmt.Errorf("Could not decode gateway info: %w", err)
return nil, fmt.Errorf("Could not get ID from ma: %w", err)
}
if err := n.Connect(ctx, *gwInfo); err != nil {
gwInfo := peer.AddrInfo{
ID: info[0].ID,
Addrs: ma,
}

if err := n.Connect(ctx, gwInfo); err != nil {
return nil, err
}

conn, err := n.client.dialPeer(ctx, gwInfo.ID)
if err != nil {
return nil, err
}
return groups.NewWebsiteClient(conn), nil
return groups_proto.NewWebsiteClient(conn), err
}

// ArePrivateIPsAllowed check if private IPs (local) are allowed to connect.
Expand Down Expand Up @@ -582,3 +617,41 @@ func newProtocolInfo(prefix, version string) protocolInfo {
version: version,
}
}

func getSiteInfoHTTP(ctx context.Context, client *http.Client, siteURL string) (*groups_proto.PublicSiteInfo, error) {
if client == nil {
client = http.DefaultClient
}

if siteURL[len(siteURL)-1] == '/' {
return nil, fmt.Errorf("site URL must not have trailing slash: %s", siteURL)
}

requestURL := siteURL + "/.well-known/hypermedia-site"

req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
if err != nil {
return nil, fmt.Errorf("could not create request to well-known site: %w ", err)
}

res, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("could not contact to provided site [%s]: %w ", requestURL, err)
}
defer res.Body.Close()

data, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}

if res.StatusCode < 200 || res.StatusCode > 299 {
return nil, fmt.Errorf("site info url %q not working, status code: %d, response body: %s", requestURL, res.StatusCode, data)
}
resp := &groups_proto.PublicSiteInfo{}
if err := protojson.Unmarshal(data, resp); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON body: %w", err)
}

return resp, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,13 @@ export class PushPublicationRequest extends Message<PushPublicationRequest> {
*/
version = "";

/**
* Required. URL of the gateway to push to. Multiaddress format accepted (comma sepparated).
*
* @generated from field: string url = 3;
*/
url = "";

constructor(data?: PartialMessage<PushPublicationRequest>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -641,6 +648,7 @@ export class PushPublicationRequest extends Message<PushPublicationRequest> {
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "document_id", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "version", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 3, name: "url", kind: "scalar", T: 9 /* ScalarType.STRING */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): PushPublicationRequest {
Expand Down
3 changes: 3 additions & 0 deletions proto/documents/v1alpha/documents.proto
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ message PushPublicationRequest {

// Required. Specific version of the published document.
string version = 2;

// Required. URL of the gateway to push to. Multiaddress format accepted (comma sepparated).
string url = 3;
}

// Request for listing publications.
Expand Down
4 changes: 2 additions & 2 deletions proto/documents/v1alpha/go.gensum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
srcs: 037a2049aec6637f25868743ee0b4881
outs: bb66ee29416d037165c8f17fcf2c4a7a
srcs: 5bd376d5f9ef239e00af86529f025966
outs: 5318ca95674d2f78898520451350a1e9
4 changes: 2 additions & 2 deletions proto/documents/v1alpha/js.gensum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
srcs: 037a2049aec6637f25868743ee0b4881
outs: 195aac1b5d309ec25854981679b32f8a
srcs: 5bd376d5f9ef239e00af86529f025966
outs: 19f4b69d2c1291f429154c1ad74bb9d0

0 comments on commit d4bc3f5

Please sign in to comment.