mirror of
https://github.com/libp2p/go-libp2p.git
synced 2026-04-23 00:27:05 +08:00
Use circuitv2 code (#1183)
* move circuitv2 to p2p/protocol * update circuitv2 imports * RIP circuit v2; use circuitv2 * fix autorelay test * fix holepunch test * fix relay example
This commit is contained in:
@@ -0,0 +1,66 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
|
||||
)
|
||||
|
||||
var log = logging.Logger("p2p-circuit")
|
||||
|
||||
// Client implements the client-side of the p2p-circuit/v2 protocol:
|
||||
// - it implements dialing through v2 relays
|
||||
// - it listens for incoming connections through v2 relays.
|
||||
//
|
||||
// For backwards compatibility with v1 relays and older nodes, the client will
|
||||
// also accept relay connections through v1 relays and fallback dial peers using p2p-circuit/v1.
|
||||
// This allows us to use the v2 code as drop in replacement for v1 in a host without breaking
|
||||
// existing code and interoperability with older nodes.
|
||||
type Client struct {
|
||||
ctx context.Context
|
||||
host host.Host
|
||||
upgrader *tptu.Upgrader
|
||||
|
||||
incoming chan accept
|
||||
|
||||
mx sync.Mutex
|
||||
activeDials map[peer.ID]*completion
|
||||
hopCount map[peer.ID]int
|
||||
}
|
||||
|
||||
type accept struct {
|
||||
conn *Conn
|
||||
writeResponse func() error
|
||||
}
|
||||
|
||||
type completion struct {
|
||||
ch chan struct{}
|
||||
relay peer.ID
|
||||
err error
|
||||
}
|
||||
|
||||
// New constructs a new p2p-circuit/v2 client, attached to the given host and using the given
|
||||
// upgrader to perform connection upgrades.
|
||||
func New(ctx context.Context, h host.Host, upgrader *tptu.Upgrader) (*Client, error) {
|
||||
return &Client{
|
||||
ctx: ctx,
|
||||
host: h,
|
||||
upgrader: upgrader,
|
||||
incoming: make(chan accept),
|
||||
activeDials: make(map[peer.ID]*completion),
|
||||
hopCount: make(map[peer.ID]int),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start registers the circuit (client) protocol stream handlers
|
||||
func (c *Client) Start() {
|
||||
c.host.SetStreamHandler(proto.ProtoIDv1, c.handleStreamV1)
|
||||
c.host.SetStreamHandler(proto.ProtoIDv2Stop, c.handleStreamV2)
|
||||
}
|
||||
@@ -0,0 +1,145 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr/net"
|
||||
)
|
||||
|
||||
// HopTagWeight is the connection manager weight for connections carrying relay hop streams
|
||||
var HopTagWeight = 5
|
||||
|
||||
type statLimitDuration struct{}
|
||||
type statLimitData struct{}
|
||||
|
||||
var (
|
||||
StatLimitDuration = statLimitDuration{}
|
||||
StatLimitData = statLimitData{}
|
||||
)
|
||||
|
||||
type Conn struct {
|
||||
stream network.Stream
|
||||
remote peer.AddrInfo
|
||||
stat network.Stat
|
||||
|
||||
client *Client
|
||||
}
|
||||
|
||||
type NetAddr struct {
|
||||
Relay string
|
||||
Remote string
|
||||
}
|
||||
|
||||
var _ net.Addr = (*NetAddr)(nil)
|
||||
|
||||
func (n *NetAddr) Network() string {
|
||||
return "libp2p-circuit-relay"
|
||||
}
|
||||
|
||||
func (n *NetAddr) String() string {
|
||||
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
|
||||
}
|
||||
|
||||
// Conn interface
|
||||
var _ manet.Conn = (*Conn)(nil)
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
c.untagHop()
|
||||
return c.stream.Reset()
|
||||
}
|
||||
|
||||
func (c *Conn) Read(buf []byte) (int, error) {
|
||||
return c.stream.Read(buf)
|
||||
}
|
||||
|
||||
func (c *Conn) Write(buf []byte) (int, error) {
|
||||
return c.stream.Write(buf)
|
||||
}
|
||||
|
||||
func (c *Conn) SetDeadline(t time.Time) error {
|
||||
return c.stream.SetDeadline(t)
|
||||
}
|
||||
|
||||
func (c *Conn) SetReadDeadline(t time.Time) error {
|
||||
return c.stream.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
func (c *Conn) SetWriteDeadline(t time.Time) error {
|
||||
return c.stream.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
|
||||
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
|
||||
// TODO: We should be able to do this directly without converting to/from a string.
|
||||
relayAddr, err := ma.NewComponent(
|
||||
ma.ProtocolWithCode(ma.P_P2P).Name,
|
||||
c.stream.Conn().RemotePeer().Pretty(),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr)
|
||||
}
|
||||
|
||||
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
|
||||
return c.stream.Conn().LocalMultiaddr()
|
||||
}
|
||||
|
||||
func (c *Conn) LocalAddr() net.Addr {
|
||||
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
|
||||
if err != nil {
|
||||
log.Error("failed to convert local multiaddr to net addr:", err)
|
||||
return nil
|
||||
}
|
||||
return na
|
||||
}
|
||||
|
||||
func (c *Conn) RemoteAddr() net.Addr {
|
||||
return &NetAddr{
|
||||
Relay: c.stream.Conn().RemotePeer().Pretty(),
|
||||
Remote: c.remote.ID.Pretty(),
|
||||
}
|
||||
}
|
||||
|
||||
// ConnStat interface
|
||||
var _ network.ConnStat = (*Conn)(nil)
|
||||
|
||||
func (c *Conn) Stat() network.Stat {
|
||||
return c.stat
|
||||
}
|
||||
|
||||
// tagHop tags the underlying relay connection so that it can be (somewhat) protected from the
|
||||
// connection manager as it is an important connection that proxies other connections.
|
||||
// This is handled here so that the user code doesnt need to bother with this and avoid
|
||||
// clown shoes situations where a high value peer connection is behind a relayed connection and it is
|
||||
// implicitly because the connection manager closed the underlying relay connection.
|
||||
func (c *Conn) tagHop() {
|
||||
c.client.mx.Lock()
|
||||
defer c.client.mx.Unlock()
|
||||
|
||||
p := c.stream.Conn().RemotePeer()
|
||||
c.client.hopCount[p]++
|
||||
if c.client.hopCount[p] == 1 {
|
||||
c.client.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight)
|
||||
}
|
||||
}
|
||||
|
||||
// untagHop removes the relay-hop-stream tag if necessary; it is invoked when a relayed connection
|
||||
// is closed.
|
||||
func (c *Conn) untagHop() {
|
||||
c.client.mx.Lock()
|
||||
defer c.client.mx.Unlock()
|
||||
|
||||
p := c.stream.Conn().RemotePeer()
|
||||
c.client.hopCount[p]--
|
||||
if c.client.hopCount[p] == 0 {
|
||||
c.client.host.ConnManager().UntagPeer(p, "relay-hop-stream")
|
||||
delete(c.client.hopCount, p)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,239 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
pbv1 "github.com/libp2p/go-libp2p-circuit/pb"
|
||||
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
const maxMessageSize = 4096
|
||||
|
||||
var DialTimeout = time.Minute
|
||||
var DialRelayTimeout = 5 * time.Second
|
||||
|
||||
// relay protocol errors; used for signalling deduplication
|
||||
type relayError struct {
|
||||
err string
|
||||
}
|
||||
|
||||
func (e relayError) Error() string {
|
||||
return e.err
|
||||
}
|
||||
|
||||
func newRelayError(t string, args ...interface{}) error {
|
||||
return relayError{err: fmt.Sprintf(t, args...)}
|
||||
}
|
||||
|
||||
func isRelayError(err error) bool {
|
||||
_, ok := err.(relayError)
|
||||
return ok
|
||||
}
|
||||
|
||||
// dialer
|
||||
func (c *Client) dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, error) {
|
||||
// split /a/p2p-circuit/b into (/a, /p2p-circuit/b)
|
||||
relayaddr, destaddr := ma.SplitFunc(a, func(c ma.Component) bool {
|
||||
return c.Protocol().Code == ma.P_CIRCUIT
|
||||
})
|
||||
|
||||
// If the address contained no /p2p-circuit part, the second part is nil.
|
||||
if destaddr == nil {
|
||||
return nil, fmt.Errorf("%s is not a relay address", a)
|
||||
}
|
||||
|
||||
if relayaddr == nil {
|
||||
return nil, fmt.Errorf("can't dial a p2p-circuit without specifying a relay: %s", a)
|
||||
}
|
||||
|
||||
dinfo := peer.AddrInfo{ID: p}
|
||||
|
||||
// Strip the /p2p-circuit prefix from the destaddr so that we can pass the destination address
|
||||
// (if present) for active relays
|
||||
_, destaddr = ma.SplitFirst(destaddr)
|
||||
if destaddr != nil {
|
||||
dinfo.Addrs = append(dinfo.Addrs, destaddr)
|
||||
}
|
||||
|
||||
rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing relay multiaddr '%s': %w", relayaddr, err)
|
||||
}
|
||||
|
||||
// deduplicate active relay dials to the same peer
|
||||
retry:
|
||||
c.mx.Lock()
|
||||
dedup, active := c.activeDials[p]
|
||||
if !active {
|
||||
dedup = &completion{ch: make(chan struct{}), relay: rinfo.ID}
|
||||
c.activeDials[p] = dedup
|
||||
}
|
||||
c.mx.Unlock()
|
||||
|
||||
if active {
|
||||
select {
|
||||
case <-dedup.ch:
|
||||
if dedup.err != nil {
|
||||
if dedup.relay != rinfo.ID {
|
||||
// different relay, retry
|
||||
goto retry
|
||||
}
|
||||
|
||||
if !isRelayError(dedup.err) {
|
||||
// not a relay protocol error, retry
|
||||
goto retry
|
||||
}
|
||||
|
||||
// don't try the same relay if it failed to connect with a protocol error
|
||||
return nil, fmt.Errorf("concurrent active dial through the same relay failed with a protocol error")
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("concurrent active dial succeeded")
|
||||
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := c.dialPeer(ctx, *rinfo, dinfo)
|
||||
|
||||
c.mx.Lock()
|
||||
dedup.err = err
|
||||
close(dedup.ch)
|
||||
delete(c.activeDials, p)
|
||||
c.mx.Unlock()
|
||||
|
||||
return conn, err
|
||||
}
|
||||
|
||||
func (c *Client) dialPeer(ctx context.Context, relay, dest peer.AddrInfo) (*Conn, error) {
|
||||
log.Debugf("dialing peer %s through relay %s", dest.ID, relay.ID)
|
||||
|
||||
if len(relay.Addrs) > 0 {
|
||||
c.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, peerstore.TempAddrTTL)
|
||||
}
|
||||
|
||||
dialCtx, cancel := context.WithTimeout(ctx, DialRelayTimeout)
|
||||
defer cancel()
|
||||
s, err := c.host.NewStream(dialCtx, relay.ID, proto.ProtoIDv2Hop, proto.ProtoIDv1)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening hop stream to relay: %w", err)
|
||||
}
|
||||
|
||||
switch s.Protocol() {
|
||||
case proto.ProtoIDv2Hop:
|
||||
return c.connectV2(s, dest)
|
||||
|
||||
case proto.ProtoIDv1:
|
||||
return c.connectV1(s, dest)
|
||||
|
||||
default:
|
||||
s.Reset()
|
||||
return nil, fmt.Errorf("unexpected stream protocol: %s", s.Protocol())
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) connectV2(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
|
||||
rd := util.NewDelimitedReader(s, maxMessageSize)
|
||||
wr := util.NewDelimitedWriter(s)
|
||||
defer rd.Close()
|
||||
|
||||
var msg pbv2.HopMessage
|
||||
|
||||
msg.Type = pbv2.HopMessage_CONNECT.Enum()
|
||||
msg.Peer = util.PeerInfoToPeerV2(dest)
|
||||
|
||||
s.SetDeadline(time.Now().Add(DialTimeout))
|
||||
|
||||
err := wr.WriteMsg(&msg)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
msg.Reset()
|
||||
|
||||
err = rd.ReadMsg(&msg)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.SetDeadline(time.Time{})
|
||||
|
||||
if msg.GetType() != pbv2.HopMessage_STATUS {
|
||||
s.Reset()
|
||||
return nil, newRelayError("unexpected relay response; not a status message (%d)", msg.GetType())
|
||||
}
|
||||
|
||||
status := msg.GetStatus()
|
||||
if status != pbv2.Status_OK {
|
||||
s.Reset()
|
||||
return nil, newRelayError("error opening relay circuit: %s (%d)", pbv2.Status_name[int32(status)], status)
|
||||
}
|
||||
|
||||
// check for a limit provided by the relay; if the limit is not nil, then this is a limited
|
||||
// relay connection and we mark the connection as transient.
|
||||
var stat network.Stat
|
||||
if limit := msg.GetLimit(); limit != nil {
|
||||
stat.Transient = true
|
||||
stat.Extra = make(map[interface{}]interface{})
|
||||
stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second
|
||||
stat.Extra[StatLimitData] = limit.GetData()
|
||||
}
|
||||
|
||||
return &Conn{stream: s, remote: dest, stat: stat, client: c}, nil
|
||||
}
|
||||
|
||||
func (c *Client) connectV1(s network.Stream, dest peer.AddrInfo) (*Conn, error) {
|
||||
rd := util.NewDelimitedReader(s, maxMessageSize)
|
||||
wr := util.NewDelimitedWriter(s)
|
||||
defer rd.Close()
|
||||
|
||||
var msg pbv1.CircuitRelay
|
||||
|
||||
msg.Type = pbv1.CircuitRelay_HOP.Enum()
|
||||
msg.SrcPeer = util.PeerInfoToPeerV1(c.host.Peerstore().PeerInfo(c.host.ID()))
|
||||
msg.DstPeer = util.PeerInfoToPeerV1(dest)
|
||||
|
||||
s.SetDeadline(time.Now().Add(DialTimeout))
|
||||
|
||||
err := wr.WriteMsg(&msg)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
msg.Reset()
|
||||
|
||||
err = rd.ReadMsg(&msg)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.SetDeadline(time.Time{})
|
||||
|
||||
if msg.GetType() != pbv1.CircuitRelay_STATUS {
|
||||
s.Reset()
|
||||
return nil, newRelayError("unexpected relay response; not a status message (%d)", msg.GetType())
|
||||
}
|
||||
|
||||
status := msg.GetCode()
|
||||
if status != pbv1.CircuitRelay_SUCCESS {
|
||||
s.Reset()
|
||||
return nil, newRelayError("error opening relay circuit: %s (%d)", pbv1.CircuitRelay_Status_name[int32(status)], status)
|
||||
}
|
||||
|
||||
return &Conn{stream: s, remote: dest, client: c}, nil
|
||||
}
|
||||
@@ -0,0 +1,172 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
pbv1 "github.com/libp2p/go-libp2p-circuit/pb"
|
||||
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
)
|
||||
|
||||
var (
|
||||
StreamTimeout = 1 * time.Minute
|
||||
AcceptTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
func (c *Client) handleStreamV2(s network.Stream) {
|
||||
log.Debugf("new relay/v2 stream from: %s", s.Conn().RemotePeer())
|
||||
|
||||
s.SetReadDeadline(time.Now().Add(StreamTimeout))
|
||||
|
||||
rd := util.NewDelimitedReader(s, maxMessageSize)
|
||||
defer rd.Close()
|
||||
|
||||
writeResponse := func(status pbv2.Status) error {
|
||||
wr := util.NewDelimitedWriter(s)
|
||||
|
||||
var msg pbv2.StopMessage
|
||||
msg.Type = pbv2.StopMessage_STATUS.Enum()
|
||||
msg.Status = status.Enum()
|
||||
|
||||
return wr.WriteMsg(&msg)
|
||||
}
|
||||
|
||||
handleError := func(status pbv2.Status) {
|
||||
log.Debugf("protocol error: %s (%d)", pbv2.Status_name[int32(status)], status)
|
||||
err := writeResponse(status)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
log.Debugf("error writing circuit response: %s", err.Error())
|
||||
} else {
|
||||
s.Close()
|
||||
}
|
||||
}
|
||||
|
||||
var msg pbv2.StopMessage
|
||||
|
||||
err := rd.ReadMsg(&msg)
|
||||
if err != nil {
|
||||
handleError(pbv2.Status_MALFORMED_MESSAGE)
|
||||
return
|
||||
}
|
||||
// reset stream deadline as message has been read
|
||||
s.SetReadDeadline(time.Time{})
|
||||
|
||||
if msg.GetType() != pbv2.StopMessage_CONNECT {
|
||||
handleError(pbv2.Status_UNEXPECTED_MESSAGE)
|
||||
return
|
||||
}
|
||||
|
||||
src, err := util.PeerToPeerInfoV2(msg.GetPeer())
|
||||
if err != nil {
|
||||
handleError(pbv2.Status_MALFORMED_MESSAGE)
|
||||
return
|
||||
}
|
||||
|
||||
// check for a limit provided by the relay; if the limit is not nil, then this is a limited
|
||||
// relay connection and we mark the connection as transient.
|
||||
var stat network.Stat
|
||||
if limit := msg.GetLimit(); limit != nil {
|
||||
stat.Transient = true
|
||||
stat.Extra = make(map[interface{}]interface{})
|
||||
stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second
|
||||
stat.Extra[StatLimitData] = limit.GetData()
|
||||
}
|
||||
|
||||
log.Debugf("incoming relay connection from: %s", src.ID)
|
||||
|
||||
select {
|
||||
case c.incoming <- accept{
|
||||
conn: &Conn{stream: s, remote: src, stat: stat, client: c},
|
||||
writeResponse: func() error {
|
||||
return writeResponse(pbv2.Status_OK)
|
||||
},
|
||||
}:
|
||||
case <-time.After(AcceptTimeout):
|
||||
handleError(pbv2.Status_CONNECTION_FAILED)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) handleStreamV1(s network.Stream) {
|
||||
log.Debugf("new relay/v1 stream from: %s", s.Conn().RemotePeer())
|
||||
|
||||
s.SetReadDeadline(time.Now().Add(StreamTimeout))
|
||||
|
||||
rd := util.NewDelimitedReader(s, maxMessageSize)
|
||||
defer rd.Close()
|
||||
|
||||
writeResponse := func(status pbv1.CircuitRelay_Status) error {
|
||||
wr := util.NewDelimitedWriter(s)
|
||||
|
||||
var msg pbv1.CircuitRelay
|
||||
msg.Type = pbv1.CircuitRelay_STATUS.Enum()
|
||||
msg.Code = status.Enum()
|
||||
|
||||
return wr.WriteMsg(&msg)
|
||||
}
|
||||
|
||||
handleError := func(status pbv1.CircuitRelay_Status) {
|
||||
log.Debugf("protocol error: %s (%d)", pbv1.CircuitRelay_Status_name[int32(status)], status)
|
||||
err := writeResponse(status)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
log.Debugf("error writing circuit response: %s", err.Error())
|
||||
} else {
|
||||
s.Close()
|
||||
}
|
||||
}
|
||||
|
||||
var msg pbv1.CircuitRelay
|
||||
|
||||
err := rd.ReadMsg(&msg)
|
||||
if err != nil {
|
||||
handleError(pbv1.CircuitRelay_MALFORMED_MESSAGE)
|
||||
return
|
||||
}
|
||||
// reset stream deadline as message has been read
|
||||
s.SetReadDeadline(time.Time{})
|
||||
|
||||
switch msg.GetType() {
|
||||
case pbv1.CircuitRelay_STOP:
|
||||
|
||||
case pbv1.CircuitRelay_HOP:
|
||||
handleError(pbv1.CircuitRelay_HOP_CANT_SPEAK_RELAY)
|
||||
return
|
||||
|
||||
case pbv1.CircuitRelay_CAN_HOP:
|
||||
handleError(pbv1.CircuitRelay_HOP_CANT_SPEAK_RELAY)
|
||||
return
|
||||
|
||||
default:
|
||||
log.Debugf("unexpected relay handshake: %d", msg.GetType())
|
||||
handleError(pbv1.CircuitRelay_MALFORMED_MESSAGE)
|
||||
return
|
||||
}
|
||||
|
||||
src, err := util.PeerToPeerInfoV1(msg.GetSrcPeer())
|
||||
if err != nil {
|
||||
handleError(pbv1.CircuitRelay_STOP_SRC_MULTIADDR_INVALID)
|
||||
return
|
||||
}
|
||||
|
||||
dst, err := util.PeerToPeerInfoV1(msg.GetDstPeer())
|
||||
if err != nil || dst.ID != c.host.ID() {
|
||||
handleError(pbv1.CircuitRelay_STOP_DST_MULTIADDR_INVALID)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("incoming relay connection from: %s", src.ID)
|
||||
|
||||
select {
|
||||
case c.incoming <- accept{
|
||||
conn: &Conn{stream: s, remote: src, client: c},
|
||||
writeResponse: func() error {
|
||||
return writeResponse(pbv1.CircuitRelay_SUCCESS)
|
||||
},
|
||||
}:
|
||||
case <-time.After(AcceptTimeout):
|
||||
handleError(pbv1.CircuitRelay_STOP_RELAY_REFUSED)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr/net"
|
||||
)
|
||||
|
||||
var _ manet.Listener = (*Listener)(nil)
|
||||
|
||||
type Listener Client
|
||||
|
||||
func (c *Client) Listener() *Listener {
|
||||
return (*Listener)(c)
|
||||
}
|
||||
|
||||
func (l *Listener) Accept() (manet.Conn, error) {
|
||||
for {
|
||||
select {
|
||||
case evt := <-l.incoming:
|
||||
err := evt.writeResponse()
|
||||
if err != nil {
|
||||
log.Debugf("error writing relay response: %s", err.Error())
|
||||
evt.conn.stream.Reset()
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("accepted relay connection from %s through %s", evt.conn.remote.ID, evt.conn.RemoteMultiaddr())
|
||||
|
||||
evt.conn.tagHop()
|
||||
return evt.conn, nil
|
||||
|
||||
case <-l.ctx.Done():
|
||||
return nil, l.ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Listener) Addr() net.Addr {
|
||||
return &NetAddr{
|
||||
Relay: "any",
|
||||
Remote: "any",
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Listener) Multiaddr() ma.Multiaddr {
|
||||
return circuitAddr
|
||||
}
|
||||
|
||||
func (l *Listener) Close() error {
|
||||
// noop for now
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,122 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
var ReserveTimeout = time.Minute
|
||||
|
||||
// Reservation is a struct carrying information about a relay/v2 slot reservation.
|
||||
type Reservation struct {
|
||||
// Expiration is the expiration time of the reservation
|
||||
Expiration time.Time
|
||||
// Addrs contains the vouched public addresses of the reserving peer, which can be
|
||||
// announced to the network
|
||||
Addrs []ma.Multiaddr
|
||||
|
||||
// LimitDuration is the time limit for which the relay will keep a relayed connection
|
||||
// open. If 0, there is no limit.
|
||||
LimitDuration time.Duration
|
||||
// LimitData is the number of bytes that the relay will relay in each direction before
|
||||
// resetting a relayed connection.
|
||||
LimitData uint64
|
||||
|
||||
// Voucher is a signed reservation voucher provided by the relay
|
||||
Voucher *proto.ReservationVoucher
|
||||
}
|
||||
|
||||
// Reserve reserves a slot in a relay and returns the reservation information.
|
||||
// Clients must reserve slots in order for the relay to relay connections to them.
|
||||
func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, error) {
|
||||
if len(ai.Addrs) > 0 {
|
||||
h.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.TempAddrTTL)
|
||||
}
|
||||
|
||||
s, err := h.NewStream(ctx, ai.ID, proto.ProtoIDv2Hop)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
rd := util.NewDelimitedReader(s, maxMessageSize)
|
||||
wr := util.NewDelimitedWriter(s)
|
||||
defer rd.Close()
|
||||
|
||||
var msg pbv2.HopMessage
|
||||
msg.Type = pbv2.HopMessage_RESERVE.Enum()
|
||||
|
||||
s.SetDeadline(time.Now().Add(ReserveTimeout))
|
||||
|
||||
if err := wr.WriteMsg(&msg); err != nil {
|
||||
s.Reset()
|
||||
return nil, fmt.Errorf("error writing reservation message: %w", err)
|
||||
}
|
||||
|
||||
msg.Reset()
|
||||
|
||||
if err := rd.ReadMsg(&msg); err != nil {
|
||||
s.Reset()
|
||||
return nil, fmt.Errorf("error reading reservation response message: %w", err)
|
||||
}
|
||||
|
||||
if msg.GetType() != pbv2.HopMessage_STATUS {
|
||||
return nil, fmt.Errorf("unexpected relay response: not a status message (%d)", msg.GetType())
|
||||
}
|
||||
|
||||
if status := msg.GetStatus(); status != pbv2.Status_OK {
|
||||
return nil, fmt.Errorf("reservation failed: %s (%d)", pbv2.Status_name[int32(status)], status)
|
||||
}
|
||||
|
||||
rsvp := msg.GetReservation()
|
||||
if rsvp == nil {
|
||||
return nil, fmt.Errorf("missing reservation info")
|
||||
}
|
||||
|
||||
result := &Reservation{}
|
||||
result.Expiration = time.Unix(int64(rsvp.GetExpire()), 0)
|
||||
|
||||
for _, ab := range rsvp.GetAddrs() {
|
||||
a, err := ma.NewMultiaddrBytes(ab)
|
||||
if err != nil {
|
||||
log.Warnf("ignoring unparsable relay address: %s", err)
|
||||
continue
|
||||
}
|
||||
result.Addrs = append(result.Addrs, a)
|
||||
}
|
||||
|
||||
voucherBytes := rsvp.GetVoucher()
|
||||
if voucherBytes != nil {
|
||||
_, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error consuming voucher envelope: %w", err)
|
||||
}
|
||||
|
||||
voucher, ok := rec.(*proto.ReservationVoucher)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected voucher record type: %+T", rec)
|
||||
}
|
||||
|
||||
result.Voucher = voucher
|
||||
}
|
||||
|
||||
limit := msg.GetLimit()
|
||||
if limit != nil {
|
||||
result.LimitDuration = time.Duration(limit.GetDuration()) * time.Second
|
||||
result.LimitData = limit.GetData()
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/transport"
|
||||
|
||||
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
var circuitProtocol = ma.ProtocolWithCode(ma.P_CIRCUIT)
|
||||
var circuitAddr = ma.Cast(circuitProtocol.VCode)
|
||||
|
||||
// AddTransport constructs a new p2p-circuit/v2 client and adds it as a transport to the
|
||||
// host network
|
||||
func AddTransport(ctx context.Context, h host.Host, upgrader *tptu.Upgrader) error {
|
||||
n, ok := h.Network().(transport.TransportNetwork)
|
||||
if !ok {
|
||||
return fmt.Errorf("%v is not a transport network", h.Network())
|
||||
}
|
||||
|
||||
c, err := New(ctx, h, upgrader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error constructing circuit client: %w", err)
|
||||
}
|
||||
|
||||
err = n.AddTransport(c)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error adding circuit transport: %w", err)
|
||||
}
|
||||
|
||||
err = n.Listen(circuitAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listening to circuit addr: %w", err)
|
||||
}
|
||||
|
||||
c.Start()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Transport interface
|
||||
var _ transport.Transport = (*Client)(nil)
|
||||
|
||||
func (c *Client) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
|
||||
conn, err := c.dial(ctx, a, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn.tagHop()
|
||||
|
||||
return c.upgrader.UpgradeOutbound(ctx, c, conn, p)
|
||||
}
|
||||
|
||||
func (c *Client) CanDial(addr ma.Multiaddr) bool {
|
||||
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (c *Client) Listen(addr ma.Multiaddr) (transport.Listener, error) {
|
||||
// TODO connect to the relay and reserve slot if specified
|
||||
if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c.upgrader.UpgradeListener(c, c.Listener()), nil
|
||||
}
|
||||
|
||||
func (c *Client) Protocols() []int {
|
||||
return []int{ma.P_CIRCUIT}
|
||||
}
|
||||
|
||||
func (c *Client) Proxy() bool {
|
||||
return true
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
PB = $(wildcard *.proto)
|
||||
GO = $(PB:.proto=.pb.go)
|
||||
|
||||
all: $(GO)
|
||||
|
||||
%.pb.go: %.proto
|
||||
protoc --gogofast_out=. $<
|
||||
|
||||
clean:
|
||||
rm -f *.pb.go
|
||||
rm -f *.go
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,60 @@
|
||||
syntax = "proto2";
|
||||
|
||||
package circuit.pb;
|
||||
|
||||
message HopMessage {
|
||||
enum Type {
|
||||
RESERVE = 0;
|
||||
CONNECT = 1;
|
||||
STATUS = 2;
|
||||
}
|
||||
|
||||
required Type type = 1;
|
||||
|
||||
optional Peer peer = 2;
|
||||
optional Reservation reservation = 3;
|
||||
optional Limit limit = 4;
|
||||
|
||||
optional Status status = 5;
|
||||
}
|
||||
|
||||
message StopMessage {
|
||||
enum Type {
|
||||
CONNECT = 0;
|
||||
STATUS = 1;
|
||||
}
|
||||
|
||||
required Type type = 1;
|
||||
|
||||
optional Peer peer = 2;
|
||||
optional Limit limit = 3;
|
||||
|
||||
optional Status status = 4;
|
||||
}
|
||||
|
||||
message Peer {
|
||||
required bytes id = 1;
|
||||
repeated bytes addrs = 2;
|
||||
}
|
||||
|
||||
message Reservation {
|
||||
optional uint64 expire = 1; // Unix expiration time (UTC)
|
||||
repeated bytes addrs = 2; // relay addrs for reserving peer
|
||||
optional bytes voucher = 3; // reservation voucher
|
||||
}
|
||||
|
||||
message Limit {
|
||||
optional uint32 duration = 1; // seconds
|
||||
optional uint64 data = 2; // bytes
|
||||
}
|
||||
|
||||
enum Status {
|
||||
OK = 100;
|
||||
RESERVATION_REFUSED = 200;
|
||||
RESOURCE_LIMIT_EXCEEDED = 201;
|
||||
PERMISSION_DENIED = 202;
|
||||
CONNECTION_FAILED = 203;
|
||||
NO_RESERVATION = 204;
|
||||
MALFORMED_MESSAGE = 400;
|
||||
UNEXPECTED_MESSAGE = 401;
|
||||
}
|
||||
@@ -0,0 +1,438 @@
|
||||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||
// source: voucher.proto
|
||||
|
||||
package circuit_pb
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
type ReservationVoucher struct {
|
||||
Relay []byte `protobuf:"bytes,1,req,name=relay" json:"relay,omitempty"`
|
||||
Peer []byte `protobuf:"bytes,2,req,name=peer" json:"peer,omitempty"`
|
||||
Expiration *uint64 `protobuf:"varint,3,req,name=expiration" json:"expiration,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ReservationVoucher) Reset() { *m = ReservationVoucher{} }
|
||||
func (m *ReservationVoucher) String() string { return proto.CompactTextString(m) }
|
||||
func (*ReservationVoucher) ProtoMessage() {}
|
||||
func (*ReservationVoucher) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_a22a9b0d3335ba25, []int{0}
|
||||
}
|
||||
func (m *ReservationVoucher) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *ReservationVoucher) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_ReservationVoucher.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *ReservationVoucher) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_ReservationVoucher.Merge(m, src)
|
||||
}
|
||||
func (m *ReservationVoucher) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *ReservationVoucher) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_ReservationVoucher.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_ReservationVoucher proto.InternalMessageInfo
|
||||
|
||||
func (m *ReservationVoucher) GetRelay() []byte {
|
||||
if m != nil {
|
||||
return m.Relay
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ReservationVoucher) GetPeer() []byte {
|
||||
if m != nil {
|
||||
return m.Peer
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ReservationVoucher) GetExpiration() uint64 {
|
||||
if m != nil && m.Expiration != nil {
|
||||
return *m.Expiration
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*ReservationVoucher)(nil), "circuit.pb.ReservationVoucher")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("voucher.proto", fileDescriptor_a22a9b0d3335ba25) }
|
||||
|
||||
var fileDescriptor_a22a9b0d3335ba25 = []byte{
|
||||
// 135 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2d, 0xcb, 0x2f, 0x4d,
|
||||
0xce, 0x48, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4a, 0xce, 0x2c, 0x4a, 0x2e,
|
||||
0xcd, 0x2c, 0xd1, 0x2b, 0x48, 0x52, 0x8a, 0xe3, 0x12, 0x0a, 0x4a, 0x2d, 0x4e, 0x2d, 0x2a, 0x4b,
|
||||
0x2c, 0xc9, 0xcc, 0xcf, 0x0b, 0x83, 0xa8, 0x13, 0x12, 0xe1, 0x62, 0x2d, 0x4a, 0xcd, 0x49, 0xac,
|
||||
0x94, 0x60, 0x54, 0x60, 0xd2, 0xe0, 0x09, 0x82, 0x70, 0x84, 0x84, 0xb8, 0x58, 0x0a, 0x52, 0x53,
|
||||
0x8b, 0x24, 0x98, 0xc0, 0x82, 0x60, 0xb6, 0x90, 0x1c, 0x17, 0x57, 0x6a, 0x45, 0x41, 0x66, 0x11,
|
||||
0x58, 0xbb, 0x04, 0xb3, 0x02, 0x93, 0x06, 0x4b, 0x10, 0x92, 0x88, 0x13, 0xcf, 0x89, 0x47, 0x72,
|
||||
0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x08, 0x08, 0x00, 0x00, 0xff, 0xff, 0xc0,
|
||||
0x81, 0x3a, 0xee, 0x89, 0x00, 0x00, 0x00,
|
||||
}
|
||||
|
||||
func (m *ReservationVoucher) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *ReservationVoucher) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ReservationVoucher) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.Expiration == nil {
|
||||
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("expiration")
|
||||
} else {
|
||||
i = encodeVarintVoucher(dAtA, i, uint64(*m.Expiration))
|
||||
i--
|
||||
dAtA[i] = 0x18
|
||||
}
|
||||
if m.Peer == nil {
|
||||
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("peer")
|
||||
} else {
|
||||
i -= len(m.Peer)
|
||||
copy(dAtA[i:], m.Peer)
|
||||
i = encodeVarintVoucher(dAtA, i, uint64(len(m.Peer)))
|
||||
i--
|
||||
dAtA[i] = 0x12
|
||||
}
|
||||
if m.Relay == nil {
|
||||
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("relay")
|
||||
} else {
|
||||
i -= len(m.Relay)
|
||||
copy(dAtA[i:], m.Relay)
|
||||
i = encodeVarintVoucher(dAtA, i, uint64(len(m.Relay)))
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func encodeVarintVoucher(dAtA []byte, offset int, v uint64) int {
|
||||
offset -= sovVoucher(v)
|
||||
base := offset
|
||||
for v >= 1<<7 {
|
||||
dAtA[offset] = uint8(v&0x7f | 0x80)
|
||||
v >>= 7
|
||||
offset++
|
||||
}
|
||||
dAtA[offset] = uint8(v)
|
||||
return base
|
||||
}
|
||||
func (m *ReservationVoucher) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.Relay != nil {
|
||||
l = len(m.Relay)
|
||||
n += 1 + l + sovVoucher(uint64(l))
|
||||
}
|
||||
if m.Peer != nil {
|
||||
l = len(m.Peer)
|
||||
n += 1 + l + sovVoucher(uint64(l))
|
||||
}
|
||||
if m.Expiration != nil {
|
||||
n += 1 + sovVoucher(uint64(*m.Expiration))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovVoucher(x uint64) (n int) {
|
||||
return (math_bits.Len64(x|1) + 6) / 7
|
||||
}
|
||||
func sozVoucher(x uint64) (n int) {
|
||||
return sovVoucher(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
||||
}
|
||||
func (m *ReservationVoucher) Unmarshal(dAtA []byte) error {
|
||||
var hasFields [1]uint64
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowVoucher
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: ReservationVoucher: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: ReservationVoucher: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Relay", wireType)
|
||||
}
|
||||
var byteLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowVoucher
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
byteLen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if byteLen < 0 {
|
||||
return ErrInvalidLengthVoucher
|
||||
}
|
||||
postIndex := iNdEx + byteLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthVoucher
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Relay = append(m.Relay[:0], dAtA[iNdEx:postIndex]...)
|
||||
if m.Relay == nil {
|
||||
m.Relay = []byte{}
|
||||
}
|
||||
iNdEx = postIndex
|
||||
hasFields[0] |= uint64(0x00000001)
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Peer", wireType)
|
||||
}
|
||||
var byteLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowVoucher
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
byteLen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if byteLen < 0 {
|
||||
return ErrInvalidLengthVoucher
|
||||
}
|
||||
postIndex := iNdEx + byteLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthVoucher
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Peer = append(m.Peer[:0], dAtA[iNdEx:postIndex]...)
|
||||
if m.Peer == nil {
|
||||
m.Peer = []byte{}
|
||||
}
|
||||
iNdEx = postIndex
|
||||
hasFields[0] |= uint64(0x00000002)
|
||||
case 3:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType)
|
||||
}
|
||||
var v uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowVoucher
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
v |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
m.Expiration = &v
|
||||
hasFields[0] |= uint64(0x00000004)
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipVoucher(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skippy < 0 {
|
||||
return ErrInvalidLengthVoucher
|
||||
}
|
||||
if (iNdEx + skippy) < 0 {
|
||||
return ErrInvalidLengthVoucher
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
if hasFields[0]&uint64(0x00000001) == 0 {
|
||||
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("relay")
|
||||
}
|
||||
if hasFields[0]&uint64(0x00000002) == 0 {
|
||||
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("peer")
|
||||
}
|
||||
if hasFields[0]&uint64(0x00000004) == 0 {
|
||||
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("expiration")
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func skipVoucher(dAtA []byte) (n int, err error) {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
depth := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowVoucher
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
wireType := int(wire & 0x7)
|
||||
switch wireType {
|
||||
case 0:
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowVoucher
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx++
|
||||
if dAtA[iNdEx-1] < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 1:
|
||||
iNdEx += 8
|
||||
case 2:
|
||||
var length int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowVoucher
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
length |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if length < 0 {
|
||||
return 0, ErrInvalidLengthVoucher
|
||||
}
|
||||
iNdEx += length
|
||||
case 3:
|
||||
depth++
|
||||
case 4:
|
||||
if depth == 0 {
|
||||
return 0, ErrUnexpectedEndOfGroupVoucher
|
||||
}
|
||||
depth--
|
||||
case 5:
|
||||
iNdEx += 4
|
||||
default:
|
||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||
}
|
||||
if iNdEx < 0 {
|
||||
return 0, ErrInvalidLengthVoucher
|
||||
}
|
||||
if depth == 0 {
|
||||
return iNdEx, nil
|
||||
}
|
||||
}
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidLengthVoucher = fmt.Errorf("proto: negative length found during unmarshaling")
|
||||
ErrIntOverflowVoucher = fmt.Errorf("proto: integer overflow")
|
||||
ErrUnexpectedEndOfGroupVoucher = fmt.Errorf("proto: unexpected end of group")
|
||||
)
|
||||
@@ -0,0 +1,9 @@
|
||||
syntax = "proto2";
|
||||
|
||||
package circuit.pb;
|
||||
|
||||
message ReservationVoucher {
|
||||
required bytes relay = 1;
|
||||
required bytes peer = 2;
|
||||
required uint64 expiration = 3;
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package proto
|
||||
|
||||
const (
|
||||
ProtoIDv1 = "/libp2p/circuit/relay/0.1.0"
|
||||
ProtoIDv2Hop = "/libp2p/circuit/relay/0.2.0/hop"
|
||||
ProtoIDv2Stop = "/libp2p/circuit/relay/0.2.0/stop"
|
||||
)
|
||||
@@ -0,0 +1,72 @@
|
||||
package proto
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
)
|
||||
|
||||
const RecordDomain = "libp2p-relay-rsvp"
|
||||
|
||||
// TODO: register in multicodec table in https://github.com/multiformats/multicodec
|
||||
var RecordCodec = []byte{0x03, 0x02}
|
||||
|
||||
func init() {
|
||||
record.RegisterType(&ReservationVoucher{})
|
||||
}
|
||||
|
||||
type ReservationVoucher struct {
|
||||
// Relay is the ID of the peer providing relay service
|
||||
Relay peer.ID
|
||||
// Peer is the ID of the peer receiving relay service through Relay
|
||||
Peer peer.ID
|
||||
// Expiration is the expiration time of the reservation
|
||||
Expiration time.Time
|
||||
}
|
||||
|
||||
var _ record.Record = (*ReservationVoucher)(nil)
|
||||
|
||||
func (rv *ReservationVoucher) Domain() string {
|
||||
return RecordDomain
|
||||
}
|
||||
|
||||
func (rv *ReservationVoucher) Codec() []byte {
|
||||
return RecordCodec
|
||||
}
|
||||
|
||||
func (rv *ReservationVoucher) MarshalRecord() ([]byte, error) {
|
||||
relay := []byte(rv.Relay)
|
||||
peer := []byte(rv.Peer)
|
||||
expiration := uint64(rv.Expiration.Unix())
|
||||
pbrv := &pbv2.ReservationVoucher{
|
||||
Relay: relay,
|
||||
Peer: peer,
|
||||
Expiration: &expiration,
|
||||
}
|
||||
|
||||
return pbrv.Marshal()
|
||||
}
|
||||
|
||||
func (rv *ReservationVoucher) UnmarshalRecord(blob []byte) error {
|
||||
pbrv := pbv2.ReservationVoucher{}
|
||||
err := pbrv.Unmarshal(blob)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rv.Relay, err = peer.IDFromBytes(pbrv.GetRelay())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rv.Peer, err = peer.IDFromBytes(pbrv.GetPeer())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rv.Expiration = time.Unix(int64(pbrv.GetExpiration()), 0)
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package proto
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
)
|
||||
|
||||
func TestReservationVoucher(t *testing.T) {
|
||||
relayPrivk, relayPubk, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, peerPubk, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
relayID, err := peer.IDFromPublicKey(relayPubk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
peerID, err := peer.IDFromPublicKey(peerPubk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rsvp := &ReservationVoucher{
|
||||
Relay: relayID,
|
||||
Peer: peerID,
|
||||
Expiration: time.Now().Add(time.Hour),
|
||||
}
|
||||
|
||||
envelope, err := record.Seal(rsvp, relayPrivk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
blob, err := envelope.Marshal()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, rec, err := record.ConsumeEnvelope(blob, RecordDomain)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rsvp2, ok := rec.(*ReservationVoucher)
|
||||
if !ok {
|
||||
t.Fatalf("invalid record type %+T", rec)
|
||||
}
|
||||
|
||||
if rsvp.Relay != rsvp2.Relay {
|
||||
t.Fatal("relay IDs don't match")
|
||||
}
|
||||
if rsvp.Peer != rsvp2.Peer {
|
||||
t.Fatal("peer IDs don't match")
|
||||
}
|
||||
if rsvp.Expiration.Unix() != rsvp2.Expiration.Unix() {
|
||||
t.Fatal("expirations don't match")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package relay
|
||||
|
||||
import (
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
// ACLFilter is an Access Control mechanism for relayed connect.
|
||||
type ACLFilter interface {
|
||||
// AllowReserve returns true if a reservation from a peer with the given peer ID and multiaddr
|
||||
// is allowed.
|
||||
AllowReserve(p peer.ID, a ma.Multiaddr) bool
|
||||
// AllowConnect returns true if a source peer, with a given multiaddr is allowed to connect
|
||||
// to a destination peer.
|
||||
AllowConnect(src peer.ID, srcAddr ma.Multiaddr, dest peer.ID) bool
|
||||
}
|
||||
@@ -0,0 +1,124 @@
|
||||
package relay
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
asnutil "github.com/libp2p/go-libp2p-asn-util"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr/net"
|
||||
)
|
||||
|
||||
var validity = 30 * time.Minute
|
||||
|
||||
var (
|
||||
errTooManyReservations = errors.New("too many reservations")
|
||||
errTooManyReservationsForPeer = errors.New("too many reservations for peer")
|
||||
errTooManyReservationsForIP = errors.New("too many peers for IP address")
|
||||
errTooManyReservationsForASN = errors.New("too many peers for ASN")
|
||||
)
|
||||
|
||||
// constraints implements various reservation constraints
|
||||
type constraints struct {
|
||||
rc *Resources
|
||||
|
||||
mutex sync.Mutex
|
||||
total []time.Time
|
||||
peers map[peer.ID][]time.Time
|
||||
ips map[string][]time.Time
|
||||
asns map[string][]time.Time
|
||||
}
|
||||
|
||||
// newConstraints creates a new constraints object.
|
||||
// The methods are *not* thread-safe; an external lock must be held if synchronization
|
||||
// is required.
|
||||
func newConstraints(rc *Resources) *constraints {
|
||||
return &constraints{
|
||||
rc: rc,
|
||||
peers: make(map[peer.ID][]time.Time),
|
||||
ips: make(map[string][]time.Time),
|
||||
asns: make(map[string][]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
// AddReservation adds a reservation for a given peer with a given multiaddr.
|
||||
// If adding this reservation violates IP constraints, an error is returned.
|
||||
func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
c.cleanup(now)
|
||||
|
||||
if len(c.total) >= c.rc.MaxReservations {
|
||||
return errTooManyReservations
|
||||
}
|
||||
|
||||
ip, err := manet.ToIP(a)
|
||||
if err != nil {
|
||||
return errors.New("no IP address associated with peer")
|
||||
}
|
||||
|
||||
peerReservations := c.peers[p]
|
||||
if len(peerReservations) >= c.rc.MaxReservationsPerPeer {
|
||||
return errTooManyReservationsForPeer
|
||||
}
|
||||
|
||||
ipReservations := c.ips[ip.String()]
|
||||
if len(ipReservations) >= c.rc.MaxReservationsPerIP {
|
||||
return errTooManyReservationsForIP
|
||||
}
|
||||
|
||||
var asnReservations []time.Time
|
||||
var asn string
|
||||
if ip.To4() == nil {
|
||||
asn, _ = asnutil.Store.AsnForIPv6(ip)
|
||||
if asn != "" {
|
||||
asnReservations = c.asns[asn]
|
||||
if len(asnReservations) >= c.rc.MaxReservationsPerASN {
|
||||
return errTooManyReservationsForASN
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
expiry := now.Add(validity)
|
||||
c.total = append(c.total, expiry)
|
||||
|
||||
peerReservations = append(peerReservations, expiry)
|
||||
c.peers[p] = peerReservations
|
||||
|
||||
ipReservations = append(ipReservations, expiry)
|
||||
c.ips[ip.String()] = ipReservations
|
||||
|
||||
if asn != "" {
|
||||
asnReservations = append(asnReservations, expiry)
|
||||
c.asns[asn] = asnReservations
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *constraints) cleanupList(l []time.Time, now time.Time) []time.Time {
|
||||
var index int
|
||||
for i, t := range l {
|
||||
if t.After(now) {
|
||||
break
|
||||
}
|
||||
index = i + 1
|
||||
}
|
||||
return l[index:]
|
||||
}
|
||||
|
||||
func (c *constraints) cleanup(now time.Time) {
|
||||
c.total = c.cleanupList(c.total, now)
|
||||
for k, peerReservations := range c.peers {
|
||||
c.peers[k] = c.cleanupList(peerReservations, now)
|
||||
}
|
||||
for k, ipReservations := range c.ips {
|
||||
c.ips[k] = c.cleanupList(ipReservations, now)
|
||||
}
|
||||
for k, asnReservations := range c.asns {
|
||||
c.asns[k] = c.cleanupList(asnReservations, now)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
package relay
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/test"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
func randomIPv4Addr(t *testing.T) ma.Multiaddr {
|
||||
t.Helper()
|
||||
b := make([]byte, 4)
|
||||
rand.Read(b)
|
||||
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/1234", net.IP(b)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return addr
|
||||
}
|
||||
|
||||
func TestConstraints(t *testing.T) {
|
||||
infResources := func() *Resources {
|
||||
return &Resources{
|
||||
MaxReservations: math.MaxInt32,
|
||||
MaxReservationsPerPeer: math.MaxInt32,
|
||||
MaxReservationsPerIP: math.MaxInt32,
|
||||
MaxReservationsPerASN: math.MaxInt32,
|
||||
}
|
||||
}
|
||||
const limit = 7
|
||||
|
||||
t.Run("total reservations", func(t *testing.T) {
|
||||
res := infResources()
|
||||
res.MaxReservations = limit
|
||||
c := newConstraints(res)
|
||||
for i := 0; i < limit; i++ {
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations {
|
||||
t.Fatalf("expected to run into total reservation limit, got %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("reservations per peer", func(t *testing.T) {
|
||||
p := test.RandPeerIDFatal(t)
|
||||
res := infResources()
|
||||
res.MaxReservationsPerPeer = limit
|
||||
c := newConstraints(res)
|
||||
for i := 0; i < limit; i++ {
|
||||
if err := c.AddReservation(p, randomIPv4Addr(t)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := c.AddReservation(p, randomIPv4Addr(t)); err != errTooManyReservationsForPeer {
|
||||
t.Fatalf("expected to run into total reservation limit, got %v", err)
|
||||
}
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
|
||||
t.Fatalf("expected reservation for different peer to be possible, got %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("reservations per IP", func(t *testing.T) {
|
||||
ip := randomIPv4Addr(t)
|
||||
res := infResources()
|
||||
res.MaxReservationsPerIP = limit
|
||||
c := newConstraints(res)
|
||||
for i := 0; i < limit; i++ {
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != errTooManyReservationsForIP {
|
||||
t.Fatalf("expected to run into total reservation limit, got %v", err)
|
||||
}
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
|
||||
t.Fatalf("expected reservation for different IP to be possible, got %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("reservations per ASN", func(t *testing.T) {
|
||||
getAddr := func(t *testing.T, ip net.IP) ma.Multiaddr {
|
||||
t.Helper()
|
||||
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/1234", ip))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return addr
|
||||
}
|
||||
|
||||
res := infResources()
|
||||
res.MaxReservationsPerASN = limit
|
||||
c := newConstraints(res)
|
||||
const ipv6Prefix = "2a03:2880:f003:c07:face:b00c::"
|
||||
for i := 0; i < limit; i++ {
|
||||
addr := getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, i+1)))
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), addr); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, 42)))); err != errTooManyReservationsForASN {
|
||||
t.Fatalf("expected to run into total reservation limit, got %v", err)
|
||||
}
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
|
||||
t.Fatalf("expected reservation for different IP to be possible, got %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestConstraintsCleanup(t *testing.T) {
|
||||
origValidity := validity
|
||||
defer func() { validity = origValidity }()
|
||||
validity = 500 * time.Millisecond
|
||||
|
||||
const limit = 7
|
||||
res := &Resources{
|
||||
MaxReservations: limit,
|
||||
MaxReservationsPerPeer: math.MaxInt32,
|
||||
MaxReservationsPerIP: math.MaxInt32,
|
||||
MaxReservationsPerASN: math.MaxInt32,
|
||||
}
|
||||
c := newConstraints(res)
|
||||
for i := 0; i < limit; i++ {
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations {
|
||||
t.Fatalf("expected to run into total reservation limit, got %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(validity + time.Millisecond)
|
||||
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
|
||||
t.Fatalf("expected old reservations to have been garbage collected, %v", err)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package relay
|
||||
|
||||
type Option func(*Relay) error
|
||||
|
||||
// WithResources is a Relay option that sets specific relay resources for the relay.
|
||||
func WithResources(rc Resources) Option {
|
||||
return func(r *Relay) error {
|
||||
r.rc = rc
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithLimit is a Relay option that sets only the relayed connection limits for the relay.
|
||||
func WithLimit(limit *RelayLimit) Option {
|
||||
return func(r *Relay) error {
|
||||
r.rc.Limit = limit
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithACL is a Relay option that supplies an ACLFilter for access control.
|
||||
func WithACL(acl ACLFilter) Option {
|
||||
return func(r *Relay) error {
|
||||
r.acl = acl
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,510 @@
|
||||
package relay
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
pool "github.com/libp2p/go-buffer-pool"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr/net"
|
||||
)
|
||||
|
||||
const (
|
||||
ReservationTagWeight = 10
|
||||
|
||||
StreamTimeout = time.Minute
|
||||
ConnectTimeout = 30 * time.Second
|
||||
HandshakeTimeout = time.Minute
|
||||
|
||||
maxMessageSize = 4096
|
||||
)
|
||||
|
||||
var log = logging.Logger("relay")
|
||||
|
||||
// Relay is the (limited) relay service object.
|
||||
type Relay struct {
|
||||
closed uint32
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
host host.Host
|
||||
rc Resources
|
||||
acl ACLFilter
|
||||
constraints *constraints
|
||||
|
||||
mx sync.Mutex
|
||||
rsvp map[peer.ID]time.Time
|
||||
conns map[peer.ID]int
|
||||
|
||||
selfAddr ma.Multiaddr
|
||||
}
|
||||
|
||||
// New constructs a new limited relay that can provide relay services in the given host.
|
||||
func New(h host.Host, opts ...Option) (*Relay, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
r := &Relay{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
host: h,
|
||||
rc: DefaultResources(),
|
||||
acl: nil,
|
||||
rsvp: make(map[peer.ID]time.Time),
|
||||
conns: make(map[peer.ID]int),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
err := opt(r)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error applying relay option: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
r.constraints = newConstraints(&r.rc)
|
||||
r.selfAddr = ma.StringCast(fmt.Sprintf("/p2p/%s", h.ID()))
|
||||
|
||||
h.SetStreamHandler(proto.ProtoIDv2Hop, r.handleStream)
|
||||
h.Network().Notify(
|
||||
&network.NotifyBundle{
|
||||
DisconnectedF: r.disconnected,
|
||||
})
|
||||
go r.background()
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (r *Relay) Close() error {
|
||||
if atomic.CompareAndSwapUint32(&r.closed, 0, 1) {
|
||||
r.host.RemoveStreamHandler(proto.ProtoIDv2Hop)
|
||||
r.cancel()
|
||||
r.mx.Lock()
|
||||
for p := range r.rsvp {
|
||||
r.host.ConnManager().UntagPeer(p, "relay-reservation")
|
||||
}
|
||||
r.mx.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Relay) handleStream(s network.Stream) {
|
||||
s.SetReadDeadline(time.Now().Add(StreamTimeout))
|
||||
|
||||
log.Infof("new relay stream from: %s", s.Conn().RemotePeer())
|
||||
|
||||
rd := util.NewDelimitedReader(s, maxMessageSize)
|
||||
defer rd.Close()
|
||||
|
||||
var msg pbv2.HopMessage
|
||||
|
||||
err := rd.ReadMsg(&msg)
|
||||
if err != nil {
|
||||
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
|
||||
return
|
||||
}
|
||||
// reset stream deadline as message has been read
|
||||
s.SetReadDeadline(time.Time{})
|
||||
|
||||
switch msg.GetType() {
|
||||
case pbv2.HopMessage_RESERVE:
|
||||
r.handleReserve(s)
|
||||
|
||||
case pbv2.HopMessage_CONNECT:
|
||||
r.handleConnect(s, &msg)
|
||||
|
||||
default:
|
||||
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Relay) handleReserve(s network.Stream) {
|
||||
defer s.Close()
|
||||
|
||||
p := s.Conn().RemotePeer()
|
||||
a := s.Conn().RemoteMultiaddr()
|
||||
|
||||
if util.IsRelayAddr(a) {
|
||||
log.Debugf("refusing relay reservation for %s; reservation attempt over relay connection")
|
||||
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
|
||||
return
|
||||
}
|
||||
|
||||
if r.acl != nil && !r.acl.AllowReserve(p, a) {
|
||||
log.Debugf("refusing relay reservation for %s; permission denied", p)
|
||||
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
|
||||
return
|
||||
}
|
||||
|
||||
r.mx.Lock()
|
||||
now := time.Now()
|
||||
|
||||
_, exists := r.rsvp[p]
|
||||
if !exists {
|
||||
if err := r.constraints.AddReservation(p, a); err != nil {
|
||||
r.mx.Unlock()
|
||||
log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err)
|
||||
r.handleError(s, pbv2.Status_RESERVATION_REFUSED)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
expire := now.Add(r.rc.ReservationTTL)
|
||||
r.rsvp[p] = expire
|
||||
r.host.ConnManager().TagPeer(p, "relay-reservation", ReservationTagWeight)
|
||||
r.mx.Unlock()
|
||||
|
||||
log.Debugf("reserving relay slot for %s", p)
|
||||
|
||||
// Delivery of the reservation might fail for a number of reasons.
|
||||
// For example, the stream might be reset or the connection might be closed before the reservation is received.
|
||||
// In that case, the reservation will just be garbage collected later.
|
||||
if err := r.writeResponse(s, pbv2.Status_OK, r.makeReservationMsg(p, expire), r.makeLimitMsg(p)); err != nil {
|
||||
log.Debugf("error writing reservation response; retracting reservation for %s", p)
|
||||
s.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) {
|
||||
src := s.Conn().RemotePeer()
|
||||
a := s.Conn().RemoteMultiaddr()
|
||||
|
||||
if util.IsRelayAddr(a) {
|
||||
log.Debugf("refusing connection from %s; connection attempt over relay connection")
|
||||
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
|
||||
return
|
||||
}
|
||||
|
||||
dest, err := util.PeerToPeerInfoV2(msg.GetPeer())
|
||||
if err != nil {
|
||||
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
|
||||
return
|
||||
}
|
||||
|
||||
if r.acl != nil && !r.acl.AllowConnect(src, s.Conn().RemoteMultiaddr(), dest.ID) {
|
||||
log.Debugf("refusing connection from %s to %s; permission denied", src, dest.ID)
|
||||
r.handleError(s, pbv2.Status_PERMISSION_DENIED)
|
||||
return
|
||||
}
|
||||
|
||||
r.mx.Lock()
|
||||
_, rsvp := r.rsvp[dest.ID]
|
||||
if !rsvp {
|
||||
r.mx.Unlock()
|
||||
log.Debugf("refusing connection from %s to %s; no reservation", src, dest.ID)
|
||||
r.handleError(s, pbv2.Status_NO_RESERVATION)
|
||||
return
|
||||
}
|
||||
|
||||
srcConns := r.conns[src]
|
||||
if srcConns >= r.rc.MaxCircuits {
|
||||
r.mx.Unlock()
|
||||
log.Debugf("refusing connection from %s to %s; too many connections from %s", src, dest.ID, src)
|
||||
r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
|
||||
return
|
||||
}
|
||||
r.conns[src]++
|
||||
|
||||
destConns := r.conns[dest.ID]
|
||||
if destConns >= r.rc.MaxCircuits {
|
||||
r.conns[src]--
|
||||
r.mx.Unlock()
|
||||
log.Debugf("refusing connection from %s to %s; too many connecitons to %s", src, dest.ID, dest.ID)
|
||||
r.handleError(s, pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
|
||||
return
|
||||
}
|
||||
r.conns[dest.ID]++
|
||||
r.mx.Unlock()
|
||||
|
||||
cleanup := func() {
|
||||
r.mx.Lock()
|
||||
r.conns[src]--
|
||||
r.conns[dest.ID]--
|
||||
r.mx.Unlock()
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.ctx, ConnectTimeout)
|
||||
defer cancel()
|
||||
|
||||
ctx = network.WithNoDial(ctx, "relay connect")
|
||||
|
||||
bs, err := r.host.NewStream(ctx, dest.ID, proto.ProtoIDv2Stop)
|
||||
if err != nil {
|
||||
log.Debugf("error opening relay stream to %s: %s", dest.ID, err)
|
||||
cleanup()
|
||||
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
// handshake
|
||||
rd := util.NewDelimitedReader(bs, maxMessageSize)
|
||||
wr := util.NewDelimitedWriter(bs)
|
||||
defer rd.Close()
|
||||
|
||||
var stopmsg pbv2.StopMessage
|
||||
stopmsg.Type = pbv2.StopMessage_CONNECT.Enum()
|
||||
stopmsg.Peer = util.PeerInfoToPeerV2(peer.AddrInfo{ID: src})
|
||||
stopmsg.Limit = r.makeLimitMsg(dest.ID)
|
||||
|
||||
bs.SetDeadline(time.Now().Add(HandshakeTimeout))
|
||||
|
||||
err = wr.WriteMsg(&stopmsg)
|
||||
if err != nil {
|
||||
log.Debugf("error writing stop handshake")
|
||||
bs.Reset()
|
||||
cleanup()
|
||||
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
stopmsg.Reset()
|
||||
|
||||
err = rd.ReadMsg(&stopmsg)
|
||||
if err != nil {
|
||||
log.Debugf("error reading stop response: %s", err.Error())
|
||||
bs.Reset()
|
||||
cleanup()
|
||||
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
if t := stopmsg.GetType(); t != pbv2.StopMessage_STATUS {
|
||||
log.Debugf("unexpected stop response; not a status message (%d)", t)
|
||||
bs.Reset()
|
||||
cleanup()
|
||||
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
if status := stopmsg.GetStatus(); status != pbv2.Status_OK {
|
||||
log.Debugf("relay stop failure: %d", status)
|
||||
bs.Reset()
|
||||
cleanup()
|
||||
r.handleError(s, pbv2.Status_CONNECTION_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
var response pbv2.HopMessage
|
||||
response.Type = pbv2.HopMessage_STATUS.Enum()
|
||||
response.Status = pbv2.Status_OK.Enum()
|
||||
response.Limit = r.makeLimitMsg(dest.ID)
|
||||
|
||||
wr = util.NewDelimitedWriter(s)
|
||||
err = wr.WriteMsg(&response)
|
||||
if err != nil {
|
||||
log.Debugf("error writing relay response: %s", err)
|
||||
bs.Reset()
|
||||
s.Reset()
|
||||
cleanup()
|
||||
return
|
||||
}
|
||||
|
||||
// reset deadline
|
||||
bs.SetDeadline(time.Time{})
|
||||
|
||||
log.Infof("relaying connection from %s to %s", src, dest.ID)
|
||||
|
||||
goroutines := new(int32)
|
||||
*goroutines = 2
|
||||
|
||||
done := func() {
|
||||
if atomic.AddInt32(goroutines, -1) == 0 {
|
||||
s.Close()
|
||||
bs.Close()
|
||||
cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
if r.rc.Limit != nil {
|
||||
deadline := time.Now().Add(r.rc.Limit.Duration)
|
||||
s.SetDeadline(deadline)
|
||||
bs.SetDeadline(deadline)
|
||||
go r.relayLimited(s, bs, src, dest.ID, r.rc.Limit.Data, done)
|
||||
go r.relayLimited(bs, s, dest.ID, src, r.rc.Limit.Data, done)
|
||||
} else {
|
||||
go r.relayUnlimited(s, bs, src, dest.ID, done)
|
||||
go r.relayUnlimited(bs, s, dest.ID, src, done)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Relay) relayLimited(src, dest network.Stream, srcID, destID peer.ID, limit int64, done func()) {
|
||||
defer done()
|
||||
|
||||
buf := pool.Get(r.rc.BufferSize)
|
||||
defer pool.Put(buf)
|
||||
|
||||
limitedSrc := io.LimitReader(src, limit)
|
||||
|
||||
count, err := io.CopyBuffer(dest, limitedSrc, buf)
|
||||
if err != nil {
|
||||
log.Debugf("relay copy error: %s", err)
|
||||
// Reset both.
|
||||
src.Reset()
|
||||
dest.Reset()
|
||||
} else {
|
||||
// propagate the close
|
||||
dest.CloseWrite()
|
||||
if count == limit {
|
||||
// we've reached the limit, discard further input
|
||||
src.CloseRead()
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("relayed %d bytes from %s to %s", count, srcID, destID)
|
||||
}
|
||||
|
||||
func (r *Relay) relayUnlimited(src, dest network.Stream, srcID, destID peer.ID, done func()) {
|
||||
defer done()
|
||||
|
||||
buf := pool.Get(r.rc.BufferSize)
|
||||
defer pool.Put(buf)
|
||||
|
||||
count, err := io.CopyBuffer(dest, src, buf)
|
||||
if err != nil {
|
||||
log.Debugf("relay copy error: %s", err)
|
||||
// Reset both.
|
||||
src.Reset()
|
||||
dest.Reset()
|
||||
} else {
|
||||
// propagate the close
|
||||
dest.CloseWrite()
|
||||
}
|
||||
|
||||
log.Debugf("relayed %d bytes from %s to %s", count, srcID, destID)
|
||||
}
|
||||
|
||||
func (r *Relay) handleError(s network.Stream, status pbv2.Status) {
|
||||
log.Debugf("relay error: %s (%d)", pbv2.Status_name[int32(status)], status)
|
||||
err := r.writeResponse(s, status, nil, nil)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
log.Debugf("error writing relay response: %s", err.Error())
|
||||
} else {
|
||||
s.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Relay) writeResponse(s network.Stream, status pbv2.Status, rsvp *pbv2.Reservation, limit *pbv2.Limit) error {
|
||||
wr := util.NewDelimitedWriter(s)
|
||||
|
||||
var msg pbv2.HopMessage
|
||||
msg.Type = pbv2.HopMessage_STATUS.Enum()
|
||||
msg.Status = status.Enum()
|
||||
msg.Reservation = rsvp
|
||||
msg.Limit = limit
|
||||
|
||||
return wr.WriteMsg(&msg)
|
||||
}
|
||||
|
||||
func (r *Relay) makeReservationMsg(p peer.ID, expire time.Time) *pbv2.Reservation {
|
||||
expireUnix := uint64(expire.Unix())
|
||||
|
||||
var addrBytes [][]byte
|
||||
for _, addr := range r.host.Addrs() {
|
||||
if !manet.IsPublicAddr(addr) {
|
||||
continue
|
||||
}
|
||||
|
||||
addr = addr.Encapsulate(r.selfAddr)
|
||||
addrBytes = append(addrBytes, addr.Bytes())
|
||||
}
|
||||
|
||||
rsvp := &pbv2.Reservation{
|
||||
Expire: &expireUnix,
|
||||
Addrs: addrBytes,
|
||||
}
|
||||
|
||||
voucher := &proto.ReservationVoucher{
|
||||
Relay: r.host.ID(),
|
||||
Peer: p,
|
||||
Expiration: expire,
|
||||
}
|
||||
|
||||
envelope, err := record.Seal(voucher, r.host.Peerstore().PrivKey(r.host.ID()))
|
||||
if err != nil {
|
||||
log.Errorf("error sealing voucher for %s: %s", p, err)
|
||||
return rsvp
|
||||
}
|
||||
|
||||
blob, err := envelope.Marshal()
|
||||
if err != nil {
|
||||
log.Errorf("error marshalling voucher for %s: %s", p, err)
|
||||
return rsvp
|
||||
}
|
||||
|
||||
rsvp.Voucher = blob
|
||||
|
||||
return rsvp
|
||||
}
|
||||
|
||||
func (r *Relay) makeLimitMsg(p peer.ID) *pbv2.Limit {
|
||||
if r.rc.Limit == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
duration := uint32(r.rc.Limit.Duration / time.Second)
|
||||
data := uint64(r.rc.Limit.Data)
|
||||
|
||||
return &pbv2.Limit{
|
||||
Duration: &duration,
|
||||
Data: &data,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Relay) background() {
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
r.gc()
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Relay) gc() {
|
||||
r.mx.Lock()
|
||||
defer r.mx.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
for p, expire := range r.rsvp {
|
||||
if expire.Before(now) {
|
||||
delete(r.rsvp, p)
|
||||
r.host.ConnManager().UntagPeer(p, "relay-reservation")
|
||||
}
|
||||
}
|
||||
|
||||
for p, count := range r.conns {
|
||||
if count == 0 {
|
||||
delete(r.conns, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Relay) disconnected(n network.Network, c network.Conn) {
|
||||
p := c.RemotePeer()
|
||||
if n.Connectedness(p) == network.Connected {
|
||||
return
|
||||
}
|
||||
|
||||
r.mx.Lock()
|
||||
defer r.mx.Unlock()
|
||||
|
||||
delete(r.rsvp, p)
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
package relay
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Resources are the resource limits associated with the relay service.
|
||||
type Resources struct {
|
||||
// Limit is the (optional) relayed connection limits.
|
||||
Limit *RelayLimit
|
||||
|
||||
// ReservationTTL is the duration of a new (or refreshed reservation).
|
||||
// Defaults to 1hr.
|
||||
ReservationTTL time.Duration
|
||||
|
||||
// MaxReservations is the maximum number of active relay slots; defaults to 128.
|
||||
MaxReservations int
|
||||
// MaxCircuits is the maximum number of open relay connections for each peer; defaults to 16.
|
||||
MaxCircuits int
|
||||
// BufferSize is the size of the relayed connection buffers; defaults to 2048.
|
||||
BufferSize int
|
||||
|
||||
// MaxReservationsPerPeer is the maximum number of reservations originating from the same
|
||||
// peer; default is 4.
|
||||
MaxReservationsPerPeer int
|
||||
// MaxReservationsPerIP is the maximum number of reservations originating from the same
|
||||
// IP address; default is 8.
|
||||
MaxReservationsPerIP int
|
||||
// MaxReservationsPerASN is the maximum number of reservations origination from the same
|
||||
// ASN; default is 32
|
||||
MaxReservationsPerASN int
|
||||
}
|
||||
|
||||
// RelayLimit are the per relayed connection resource limits.
|
||||
type RelayLimit struct {
|
||||
// Duration is the time limit before resetting a relayed connection; defaults to 2min.
|
||||
Duration time.Duration
|
||||
// Data is the limit of data relayed (on each direction) before resetting the connection.
|
||||
// Defaults to 128KB
|
||||
Data int64
|
||||
}
|
||||
|
||||
// DefaultResources returns a Resources object with the default filled in.
|
||||
func DefaultResources() Resources {
|
||||
return Resources{
|
||||
Limit: DefaultLimit(),
|
||||
|
||||
ReservationTTL: time.Hour,
|
||||
|
||||
MaxReservations: 128,
|
||||
MaxCircuits: 16,
|
||||
BufferSize: 2048,
|
||||
|
||||
MaxReservationsPerPeer: 4,
|
||||
MaxReservationsPerIP: 8,
|
||||
MaxReservationsPerASN: 32,
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultLimit returns a RelayLimit object with the defaults filled in.
|
||||
func DefaultLimit() *RelayLimit {
|
||||
return &RelayLimit{
|
||||
Duration: 2 * time.Minute,
|
||||
Data: 1 << 17, // 128K
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
v1 "github.com/libp2p/go-libp2p-circuit"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
func addTransportV1(t *testing.T, ctx context.Context, h host.Host, upgrader *tptu.Upgrader) {
|
||||
err := v1.AddRelayTransport(ctx, h, upgrader)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRelayCompatV2DialV1(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts, upgraders := getNetHosts(t, ctx, 3)
|
||||
addTransportV1(t, ctx, hosts[0], upgraders[0])
|
||||
addTransport(t, ctx, hosts[2], upgraders[2])
|
||||
|
||||
rch := make(chan []byte, 1)
|
||||
hosts[0].SetStreamHandler("test", func(s network.Stream) {
|
||||
defer s.Close()
|
||||
defer close(rch)
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
nread := 0
|
||||
for nread < len(buf) {
|
||||
n, err := s.Read(buf[nread:])
|
||||
nread += n
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
rch <- buf[:nread]
|
||||
})
|
||||
|
||||
_, err := v1.NewRelay(ctx, hosts[1], upgraders[1], v1.OptHop)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
connect(t, hosts[0], hosts[1])
|
||||
connect(t, hosts[1], hosts[2])
|
||||
|
||||
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
|
||||
if len(conns) != 1 {
|
||||
t.Fatalf("expected 1 connection, but got %d", len(conns))
|
||||
}
|
||||
if conns[0].Stat().Transient {
|
||||
t.Fatal("expected non transient connection")
|
||||
}
|
||||
|
||||
s, err := hosts[2].NewStream(ctx, hosts[0].ID(), "test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msg := []byte("relay works!")
|
||||
nwritten, err := s.Write(msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if nwritten != len(msg) {
|
||||
t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten)
|
||||
}
|
||||
s.CloseWrite()
|
||||
|
||||
got := <-rch
|
||||
if !bytes.Equal(msg, got) {
|
||||
t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRelayCompatV1DialV2(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts, upgraders := getNetHosts(t, ctx, 3)
|
||||
addTransport(t, ctx, hosts[0], upgraders[0])
|
||||
addTransportV1(t, ctx, hosts[2], upgraders[2])
|
||||
|
||||
rch := make(chan []byte, 1)
|
||||
hosts[0].SetStreamHandler("test", func(s network.Stream) {
|
||||
defer s.Close()
|
||||
defer close(rch)
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
nread := 0
|
||||
for nread < len(buf) {
|
||||
n, err := s.Read(buf[nread:])
|
||||
nread += n
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
rch <- buf[:nread]
|
||||
})
|
||||
|
||||
_, err := v1.NewRelay(ctx, hosts[1], upgraders[1], v1.OptHop)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
connect(t, hosts[0], hosts[1])
|
||||
connect(t, hosts[1], hosts[2])
|
||||
|
||||
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
|
||||
if len(conns) != 1 {
|
||||
t.Fatalf("expected 1 connection, but got %d", len(conns))
|
||||
}
|
||||
if conns[0].Stat().Transient {
|
||||
t.Fatal("expected non transient connection")
|
||||
}
|
||||
|
||||
s, err := hosts[2].NewStream(ctx, hosts[0].ID(), "test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msg := []byte("relay works!")
|
||||
nwritten, err := s.Write(msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if nwritten != len(msg) {
|
||||
t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten)
|
||||
}
|
||||
s.CloseWrite()
|
||||
|
||||
got := <-rch
|
||||
if !bytes.Equal(msg, got) {
|
||||
t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,361 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/mux"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
bhost "github.com/libp2p/go-libp2p-blankhost"
|
||||
metrics "github.com/libp2p/go-libp2p-core/metrics"
|
||||
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
|
||||
swarm "github.com/libp2p/go-libp2p-swarm"
|
||||
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
|
||||
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
|
||||
tcp "github.com/libp2p/go-tcp-transport"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// TODO temporary for debugging purposes; to be removed for merge.
|
||||
logging.SetLogLevel("relay", "DEBUG")
|
||||
logging.SetLogLevel("p2p-circuit", "DEBUG")
|
||||
}
|
||||
|
||||
func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, upgraders []*tptu.Upgrader) {
|
||||
for i := 0; i < n; i++ {
|
||||
privk, pubk, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
p, err := peer.IDFromPublicKey(pubk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ps := pstoremem.NewPeerstore()
|
||||
err = ps.AddPrivKey(p, privk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
bwr := metrics.NewBandwidthCounter()
|
||||
netw := swarm.NewSwarm(ctx, p, ps, bwr)
|
||||
|
||||
upgrader := swarmt.GenUpgrader(netw)
|
||||
upgraders = append(upgraders, upgrader)
|
||||
|
||||
err = netw.AddTransport(tcp.NewTCPTransport(upgrader))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = netw.Listen(ma.StringCast("/ip4/127.0.0.1/tcp/0"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
h := bhost.NewBlankHost(netw)
|
||||
|
||||
hosts = append(hosts, h)
|
||||
}
|
||||
|
||||
return hosts, upgraders
|
||||
}
|
||||
|
||||
func connect(t *testing.T, a, b host.Host) {
|
||||
pi := peer.AddrInfo{ID: a.ID(), Addrs: a.Addrs()}
|
||||
err := b.Connect(context.Background(), pi)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func addTransport(t *testing.T, ctx context.Context, h host.Host, upgrader *tptu.Upgrader) {
|
||||
err := client.AddTransport(ctx, h, upgrader)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBasicRelay(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts, upgraders := getNetHosts(t, ctx, 3)
|
||||
addTransport(t, ctx, hosts[0], upgraders[0])
|
||||
addTransport(t, ctx, hosts[2], upgraders[2])
|
||||
|
||||
rch := make(chan []byte, 1)
|
||||
hosts[0].SetStreamHandler("test", func(s network.Stream) {
|
||||
defer s.Close()
|
||||
defer close(rch)
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
nread := 0
|
||||
for nread < len(buf) {
|
||||
n, err := s.Read(buf[nread:])
|
||||
nread += n
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
rch <- buf[:nread]
|
||||
})
|
||||
|
||||
r, err := relay.New(hosts[1])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
connect(t, hosts[0], hosts[1])
|
||||
connect(t, hosts[1], hosts[2])
|
||||
|
||||
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
|
||||
rsvp, err := client.Reserve(ctx, hosts[0], rinfo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if rsvp.Voucher == nil {
|
||||
t.Fatal("no reservation voucher")
|
||||
}
|
||||
|
||||
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
|
||||
if len(conns) != 1 {
|
||||
t.Fatalf("expected 1 connection, but got %d", len(conns))
|
||||
}
|
||||
if !conns[0].Stat().Transient {
|
||||
t.Fatal("expected transient connection")
|
||||
}
|
||||
|
||||
s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msg := []byte("relay works!")
|
||||
nwritten, err := s.Write(msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if nwritten != len(msg) {
|
||||
t.Fatalf("expected to write %d bytes, but wrote %d instead", len(msg), nwritten)
|
||||
}
|
||||
s.CloseWrite()
|
||||
|
||||
got := <-rch
|
||||
if !bytes.Equal(msg, got) {
|
||||
t.Fatalf("Wrong echo; expected %s but got %s", string(msg), string(got))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRelayLimitTime(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts, upgraders := getNetHosts(t, ctx, 3)
|
||||
addTransport(t, ctx, hosts[0], upgraders[0])
|
||||
addTransport(t, ctx, hosts[2], upgraders[2])
|
||||
|
||||
rch := make(chan error, 1)
|
||||
hosts[0].SetStreamHandler("test", func(s network.Stream) {
|
||||
defer s.Close()
|
||||
defer close(rch)
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
_, err := s.Read(buf)
|
||||
rch <- err
|
||||
})
|
||||
|
||||
rc := relay.DefaultResources()
|
||||
rc.Limit.Duration = time.Second
|
||||
|
||||
r, err := relay.New(hosts[1], relay.WithResources(rc))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
connect(t, hosts[0], hosts[1])
|
||||
connect(t, hosts[1], hosts[2])
|
||||
|
||||
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
|
||||
_, err = client.Reserve(ctx, hosts[0], rinfo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
|
||||
if len(conns) != 1 {
|
||||
t.Fatalf("expected 1 connection, but got %d", len(conns))
|
||||
}
|
||||
if !conns[0].Stat().Transient {
|
||||
t.Fatal("expected transient connection")
|
||||
}
|
||||
|
||||
s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
n, err := s.Write([]byte("should be closed"))
|
||||
if n > 0 {
|
||||
t.Fatalf("expected to write 0 bytes, wrote %d", n)
|
||||
}
|
||||
if err != mux.ErrReset {
|
||||
t.Fatalf("expected reset, but got %s", err)
|
||||
}
|
||||
|
||||
err = <-rch
|
||||
if err != mux.ErrReset {
|
||||
t.Fatalf("expected reset, but got %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRelayLimitData(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts, upgraders := getNetHosts(t, ctx, 3)
|
||||
addTransport(t, ctx, hosts[0], upgraders[0])
|
||||
addTransport(t, ctx, hosts[2], upgraders[2])
|
||||
|
||||
rch := make(chan int, 1)
|
||||
hosts[0].SetStreamHandler("test", func(s network.Stream) {
|
||||
defer s.Close()
|
||||
defer close(rch)
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
for i := 0; i < 3; i++ {
|
||||
n, err := s.Read(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rch <- n
|
||||
}
|
||||
|
||||
n, err := s.Read(buf)
|
||||
if err != mux.ErrReset {
|
||||
t.Fatalf("expected reset but got %s", err)
|
||||
}
|
||||
rch <- n
|
||||
})
|
||||
|
||||
rc := relay.DefaultResources()
|
||||
rc.Limit.Duration = time.Second
|
||||
rc.Limit.Data = 4096
|
||||
|
||||
r, err := relay.New(hosts[1], relay.WithResources(rc))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
connect(t, hosts[0], hosts[1])
|
||||
connect(t, hosts[1], hosts[2])
|
||||
|
||||
rinfo := hosts[1].Peerstore().PeerInfo(hosts[1].ID())
|
||||
_, err = client.Reserve(ctx, hosts[0], rinfo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
raddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit/p2p/%s", hosts[1].ID(), hosts[0].ID()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
conns := hosts[2].Network().ConnsToPeer(hosts[0].ID())
|
||||
if len(conns) != 1 {
|
||||
t.Fatalf("expected 1 connection, but got %d", len(conns))
|
||||
}
|
||||
if !conns[0].Stat().Transient {
|
||||
t.Fatal("expected transient connection")
|
||||
}
|
||||
|
||||
s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
for i := 0; i < 3; i++ {
|
||||
_, err = rand.Read(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
n, err := s.Write(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != len(buf) {
|
||||
t.Fatalf("expected to write %d bytes but wrote %d", len(buf), n)
|
||||
}
|
||||
|
||||
n = <-rch
|
||||
if n != len(buf) {
|
||||
t.Fatalf("expected to read %d bytes but read %d", len(buf), n)
|
||||
}
|
||||
}
|
||||
|
||||
buf = make([]byte, 4096)
|
||||
_, err = rand.Read(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s.Write(buf)
|
||||
|
||||
n := <-rch
|
||||
if n != 0 {
|
||||
t.Fatalf("expected to read 0 bytes but read %d", n)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
package test
|
||||
@@ -0,0 +1,67 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
pool "github.com/libp2p/go-buffer-pool"
|
||||
"github.com/libp2p/go-msgio/protoio"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/multiformats/go-varint"
|
||||
)
|
||||
|
||||
type DelimitedReader struct {
|
||||
r io.Reader
|
||||
buf []byte
|
||||
}
|
||||
|
||||
// The gogo protobuf NewDelimitedReader is buffered, which may eat up stream data.
|
||||
// So we need to implement a compatible delimited reader that reads unbuffered.
|
||||
// There is a slowdown from unbuffered reading: when reading the message
|
||||
// it can take multiple single byte Reads to read the length and another Read
|
||||
// to read the message payload.
|
||||
// However, this is not critical performance degradation as
|
||||
// - the reader is utilized to read one (dialer, stop) or two messages (hop) during
|
||||
// the handshake, so it's a drop in the water for the connection lifetime.
|
||||
// - messages are small (max 4k) and the length fits in a couple of bytes,
|
||||
// so overall we have at most three reads per message.
|
||||
func NewDelimitedReader(r io.Reader, maxSize int) *DelimitedReader {
|
||||
return &DelimitedReader{r: r, buf: pool.Get(maxSize)}
|
||||
}
|
||||
|
||||
func (d *DelimitedReader) Close() {
|
||||
if d.buf != nil {
|
||||
pool.Put(d.buf)
|
||||
d.buf = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DelimitedReader) ReadByte() (byte, error) {
|
||||
buf := d.buf[:1]
|
||||
_, err := d.r.Read(buf)
|
||||
return buf[0], err
|
||||
}
|
||||
|
||||
func (d *DelimitedReader) ReadMsg(msg proto.Message) error {
|
||||
mlen, err := varint.ReadUvarint(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if uint64(len(d.buf)) < mlen {
|
||||
return errors.New("message too large")
|
||||
}
|
||||
|
||||
buf := d.buf[:mlen]
|
||||
_, err = io.ReadFull(d.r, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return proto.Unmarshal(buf, msg)
|
||||
}
|
||||
|
||||
func NewDelimitedWriter(w io.Writer) protoio.WriteCloser {
|
||||
return protoio.NewDelimitedWriter(w)
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
func IsRelayAddr(a ma.Multiaddr) bool {
|
||||
_, err := a.ValueForProtocol(ma.P_CIRCUIT)
|
||||
return err == nil
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
pbv1 "github.com/libp2p/go-libp2p-circuit/pb"
|
||||
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
func PeerToPeerInfoV1(p *pbv1.CircuitRelay_Peer) (peer.AddrInfo, error) {
|
||||
if p == nil {
|
||||
return peer.AddrInfo{}, errors.New("nil peer")
|
||||
}
|
||||
|
||||
id, err := peer.IDFromBytes(p.Id)
|
||||
if err != nil {
|
||||
return peer.AddrInfo{}, err
|
||||
}
|
||||
|
||||
var addrs []ma.Multiaddr
|
||||
if len(p.Addrs) > 0 {
|
||||
addrs = make([]ma.Multiaddr, 0, len(p.Addrs))
|
||||
}
|
||||
|
||||
for _, addrBytes := range p.Addrs {
|
||||
a, err := ma.NewMultiaddrBytes(addrBytes)
|
||||
if err == nil {
|
||||
addrs = append(addrs, a)
|
||||
}
|
||||
}
|
||||
|
||||
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
|
||||
}
|
||||
|
||||
func PeerInfoToPeerV1(pi peer.AddrInfo) *pbv1.CircuitRelay_Peer {
|
||||
var addrs [][]byte
|
||||
if len(pi.Addrs) > 0 {
|
||||
addrs = make([][]byte, 0, len(pi.Addrs))
|
||||
}
|
||||
|
||||
for _, addr := range pi.Addrs {
|
||||
addrs = append(addrs, addr.Bytes())
|
||||
}
|
||||
|
||||
p := new(pbv1.CircuitRelay_Peer)
|
||||
p.Id = []byte(pi.ID)
|
||||
p.Addrs = addrs
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func PeerToPeerInfoV2(p *pbv2.Peer) (peer.AddrInfo, error) {
|
||||
if p == nil {
|
||||
return peer.AddrInfo{}, errors.New("nil peer")
|
||||
}
|
||||
|
||||
id, err := peer.IDFromBytes(p.Id)
|
||||
if err != nil {
|
||||
return peer.AddrInfo{}, err
|
||||
}
|
||||
|
||||
var addrs []ma.Multiaddr
|
||||
if len(p.Addrs) > 0 {
|
||||
addrs = make([]ma.Multiaddr, 0, len(p.Addrs))
|
||||
}
|
||||
|
||||
for _, addrBytes := range p.Addrs {
|
||||
a, err := ma.NewMultiaddrBytes(addrBytes)
|
||||
if err == nil {
|
||||
addrs = append(addrs, a)
|
||||
}
|
||||
}
|
||||
|
||||
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
|
||||
}
|
||||
|
||||
func PeerInfoToPeerV2(pi peer.AddrInfo) *pbv2.Peer {
|
||||
var addrs [][]byte
|
||||
|
||||
if len(pi.Addrs) > 0 {
|
||||
addrs = make([][]byte, 0, len(pi.Addrs))
|
||||
}
|
||||
|
||||
for _, addr := range pi.Addrs {
|
||||
addrs = append(addrs, addr.Bytes())
|
||||
}
|
||||
|
||||
p := new(pbv2.Peer)
|
||||
p.Id = []byte(pi.ID)
|
||||
p.Addrs = addrs
|
||||
|
||||
return p
|
||||
}
|
||||
Reference in New Issue
Block a user