Skip to content

Commit

Permalink
feat: http publisher without fully setting up server (#382)
Browse files Browse the repository at this point in the history
* feat: http publisher without fully setting up server

Ref: ipni/go-libipni#56
---------

Co-authored-by: gammazero <[email protected]>
  • Loading branch information
rvagg and gammazero authored Jul 26, 2023
1 parent 82e27f5 commit 2a822ff
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 31 deletions.
83 changes: 56 additions & 27 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"sync"

Expand All @@ -27,8 +28,6 @@ import (
"github.com/ipni/go-libipni/metadata"
provider "github.com/ipni/index-provider"
"github.com/ipni/index-provider/engine/chunker"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
Expand Down Expand Up @@ -119,7 +118,7 @@ func (e *Engine) Start(ctx context.Context) error {

e.publisher, err = e.newPublisher()
if err != nil {
log.Errorw("Failed to instantiate dagsync publisher", "err", err, "kind", e.pubKind)
log.Errorw("Failed to instantiate publisher", "err", err, "kind", e.pubKind)
return err
}

Expand All @@ -134,7 +133,7 @@ func (e *Engine) Start(ctx context.Context) error {
}

// If publisher created, then create announcement senders.
e.senders, err = createSenders(e.announceURLs, e.h, e.pubTopicName, e.pubExtraGossipData, e.pubTopic)
e.senders, err = e.createSenders()
if err != nil {
return err
}
Expand All @@ -149,7 +148,13 @@ func (e *Engine) newPublisher() (dagsync.Publisher, error) {
log.Info("Remote announcements disabled; all advertisements will only be stored locally.")
return nil, nil
case HttpPublisher:
httpPub, err := httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.key)
var httpPub *httpsync.Publisher
var err error
if e.pubHttpWithoutServer {
httpPub, err = httpsync.NewPublisherWithoutServer(e.pubHttpListenAddr, e.pubHttpHandlerPath, e.lsys, e.key)
} else {
httpPub, err = httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.key)
}
if err != nil {
return nil, fmt.Errorf("cannot create http publisher: %w", err)
}
Expand All @@ -172,29 +177,29 @@ func (e *Engine) newPublisher() (dagsync.Publisher, error) {
return nil, fmt.Errorf("unknown publisher kind: %s", e.pubKind)
}

func createSenders(directAnnounceURLs []*url.URL, p2pHost host.Host, pubsubTopicName string, extraGossipData []byte, existingTopic *pubsub.Topic) ([]announce.Sender, error) {
func (e *Engine) createSenders() ([]announce.Sender, error) {
var senders []announce.Sender

// If there are announce URLs, then create an announce sender to send
// direct HTTP announce messages to these URLs.
if len(directAnnounceURLs) != 0 {
var peerID peer.ID
if p2pHost != nil {
peerID = p2pHost.ID()
if len(e.announceURLs) != 0 {
id, err := peer.IDFromPrivateKey(e.key)
if err != nil {
return nil, fmt.Errorf("cannot get peer ID from private key: %w", err)
}
httpSender, err := httpsender.New(directAnnounceURLs, peerID)
httpSender, err := httpsender.New(e.announceURLs, id)
if err != nil {
return nil, fmt.Errorf("cannot create http announce sender: %w", err)
}
senders = append(senders, httpSender)
}

// If there is a libp2p host, then create a gossip pubsub announce sender.
if p2pHost != nil {
if e.h != nil {
// Create an announce sender to send over gossip pubsub.
p2pSender, err := p2psender.New(p2pHost, pubsubTopicName,
p2psender.WithTopic(existingTopic),
p2psender.WithExtraData(extraGossipData))
p2pSender, err := p2psender.New(e.h, e.pubTopicName,
p2psender.WithTopic(e.pubTopic),
p2psender.WithExtraData(e.pubExtraGossipData))
if err != nil {
return nil, fmt.Errorf("cannot create p2p pubsub announce sender: %w", err)
}
Expand Down Expand Up @@ -270,10 +275,14 @@ func (e *Engine) Publish(ctx context.Context, adv schema.Advertisement) (cid.Cid
// Only announce the advertisement CID if publisher is configured.
if e.publisher != nil {
log := log.With("adCid", c)
if len(e.announceURLs) == 0 {
if len(e.announceURLs) == 0 && e.h != nil {
log.Info("Announcing advertisement in pubsub channel")
} else {
} else if len(e.announceURLs) != 0 && e.h == nil {
log.Info("Announcing advertisement via http")
} else if len(e.announceURLs) != 0 && e.h != nil {
log.Info("Announcing advertisement in pubsub channel and via http")
} else {
return cid.Undef, fmt.Errorf("unexpected publisher state, no announceURLs or libp2p host")
}

e.publisher.SetRoot(c)
Expand Down Expand Up @@ -432,7 +441,8 @@ func (e *Engine) NotifyRemove(ctx context.Context, provider peer.ID, contextID [
return e.publishAdvForIndex(ctx, provider, nil, contextID, metadata.Metadata{}, true)
}

// LinkSystem gets the link system used by the engine to store and retrieve advertisement data.
// LinkSystem gets the link system used by the engine to store and retrieve
// advertisement data.
func (e *Engine) LinkSystem() *ipld.LinkSystem {
return &e.lsys
}
Expand All @@ -457,6 +467,24 @@ func (e *Engine) Shutdown() error {
return errs
}

// GetPublisherHttpFunc returns the http.HandlerFunc that serves
// advertisements over HTTP, so that the caller can attach it to an HTTP
// server of its own.
//
// An error is returned, with a nil handler, when the engine has no publisher,
// when the HttpPublisherWithoutServer option was not set, or when the
// publisher is not an *httpsync.Publisher.
func (e *Engine) GetPublisherHttpFunc() (http.HandlerFunc, error) {
	// The checks run in a fixed order so the reported error names the first
	// unmet precondition.
	switch {
	case e.publisher == nil:
		return nil, errors.New("no publisher configured")
	case !e.pubHttpWithoutServer:
		return nil, errors.New("HttpPublisherWithoutServer option not set")
	}

	if httpPub, isHTTP := e.publisher.(*httpsync.Publisher); isHTTP {
		return httpPub.ServeHTTP, nil
	}
	return nil, errors.New("publisher is not an http publisher")
}

// GetAdv gets the advertisement associated to the given cid c. The context is
// not used.
func (e *Engine) GetAdv(_ context.Context, adCid cid.Cid) (*schema.Advertisement, error) {
Expand Down Expand Up @@ -504,9 +532,8 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, p peer.ID, addrs []mult
}
}

// If not removing, then generate the link for the list of
// CIDs from the contextID using the multihash lister, and store the
// relationship.
// If not removing, then generate the link for the list of CIDs from the
// contextID using the multihash lister, and store the relationship.
if !isRm {
log.Info("Creating advertisement")

Expand Down Expand Up @@ -664,14 +691,14 @@ func (e *Engine) keyToMetadataKey(provider peer.ID, contextID []byte) datastore.
func (e *Engine) putKeyCidMap(ctx context.Context, provider peer.ID, contextID []byte, c cid.Cid) error {
// Store the map Key-Cid to know what CidLink to put in advertisement when
// notifying about a removal.

err := e.ds.Put(ctx, e.keyToCidKey(provider, contextID), c.Bytes())
if err != nil {
return err
}
// And the other way around when graphsync is making a request, so the
// lister in the linksystem knows to what contextID the CID refers to.
// it's enough for us to store just a single mapping of cid to provider and context to generate chunks
// it's enough for us to store just a single mapping of cid to provider and
// context to generate chunks

pB, err := provider.Marshal()
if err != nil {
Expand Down Expand Up @@ -710,13 +737,15 @@ type providerAndContext struct {
ContextID []byte `json:"c"`
}

// getCidKeyMap returns the provider and contextID for a given cid. Provider and Context ID are guaranteed to be
// not nil. In the case if legacy index exists, the default provider identity is assumed.
// getCidKeyMap returns the provider and contextID for a given cid. Provider
// and Context ID are guaranteed to be not nil. If the mapping exists in the
// legacy index, the default provider identity is assumed.
func (e *Engine) getCidKeyMap(ctx context.Context, c cid.Cid) (*providerAndContext, error) {
// first see whether the mapping exists in the legacy index
val, err := e.ds.Get(ctx, e.cidToKeyKey(c))
if err == nil {
// if the mapping has been found in the legacy index - return the default provider identity
// if the mapping has been found in the legacy index - return the
// default provider identity.
return &providerAndContext{Provider: []byte(e.provider.ID), ContextID: val}, nil
}
if !errors.Is(err, datastore.ErrNotFound) {
Expand All @@ -733,7 +762,7 @@ func (e *Engine) getCidKeyMap(ctx context.Context, c cid.Cid) (*providerAndConte
if err != nil {
return nil, err
}
// in case if provider is empty (which should never happen), assume the default one
// if provider is empty (which should never happen), assume the default one
if len(pAndC.Provider) == 0 {
pAndC.Provider = []byte(e.provider.ID)
}
Expand Down
38 changes: 34 additions & 4 deletions engine/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type (
// published.
pubHttpAnnounceAddrs []multiaddr.Multiaddr
pubHttpListenAddr string
pubHttpWithoutServer bool
pubHttpHandlerPath string
pubTopicName string
pubTopic *pubsub.Topic
pubExtraGossipData []byte
Expand Down Expand Up @@ -116,7 +118,8 @@ func newOptions(o ...Option) (*options, error) {
opts.ds = dssync.MutexWrap(datastore.NewMapDatastore())
}

if opts.h == nil {
if (opts.key == nil || len(opts.provider.Addrs) == 0 || opts.provider.ID == "") && opts.h == nil {
// need a host
h, err := libp2p.New()
if err != nil {
return nil, err
Expand All @@ -125,8 +128,10 @@ func newOptions(o ...Option) (*options, error) {
opts.h = h
}

// Initialize private key from libp2p host
opts.key = opts.h.Peerstore().PrivKey(opts.h.ID())
if opts.key == nil {
// Initialize private key from libp2p host
opts.key = opts.h.Peerstore().PrivKey(opts.h.ID())
}
// Defensively check that host's self private key is indeed set.
if opts.key == nil {
return nil, fmt.Errorf("cannot find private key in self peerstore; libp2p host is misconfigured")
Expand Down Expand Up @@ -240,6 +245,24 @@ func WithHttpPublisherListenAddr(addr string) Option {
}
}

// WithHttpPublisherWithoutServer configures the HTTP publisher so that it
// does not start an HTTP server. Setting up the handler is left to the user;
// the handler function is available from Engine.GetPublisherHttpFunc.
func WithHttpPublisherWithoutServer() Option {
	return func(opts *options) error {
		opts.pubHttpWithoutServer = true
		return nil
	}
}

// WithHttpPublisherHandlerPath sets the handler path that is passed to the
// HTTP publisher when it is created without a server. It should only be used
// together with WithHttpPublisherWithoutServer.
func WithHttpPublisherHandlerPath(handlerPath string) Option {
	return func(opts *options) error {
		opts.pubHttpHandlerPath = handlerPath
		return nil
	}
}

// WithHttpPublisherAnnounceAddr sets the address to be supplied in announce
// messages to tell indexers where to retrieve advertisements.
//
Expand All @@ -249,7 +272,7 @@ func WithHttpPublisherAnnounceAddr(addr string) Option {
if addr != "" {
maddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return fmt.Errorf("here: %w", err)
return err
}
o.pubHttpAnnounceAddrs = append(o.pubHttpAnnounceAddrs, maddr)
}
Expand Down Expand Up @@ -365,6 +388,13 @@ func WithExtraGossipData(extraData []byte) Option {
}
}

// WithPrivateKey sets the private key used by the engine. When no key is
// set, the key is taken from the libp2p host's peerstore instead.
func WithPrivateKey(key crypto.PrivKey) Option {
	return func(opts *options) error {
		opts.key = key
		return nil
	}
}

// WithDirectAnnounce sets indexer URLs to send direct HTTP announcements to.
func WithDirectAnnounce(announceURLs ...string) Option {
return func(o *options) error {
Expand Down

0 comments on commit 2a822ff

Please sign in to comment.