Consolidate abstractions and core types into go-libp2p-core (#601)

This commit is contained in:
Raúl Kripalani
2019-05-26 22:55:46 +01:00
committed by GitHub
parent 6813fdd0d1
commit d87f89314c
39 changed files with 544 additions and 538 deletions
+40 -34
View File
@@ -7,15 +7,19 @@ import (
"sync"
"time"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
inat "github.com/libp2p/go-libp2p-nat"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
ping "github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
@@ -58,13 +62,13 @@ const NATPortMap Option = iota
// * uses an identity service to send + receive node information
// * uses a nat service to establish NAT port mappings
type BasicHost struct {
network inet.Network
network network.Network
mux *msmux.MultistreamMuxer
ids *identify.IDService
pings *ping.PingService
natmgr NATManager
maResolver *madns.Resolver
cmgr ifconnmgr.ConnManager
cmgr connmgr.ConnManager
AddrsFactory AddrsFactory
@@ -78,6 +82,8 @@ type BasicHost struct {
lastAddrs []ma.Multiaddr
}
var _ host.Host = (*BasicHost)(nil)
// HostOpts holds options that can be passed to NewHost in order to
// customize construction of the *BasicHost.
type HostOpts struct {
@@ -104,17 +110,17 @@ type HostOpts struct {
// NATManager takes care of setting NAT port mappings, and discovering external addresses.
// If omitted, this will simply be disabled.
NATManager func(inet.Network) NATManager
NATManager func(network.Network) NATManager
// ConnManager is a libp2p connection manager
ConnManager ifconnmgr.ConnManager
ConnManager connmgr.ConnManager
// EnablePing indicates whether to instantiate the ping service
EnablePing bool
}
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, error) {
func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHost, error) {
bgctx, cancel := context.WithCancel(ctx)
h := &BasicHost{
@@ -162,7 +168,7 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
}
if opts.ConnManager == nil {
h.cmgr = &ifconnmgr.NullConnMgr{}
h.cmgr = &connmgr.NullConnMgr{}
} else {
h.cmgr = opts.ConnManager
net.Notify(h.cmgr.Notifee())
@@ -182,11 +188,11 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
// The following options can be passed:
// * NATPortMap
// * AddrsFactory
// * ifconnmgr.ConnManager
// * connmgr.ConnManager
// * madns.Resolver
//
// This function is deprecated in favor of NewHost and HostOpts.
func New(net inet.Network, opts ...interface{}) *BasicHost {
func New(net network.Network, opts ...interface{}) *BasicHost {
hostopts := &HostOpts{}
for _, o := range opts {
@@ -198,7 +204,7 @@ func New(net inet.Network, opts ...interface{}) *BasicHost {
}
case AddrsFactory:
hostopts.AddrsFactory = AddrsFactory(o)
case ifconnmgr.ConnManager:
case connmgr.ConnManager:
hostopts.ConnManager = o
case *madns.Resolver:
hostopts.MultiaddrResolver = o
@@ -221,16 +227,16 @@ func (h *BasicHost) Start() {
}
// newConnHandler is the remote-opened conn handler for inet.Network
func (h *BasicHost) newConnHandler(c inet.Conn) {
func (h *BasicHost) newConnHandler(c network.Conn) {
// Clear protocols on connecting to new peer to avoid issues caused
// by misremembering protocols between reconnects
h.Peerstore().SetProtocols(c.RemotePeer())
h.ids.IdentifyConn(c)
}
// newStreamHandler is the remote-opened stream handler for inet.Network
// newStreamHandler is the remote-opened stream handler for network.Network
// TODO: this feels a bit wonky
func (h *BasicHost) newStreamHandler(s inet.Stream) {
func (h *BasicHost) newStreamHandler(s network.Stream) {
before := time.Now()
if h.negtimeout > 0 {
@@ -344,17 +350,17 @@ func (h *BasicHost) ID() peer.ID {
}
// Peerstore returns the Host's repository of Peer Addresses and Keys.
func (h *BasicHost) Peerstore() pstore.Peerstore {
func (h *BasicHost) Peerstore() peerstore.Peerstore {
return h.Network().Peerstore()
}
// Network returns the Network interface of the Host
func (h *BasicHost) Network() inet.Network {
func (h *BasicHost) Network() network.Network {
return h.network
}
// Mux returns the Mux multiplexing incoming streams to protocol handlers
func (h *BasicHost) Mux() *msmux.MultistreamMuxer {
func (h *BasicHost) Mux() protocol.Switch {
return h.mux
}
@@ -367,9 +373,9 @@ func (h *BasicHost) IDService() *identify.IDService {
// This is equivalent to:
// host.Mux().SetHandler(proto, handler)
// (Threadsafe)
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) {
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
h.Mux().AddHandler(string(pid), func(p string, rwc io.ReadWriteCloser) error {
is := rwc.(inet.Stream)
is := rwc.(network.Stream)
is.SetProtocol(protocol.ID(p))
handler(is)
return nil
@@ -378,9 +384,9 @@ func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler
// SetStreamHandlerMatch sets the protocol handler on the Host's Mux
// using a matching function to do protocol comparisons
func (h *BasicHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler inet.StreamHandler) {
func (h *BasicHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler network.StreamHandler) {
h.Mux().AddHandlerWithFunc(string(pid), m, func(p string, rwc io.ReadWriteCloser) error {
is := rwc.(inet.Stream)
is := rwc.(network.Stream)
is.SetProtocol(protocol.ID(p))
handler(is)
return nil
@@ -396,7 +402,7 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
// header with given protocol.ID. If there is no connection to p, attempts
// to create one. If ProtocolID is "", writes no header.
// (Threadsafe)
func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (inet.Stream, error) {
func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) {
pref, err := h.preferredProtocol(p, pids)
if err != nil {
return nil, err
@@ -450,7 +456,7 @@ func (h *BasicHost) preferredProtocol(p peer.ID, pids []protocol.ID) (protocol.I
return out, nil
}
func (h *BasicHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID) (inet.Stream, error) {
func (h *BasicHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID) (network.Stream, error) {
s, err := h.Network().NewStream(ctx, p)
if err != nil {
return nil, err
@@ -470,11 +476,11 @@ func (h *BasicHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID) (
// h.Network.Dial, and block until a connection is open, or an error is returned.
// Connect will absorb the addresses in pi into its internal peerstore.
// It will also resolve any /dns4, /dns6, and /dnsaddr addresses.
func (h *BasicHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {
func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
// absorb addresses into peerstore
h.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.TempAddrTTL)
h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
if h.Network().Connectedness(pi.ID) == inet.Connected {
if h.Network().Connectedness(pi.ID) == network.Connected {
return nil
}
@@ -482,12 +488,12 @@ func (h *BasicHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {
if err != nil {
return err
}
h.Peerstore().AddAddrs(pi.ID, resolved, pstore.TempAddrTTL)
h.Peerstore().AddAddrs(pi.ID, resolved, peerstore.TempAddrTTL)
return h.dialPeer(ctx, pi.ID)
}
func (h *BasicHost) resolveAddrs(ctx context.Context, pi pstore.PeerInfo) ([]ma.Multiaddr, error) {
func (h *BasicHost) resolveAddrs(ctx context.Context, pi peer.AddrInfo) ([]ma.Multiaddr, error) {
proto := ma.ProtocolWithCode(ma.P_P2P).Name
p2paddr, err := ma.NewMultiaddr("/" + proto + "/" + pi.ID.Pretty())
if err != nil {
@@ -507,7 +513,7 @@ func (h *BasicHost) resolveAddrs(ctx context.Context, pi pstore.PeerInfo) ([]ma.
log.Infof("error resolving %s: %s", reqaddr, err)
}
for _, res := range resaddrs {
pi, err := pstore.InfoFromP2pAddr(res)
pi, err := peer.AddrInfoFromP2pAddr(res)
if err != nil {
log.Infof("error parsing %s: %s", res, err)
}
@@ -549,7 +555,7 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
return nil
}
func (h *BasicHost) ConnManager() ifconnmgr.ConnManager {
func (h *BasicHost) ConnManager() connmgr.ConnManager {
return h.cmgr
}
@@ -724,7 +730,7 @@ func (h *BasicHost) Close() error {
}
type streamWrapper struct {
inet.Stream
network.Stream
rw io.ReadWriter
}