Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Bring in gostream to the monorepo #2521

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions p2p/net/gostream/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gostream

import "github.com/libp2p/go-libp2p/core/peer"

// addr implements net.Addr and holds a libp2p peer ID.
type addr struct{ id peer.ID }

// Network returns the name of the network that this address belongs to
// (libp2p).
func (a *addr) Network() string { return Network }

// String returns the peer ID of this address in string form
// (B58-encoded).
func (a *addr) String() string { return a.id.String() }
43 changes: 43 additions & 0 deletions p2p/net/gostream/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package gostream

import (
"context"
"net"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

// conn is an implementation of net.Conn which wraps
// libp2p streams.
type conn struct {
network.Stream
}

// newConn creates a conn given a libp2p stream
func newConn(s network.Stream) net.Conn {
return &conn{s}
}

// LocalAddr returns the local network address.
func (c *conn) LocalAddr() net.Addr {
return &addr{c.Stream.Conn().LocalPeer()}
}

// RemoteAddr returns the remote network address.
func (c *conn) RemoteAddr() net.Addr {
return &addr{c.Stream.Conn().RemotePeer()}
}

// Dial opens a stream to the destination address
// (which should parseable to a peer ID) using the given
// host and returns it as a standard net.Conn.
func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.Conn, error) {
s, err := h.NewStream(ctx, pid, tag)
if err != nil {
return nil, err
}
return newConn(s), nil
}
19 changes: 19 additions & 0 deletions p2p/net/gostream/gostream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package gostream allows to replace the standard net stack in Go
// with [LibP2P](https://github.com/libp2p/libp2p) streams.
//
// Given a libp2p.Host, gostream provides Dial() and Listen() methods which
// return implementations of net.Conn and net.Listener.
//
// Instead of the regular "host:port" addressing, `gostream` uses a Peer ID,
// and rather than a raw TCP connection, gostream will use libp2p's net.Stream.
// This means your connections will take advantage of LibP2P's multi-routes,
// NAT transversal and stream multiplexing.
//
// Note that LibP2P hosts cannot dial to themselves, so there is no possibility
// of using the same Host as server and as client.
package gostream

// Network is the "net.Addr.Network()" name returned by
// addresses used by gostream connections. In turn, the "net.Addr.String()" will
// be a peer ID.
var Network = "libp2p"
141 changes: 141 additions & 0 deletions p2p/net/gostream/gostream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package gostream

import (
"bufio"
"context"
"io"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
)

// newHost illustrates how to build a libp2p host with secio using
// a randomly generated key-pair
func newHost(t *testing.T, listen multiaddr.Multiaddr) host.Host {
h, err := libp2p.New(
libp2p.ListenAddrs(listen),
)
if err != nil {
t.Fatal(err)
}
return h
}

func TestServerClient(t *testing.T) {
m1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10000")
m2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10001")
srvHost := newHost(t, m1)
clientHost := newHost(t, m2)
defer srvHost.Close()
defer clientHost.Close()

srvHost.Peerstore().AddAddrs(clientHost.ID(), clientHost.Addrs(), peerstore.PermanentAddrTTL)
clientHost.Peerstore().AddAddrs(srvHost.ID(), srvHost.Addrs(), peerstore.PermanentAddrTTL)

var tag protocol.ID = "/testitytest"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

done := make(chan struct{})
go func() {
defer close(done)
listener, err := Listen(srvHost, tag)
if err != nil {
t.Error(err)
return
}
defer listener.Close()

if listener.Addr().String() != srvHost.ID().Pretty() {
t.Error("bad listener address")
return
}

servConn, err := listener.Accept()
if err != nil {
t.Error(err)
return
}
defer servConn.Close()

reader := bufio.NewReader(servConn)
for {
msg, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
t.Error(err)
return
}
if msg != "is libp2p awesome?\n" {
t.Errorf("Bad incoming message: %s", msg)
return
}

_, err = servConn.Write([]byte("yes it is\n"))
if err != nil {
t.Error(err)
return
}
}
}()

clientConn, err := Dial(ctx, clientHost, srvHost.ID(), tag)
if err != nil {
t.Fatal(err)
}

if clientConn.LocalAddr().String() != clientHost.ID().Pretty() {
t.Fatal("Bad LocalAddr")
}

if clientConn.RemoteAddr().String() != srvHost.ID().Pretty() {
t.Fatal("Bad RemoteAddr")
}

if clientConn.LocalAddr().Network() != Network {
t.Fatal("Bad Network()")
}

err = clientConn.SetDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

err = clientConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

err = clientConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

_, err = clientConn.Write([]byte("is libp2p awesome?\n"))
if err != nil {
t.Fatal(err)
}

reader := bufio.NewReader(clientConn)
resp, err := reader.ReadString('\n')
if err != nil {
t.Fatal(err)
}

if string(resp) != "yes it is\n" {
t.Errorf("Bad response: %s", resp)
}

err = clientConn.Close()
if err != nil {
t.Fatal(err)
}
<-done
}
71 changes: 71 additions & 0 deletions p2p/net/gostream/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package gostream

import (
"context"
"net"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
)

// listener is an implementation of net.Listener which handles
// http-tagged streams from a libp2p connection.
// A listener can be built with Listen()
type listener struct {
host host.Host
ctx context.Context
tag protocol.ID
cancel func()
streamCh chan network.Stream
}

// Accept returns the next a connection to this listener.
// It blocks if there are no connections. Under the hood,
// connections are libp2p streams.
func (l *listener) Accept() (net.Conn, error) {
select {
case s := <-l.streamCh:
return newConn(s), nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
}

// Close terminates this listener. It will no longer handle any
// incoming streams
func (l *listener) Close() error {
l.cancel()
l.host.RemoveStreamHandler(l.tag)
return nil
}

// Addr returns the address for this listener, which is its libp2p Peer ID.
func (l *listener) Addr() net.Addr {
return &addr{l.host.ID()}
}

// Listen provides a standard net.Listener ready to accept "connections".
// Under the hood, these connections are libp2p streams tagged with the
// given protocol.ID.
func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
ctx, cancel := context.WithCancel(context.Background())

l := &listener{
host: h,
ctx: ctx,
cancel: cancel,
tag: tag,
streamCh: make(chan network.Stream),
}

h.SetStreamHandler(tag, func(s network.Stream) {
select {
case l.streamCh <- s:
case <-ctx.Done():
s.Reset()
}
})

return l, nil
}
Loading