Files
2026-04-17 21:15:10 +02:00

1228 lines
30 KiB
Go

package pingtunnel
import (
"github.com/esrrhs/gohome/common"
"github.com/esrrhs/gohome/loggo"
"github.com/esrrhs/gohome/network"
"golang.org/x/net/icmp"
"google.golang.org/protobuf/proto"
"io"
"math"
"math/rand"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
)
// Protocol values that every sendICMP call in this file passes as its
// send/receive protocol arguments.
// NOTE(review): 8 and 0 look like the ICMP echo-request and echo-reply type
// codes — confirm against sendICMP.
const (
// SEND_PROTO is passed to sendICMP in the first of the two protocol slots.
SEND_PROTO int = 8
// RECV_PROTO is passed to sendICMP in the second of the two protocol slots.
RECV_PROTO int = 0
)
// NewClient builds a Client that will listen locally on addr and exchange
// traffic with server.
//
// addr is resolved as a TCP address when tcpmode > 0 and as a UDP address
// otherwise; server is resolved as an IP address. A resolution failure is
// returned unchanged and no Client is created. All remaining arguments are
// stored verbatim on the returned Client. The client id is drawn at random
// from [0, math.MaxInt16), and the pong, last-activity and next-resolve
// timestamps all start at the construction time.
func NewClient(addr string, server string, target string, timeout int, key int, icmpAddr string,
	tcpmode int, tcpmode_buffersize int, tcpmode_maxwin int, tcpmode_resend_timems int, tcpmode_compress int,
	tcpmode_stat int, open_sock5 int, maxconn int, sock5_filter *func(addr string) bool, cryptoConfig *CryptoConfig,
	sock5_user string, sock5_pass string) (*Client, error) {

	// Exactly one of the two local addresses is populated, depending on mode.
	var (
		localUDP *net.UDPAddr
		localTCP *net.TCPAddr
	)
	if tcpmode > 0 {
		resolved, err := net.ResolveTCPAddr("tcp", addr)
		if err != nil {
			return nil, err
		}
		localTCP = resolved
	} else {
		resolved, err := net.ResolveUDPAddr("udp", addr)
		if err != nil {
			return nil, err
		}
		localUDP = resolved
	}

	serverIP, err := net.ResolveIPAddr("ip", server)
	if err != nil {
		return nil, err
	}

	rand.Seed(time.Now().UnixNano())
	created := time.Now()

	c := new(Client)

	// Identity and liveness state.
	c.exit = false
	c.rtt = 0
	c.id = rand.Intn(math.MaxInt16)
	c.pongTime = created

	// Addresses.
	c.ipaddr = localUDP
	c.tcpaddr = localTCP
	c.addr = addr
	c.ipaddrServer = serverIP
	c.addrServer = server
	c.targetAddr = target
	c.icmpAddr = icmpAddr

	// Tunnel tuning.
	c.timeout = timeout
	c.key = key
	c.tcpmode = tcpmode
	c.tcpmode_buffersize = tcpmode_buffersize
	c.tcpmode_maxwin = tcpmode_maxwin
	c.tcpmode_resend_timems = tcpmode_resend_timems
	c.tcpmode_compress = tcpmode_compress
	c.tcpmode_stat = tcpmode_stat
	c.maxconn = maxconn
	c.cryptoConfig = cryptoConfig

	// SOCKS5 front end.
	c.open_sock5 = open_sock5
	c.sock5_filter = sock5_filter
	c.sock5_user = sock5_user
	c.sock5_pass = sock5_pass

	// Server re-resolution schedule.
	c.nextResolveAt = created
	c.resolveRetryBackoff = 2 * time.Second

	c.lastActivityUnixNano.Store(created.UnixNano())
	return c, nil
}
// Client is the local end of the tunnel: it accepts local TCP/UDP (optionally
// SOCKS5) traffic and exchanges it with the server through sendICMP/recvICMP.
//
// NOTE(review): several fields (exit, sequence, the packet counters, rtt,
// pongTime, ipaddrServer) are written by one goroutine and read by others
// without visible synchronization — confirm whether that is intentional.
type Client struct {
// exit is set by Stop and polled by every worker loop as its stop condition.
exit bool
// rtt is the round trip measured from the last pong; ping resets it to 0
// when no pong has been seen for more than 3 seconds.
rtt time.Duration
// workResultLock counts running worker goroutines; Stop waits on it.
workResultLock sync.WaitGroup
// maxconn caps concurrent local connections; values <= 0 disable the check.
maxconn int
// id is the random identifier chosen in NewClient; it is sent with every
// packet and compared with the echo id of incoming packets.
id int
// sequence is passed to sendICMP and incremented once per sent packet.
sequence int
// timeout is the inactivity limit, in seconds, for local connections.
timeout int
// sproto and rproto are only read by ping's log line in this file.
sproto int
rproto int
// key is sent with every packet; incoming packets with another key are dropped.
key int
// tcpmode > 0 selects the local TCP listener instead of the UDP one. The
// remaining tcpmode_* values are forwarded to network.NewFrameMgr and sendICMP.
tcpmode int
tcpmode_buffersize int
tcpmode_maxwin int
tcpmode_resend_timems int
tcpmode_compress int
tcpmode_stat int
// open_sock5 > 0 makes accepted TCP connections go through the SOCKS5 handshake.
open_sock5 int
// sock5_filter, when non-nil, decides per target address whether a SOCKS5
// CONNECT is tunnelled (true) or dialled directly (false).
sock5_filter *func(addr string) bool
// sock5_user and sock5_pass are handed to network.Sock5HandshakeBy.
sock5_user string
sock5_pass string
// cryptoConfig is forwarded to sendICMP and recvICMP.
cryptoConfig *CryptoConfig
// ipaddr / tcpaddr are the resolved local listen addresses; NewClient fills
// only the one that matches tcpmode.
ipaddr *net.UDPAddr
tcpaddr *net.TCPAddr
// addr is the unresolved local listen address.
addr string
// ipaddrServer is the resolved server address; maybeRefreshServerAddr updates it.
ipaddrServer *net.IPAddr
// addrServer is the server host as given to NewClient.
addrServer string
// targetAddr is the target sent with non-SOCKS5 connections.
targetAddr string
// icmpAddr is the address passed to listenICMP.
icmpAddr string
// conn is the ICMP connection opened by Run.
conn *icmp.PacketConn
// listenConn / tcplistenConn are the local listeners opened by Run.
listenConn *net.UDPConn
tcplistenConn *net.TCPListener
// localAddrToConnMap maps a connection's addrKey to its *ClientConn.
localAddrToConnMap sync.Map
// localIdToConnMap maps a connection's id to its *ClientConn.
localIdToConnMap sync.Map
// Traffic counters; showNet logs them and resets them to zero.
sendPacket uint64
recvPacket uint64
sendPacketSize uint64
recvPacketSize uint64
// Connection counts recomputed by showNet.
localAddrToConnMapSize int
localIdToConnMapSize int
// recvcontrol tells the packet-processing goroutine started by Run to return.
recvcontrol chan int
// pongTime is when the last pong was processed.
pongTime time.Time
// lastActivityUnixNano is the UnixNano timestamp stored by touchActivity.
lastActivityUnixNano atomic.Int64
// nextResolveAt is the earliest time maybeRefreshServerAddr resolves again.
nextResolveAt time.Time
// resolveRetryBackoff is the current delay applied after a failed resolve.
resolveRetryBackoff time.Duration
}
// ClientConn is the per-connection state kept for one local TCP connection or
// one local UDP flow.
type ClientConn struct {
// exit is set by Client.close; the per-connection loops stop when they see it.
exit bool
// ipaddr is the local UDP peer (UDP flows); tcpaddr is the local TCP peer.
ipaddr *net.UDPAddr
tcpaddr *net.TCPAddr
// id is the unique id under which the connection is stored in localIdToConnMap.
id string
// addrKey is the key under which the connection is stored in localAddrToConnMap.
addrKey string
// tcpmode > 0 marks a framed TCP connection; 0 marks a UDP flow.
tcpmode int
// activeRecvTime is refreshed whenever a tunnel packet for this connection is
// processed; activeSendTime whenever data is sent into the tunnel.
activeRecvTime time.Time
activeSendTime time.Time
// close marks a UDP flow for removal on the next checkTimeoutConn pass.
close bool
// udpRelayConn is the SOCKS5 UDP relay socket for flows created by
// recvSock5UDP; it is nil for other connections.
udpRelayConn *net.UDPConn
// udpTargetAddr is the target a SOCKS5 UDP flow was created for.
udpTargetAddr string
// activity is a 1-buffered wake-up channel created for TCP connections.
activity chan struct{}
// fm is the frame manager created for TCP connections.
fm *network.FrameMgr
}
// Addr returns the local listen address string given to NewClient.
func (p *Client) Addr() string {
return p.addr
}
// IPAddr returns the resolved local UDP listen address. NewClient leaves it
// nil when tcpmode > 0.
func (p *Client) IPAddr() *net.UDPAddr {
return p.ipaddr
}
// TargetAddr returns the target address given to NewClient.
func (p *Client) TargetAddr() string {
return p.targetAddr
}
// ICMPAddr returns the address Run passes to listenICMP.
func (p *Client) ICMPAddr() string {
return p.icmpAddr
}
// ServerIPAddr returns the currently stored resolved server address.
func (p *Client) ServerIPAddr() *net.IPAddr {
return p.ipaddrServer
}
// ServerAddr returns the server address string given to NewClient.
func (p *Client) ServerAddr() string {
return p.addrServer
}
// RTT returns the stored round-trip time; it is 0 until a pong has been
// processed and is reset to 0 by ping after 3 seconds without a pong.
func (p *Client) RTT() time.Duration {
return p.rtt
}
// RecvPacketSize returns the received-bytes counter accumulated since
// showNet last reset it.
func (p *Client) RecvPacketSize() uint64 {
return p.recvPacketSize
}
// SendPacketSize returns the sent-bytes counter accumulated since showNet
// last reset it.
func (p *Client) SendPacketSize() uint64 {
return p.sendPacketSize
}
// RecvPacket returns the received-packet counter accumulated since showNet
// last reset it.
func (p *Client) RecvPacket() uint64 {
return p.recvPacket
}
// SendPacket returns the sent-packet counter accumulated since showNet last
// reset it.
func (p *Client) SendPacket() uint64 {
return p.sendPacket
}
// LocalIdToConnMapSize returns the connection count that showNet last stored
// in localIdToConnMapSize; it is a snapshot, not a live count.
func (p *Client) LocalIdToConnMapSize() int {
return p.localIdToConnMapSize
}
// LocalAddrToConnMapSize returns the connection count that showNet last
// stored in localAddrToConnMapSize; it is a snapshot, not a live count.
func (p *Client) LocalAddrToConnMapSize() int {
return p.localAddrToConnMapSize
}
// touchActivity records the current wall-clock time as the moment of the most
// recent tunnel activity.
func (p *Client) touchActivity() {
	stamp := time.Now().UnixNano()
	p.lastActivityUnixNano.Store(stamp)
}
// lastActivity returns the time stored by the latest touchActivity call. When
// no positive timestamp has been stored it falls back to the current time.
func (p *Client) lastActivity() time.Time {
	if stamp := p.lastActivityUnixNano.Load(); stamp > 0 {
		return time.Unix(0, stamp)
	}
	return time.Now()
}
// activeConnCount walks localIdToConnMap and returns how many entries it
// currently holds.
func (p *Client) activeConnCount() int {
	var total int
	tally := func(_, _ interface{}) bool {
		total++
		return true // keep iterating over every entry
	}
	p.localIdToConnMap.Range(tally)
	return total
}
// nextPingInterval chooses the delay until the next ping:
//   - 1s while any connection exists or activity was seen within 5s of now,
//   - 3s while activity was seen within 30s of now,
//   - 10s otherwise.
func (p *Client) nextPingInterval(now time.Time) time.Duration {
	const (
		hotActivityWindow  = 5 * time.Second
		warmActivityWindow = 30 * time.Second
	)

	// Case expressions are evaluated left to right and stop at the first
	// match, so the idle time is only consulted when no connection is open.
	switch {
	case p.activeConnCount() > 0, now.Sub(p.lastActivity()) <= hotActivityWindow:
		return time.Second
	case now.Sub(p.lastActivity()) <= warmActivityWindow:
		return 3 * time.Second
	default:
		return 10 * time.Second
	}
}
// maybeRefreshServerAddr re-resolves the server host name once nextResolveAt
// has been reached.
//
// On failure the retry delay starts at 2s, doubles on each further failure
// and is capped at one minute. On success the stored address is replaced when
// it differs, the retry delay is reset to 2s, and the next resolve is
// scheduled 5s ahead if no pong has been seen for more than 3s, otherwise 30s
// ahead.
func (p *Client) maybeRefreshServerAddr(now time.Time) {
	const (
		minBackoff = 2 * time.Second
		maxBackoff = time.Minute
	)

	if p.nextResolveAt.After(now) {
		return
	}

	resolved, err := net.ResolveIPAddr("ip", p.addrServer)
	if err != nil {
		backoff := p.resolveRetryBackoff
		if backoff < minBackoff {
			backoff = minBackoff
		} else {
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
		p.resolveRetryBackoff = backoff
		p.nextResolveAt = now.Add(backoff)
		return
	}

	if current := p.ipaddrServer; current == nil || current.String() != resolved.String() {
		loggo.Info("server ip refreshed %v -> %v", current, resolved)
		p.ipaddrServer = resolved
	}
	p.resolveRetryBackoff = minBackoff

	// Resolve again sooner while the server is not answering pings.
	delay := 30 * time.Second
	if now.Sub(p.pongTime) > 3*time.Second {
		delay = 5 * time.Second
	}
	p.nextResolveAt = now.Add(delay)
}
// Run opens the ICMP connection and the local listener (TCP when tcpmode > 0,
// UDP otherwise) and starts the background workers:
//
//   - the local accept loop (AcceptTcp or Accept),
//   - recvICMP, which feeds received packets into a channel,
//   - a once-per-second maintenance loop (timeouts, stats, ping, server
//     address refresh),
//   - a loop that hands received packets to processPacket.
//
// Run returns as soon as the workers are started; use Stop to shut them down.
// If a listener cannot be opened the error is logged and returned, and the
// ICMP connection opened earlier is closed again instead of being leaked.
func (p *Client) Run() error {
	conn, err := listenICMP(p.icmpAddr)
	if err != nil {
		loggo.Error("Error listening for ICMP packets: %s", err.Error())
		return err
	}
	p.conn = conn

	if p.tcpmode > 0 {
		tcplistenConn, err := net.ListenTCP("tcp", p.tcpaddr)
		if err != nil {
			loggo.Error("Error listening for tcp packets: %s", err.Error())
			// Do not leave the ICMP socket open when startup fails.
			p.conn.Close()
			return err
		}
		p.tcplistenConn = tcplistenConn
	} else {
		listener, err := net.ListenUDP("udp", p.ipaddr)
		if err != nil {
			loggo.Error("Error listening for udp packets: %s", err.Error())
			// Do not leave the ICMP socket open when startup fails.
			p.conn.Close()
			return err
		}
		p.listenConn = listener
	}

	if p.tcpmode > 0 {
		go p.AcceptTcp()
	} else {
		go p.Accept()
	}

	recv := make(chan *Packet, 10000)
	p.recvcontrol = make(chan int, 1)
	go recvICMP(&p.workResultLock, &p.exit, *p.conn, recv, p.cryptoConfig)

	// Register each worker before it is started so that a concurrent
	// Stop -> Wait cannot return before the goroutine has been counted.
	p.workResultLock.Add(1)
	go func() {
		defer common.CrashLog()
		defer p.workResultLock.Done()

		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		nextPingAt := time.Now()
		for !p.exit {
			p.checkTimeoutConn()
			p.showNet()
			now := time.Now()
			if !now.Before(nextPingAt) {
				p.ping()
				nextPingAt = now.Add(p.nextPingInterval(now))
			}
			p.maybeRefreshServerAddr(now)
			<-ticker.C
		}
	}()

	p.workResultLock.Add(1)
	go func() {
		defer common.CrashLog()
		defer p.workResultLock.Done()

		for !p.exit {
			select {
			case <-p.recvcontrol:
				return
			case r := <-recv:
				p.processPacket(r)
			}
		}
	}()

	return nil
}
// Stop asks every worker to exit, waits for the goroutines registered on
// workResultLock and then closes the ICMP connection and the local listeners.
//
// The wake-up for the packet loop is sent without blocking, so Stop does not
// hang when Run was never called (recvcontrol is nil), when Run failed, or
// when Stop is called a second time.
func (p *Client) Stop() {
	p.exit = true

	// A send on a nil or already-full channel is never ready; the default
	// branch then skips it. The loop still observes p.exit on its own.
	select {
	case p.recvcontrol <- 1:
	default:
	}

	p.workResultLock.Wait()

	if p.conn != nil {
		p.conn.Close()
	}
	if p.tcplistenConn != nil {
		p.tcplistenConn.Close()
	}
	if p.listenConn != nil {
		p.listenConn.Close()
	}
}
// AcceptTcp is the local TCP accept loop. It polls the listener with a one
// second deadline until the client is stopped and hands every accepted
// connection to its own goroutine: AcceptSock5Conn when open_sock5 > 0,
// otherwise AcceptTcpConn with the configured target. It always returns nil.
func (p *Client) AcceptTcp() error {
	defer common.CrashLog()

	p.workResultLock.Add(1)
	defer p.workResultLock.Done()

	loggo.Info("client waiting local accept tcp")

	for !p.exit {
		// The deadline lets the loop notice p.exit at least once per second.
		p.tcplistenConn.SetDeadline(time.Now().Add(1000 * time.Millisecond))

		accepted, err := p.tcplistenConn.AcceptTCP()
		if err != nil {
			if nerr, isNet := err.(net.Error); !isNet || !nerr.Timeout() {
				loggo.Info("Error accept tcp %s", err)
				continue
			}
		}
		if accepted == nil {
			continue
		}

		if p.open_sock5 > 0 {
			go p.AcceptSock5Conn(accepted)
			continue
		}
		go p.AcceptTcpConn(accepted, p.targetAddr)
	}
	return nil
}
// AcceptTcpConn tunnels one accepted local TCP connection to targetAddr and
// returns when the connection is finished. It owns conn and closes it on
// every exit path.
//
// The function runs three phases:
//
//  1. connect: drive the frame manager until the remote side reports the
//     connection as established, giving up after 5 seconds;
//  2. relay: a reader goroutine copies local TCP data into the frame manager
//     while this goroutine sends pending frames to the server and writes
//     received data back to the local socket, until an error, an inactivity
//     timeout (p.timeout seconds) or a remote close;
//  3. close: keep flushing frames and received data for up to 60 seconds or
//     until the remote side has closed and nothing is left to write.
//
// Changes from the previous version: the local socket is now closed when the
// connection is rejected because of maxconn and when the connect phase times
// out (it used to be leaked), and frames that fail to marshal are skipped in
// the connect and close phases the same way the relay phase already did.
func (p *Client) AcceptTcpConn(conn *net.TCPConn, targetAddr string) {
	defer common.CrashLog()

	p.workResultLock.Add(1)
	defer p.workResultLock.Done()

	tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr)

	if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn {
		loggo.Info("too many connections %d, client accept new local tcp fail %s", p.localIdToConnMapSize, tcpsrcaddr.String())
		// Nobody else holds this socket; release it instead of leaking it.
		conn.Close()
		return
	}

	uuid := common.UniqueId()

	fm := network.NewFrameMgr(FRAME_MAX_SIZE, FRAME_MAX_ID, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat)

	now := time.Now()
	clientConn := &ClientConn{exit: false, tcpaddr: tcpsrcaddr, id: uuid, tcpmode: p.tcpmode, activeRecvTime: now, activeSendTime: now, close: false,
		activity: make(chan struct{}, 1),
		fm:       fm}
	p.addClientConn(uuid, tcpsrcaddr.String(), clientConn)
	loggo.Info("client accept new local tcp %s %s", uuid, tcpsrcaddr.String())
	p.touchActivity()

	// Phase 1: connect.
	loggo.Info("start connect remote tcp %s %s", uuid, tcpsrcaddr.String())
	clientConn.fm.Connect()
	startConnectTime := common.GetNowUpdateInSecond()
	connectWait := newAdaptiveLoopWait(2*time.Millisecond, 80*time.Millisecond)
	for !p.exit && !clientConn.exit {
		if clientConn.fm.IsConnected() {
			break
		}
		clientConn.fm.Update()
		sendlist := clientConn.fm.GetSendList()
		hadWork := sendlist.Len() > 0
		for e := sendlist.Front(); e != nil; e = e.Next() {
			f := e.Value.(*network.Frame)
			mb, err := clientConn.fm.MarshalFrame(f)
			if err != nil {
				loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err)
				continue
			}
			p.sequence++
			// Connect-phase packets carry the full tcpmode settings and timeout.
			sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
				SEND_PROTO, RECV_PROTO, p.key,
				p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat,
				p.timeout, p.cryptoConfig)
			p.sendPacket++
			p.sendPacketSize += (uint64)(len(mb))
		}

		now := common.GetNowUpdateInSecond()
		diffclose := now.Sub(startConnectTime)
		if diffclose > time.Second*5 {
			loggo.Info("can not connect remote tcp %s %s", uuid, tcpsrcaddr.String())
			// Release the local socket before giving up on the connection.
			conn.Close()
			p.close(clientConn)
			return
		}

		if hadWork {
			connectWait.hit()
			continue
		}
		wait := connectWait.miss()
		select {
		case <-clientConn.activity:
			connectWait.hit()
		case <-time.After(wait):
		}
	}

	if !clientConn.exit {
		loggo.Info("connected remote tcp %s %s", uuid, tcpsrcaddr.String())
	}

	// Phase 2: relay.
	bytes := make([]byte, 10240)

	// Written by the reader goroutine, read by the main loop.
	tcpActiveRecvUnix := atomic.Int64{}
	tcpActiveRecvUnix.Store(common.GetNowUpdateInSecond().UnixNano())
	tcpActiveSendTime := common.GetNowUpdateInSecond()

	readErr := make(chan error, 1)
	stopRead := make(chan struct{})

	// Reader: local TCP socket -> frame manager send buffer.
	go func() {
		defer common.CrashLog()
		readWait := newAdaptiveLoopWait(2*time.Millisecond, 80*time.Millisecond)
		for !p.exit && !clientConn.exit {
			left := common.MinOfInt(clientConn.fm.GetSendBufferLeft(), len(bytes))
			if left <= 0 {
				// Send buffer is full: wait until something happens.
				wait := readWait.miss()
				select {
				case <-stopRead:
					return
				case <-clientConn.activity:
					readWait.hit()
					continue
				case <-time.After(wait):
					continue
				}
			}
			readWait.hit()

			conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
			n, err := conn.Read(bytes[0:left])
			if err != nil {
				nerr, ok := err.(net.Error)
				if ok && nerr.Timeout() {
					continue
				}
				// Report the error without blocking; one report is enough.
				select {
				case readErr <- err:
				default:
				}
				return
			}
			if n <= 0 {
				continue
			}
			clientConn.fm.WriteSendBuffer(bytes[:n])
			tcpActiveRecvUnix.Store(common.GetNowUpdateInSecond().UnixNano())
			p.touchActivity()
			notifyActivity(clientConn.activity)
		}
	}()

	loopWait := newAdaptiveLoopWait(2*time.Millisecond, 250*time.Millisecond)

mainLoop:
	for !p.exit && !clientConn.exit {
		now := common.GetNowUpdateInSecond()
		hadWork := false

		clientConn.fm.Update()

		// Frames waiting to go to the server.
		sendlist := clientConn.fm.GetSendList()
		if sendlist.Len() > 0 {
			hadWork = true
			clientConn.activeSendTime = now
			for e := sendlist.Front(); e != nil; e = e.Next() {
				f := e.Value.(*network.Frame)
				mb, err := clientConn.fm.MarshalFrame(f)
				if err != nil {
					loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err)
					continue
				}
				p.sequence++
				sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
					SEND_PROTO, RECV_PROTO, p.key,
					p.tcpmode, 0, 0, 0, 0, 0,
					0, p.cryptoConfig)
				p.sendPacket++
				p.sendPacketSize += (uint64)(len(mb))
			}
			p.touchActivity()
		}

		// Data received from the server, to be written to the local socket.
		if clientConn.fm.GetRecvBufferSize() > 0 {
			hadWork = true
			rr := clientConn.fm.GetRecvReadLineBuffer()
			conn.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
			n, err := conn.Write(rr)
			if err != nil {
				nerr, ok := err.(net.Error)
				if !ok || !nerr.Timeout() {
					loggo.Info("Error write tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
					clientConn.fm.Close()
					break mainLoop
				}
			}
			if n > 0 {
				clientConn.fm.SkipRecvBuffer(n)
				tcpActiveSendTime = now
				p.touchActivity()
			}
		}

		// Non-blocking check for a reader failure.
		select {
		case err := <-readErr:
			if err != nil {
				loggo.Info("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
				clientConn.fm.Close()
				break mainLoop
			}
		default:
		}

		diffrecv := now.Sub(clientConn.activeRecvTime)
		diffsend := now.Sub(clientConn.activeSendTime)
		tcpdiffrecv := now.Sub(time.Unix(0, tcpActiveRecvUnix.Load()))
		tcpdiffsend := now.Sub(tcpActiveSendTime)
		if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) ||
			(tcpdiffrecv > time.Second*(time.Duration(p.timeout)) && tcpdiffsend > time.Second*(time.Duration(p.timeout))) {
			loggo.Info("close inactive conn %s %s", clientConn.id, clientConn.tcpaddr.String())
			clientConn.fm.Close()
			break
		}

		if clientConn.fm.IsRemoteClosed() {
			loggo.Info("closed by remote conn %s %s", clientConn.id, clientConn.tcpaddr.String())
			clientConn.fm.Close()
			break
		}

		if !hadWork {
			// Idle: sleep adaptively, but wake up on activity or a reader error.
			wait := loopWait.miss()
			select {
			case <-clientConn.activity:
				loopWait.hit()
			case err := <-readErr:
				if err != nil {
					loggo.Info("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
					clientConn.fm.Close()
					break mainLoop
				}
			case <-time.After(wait):
			}
		} else {
			loopWait.hit()
		}
	}

	// Phase 3: close.
	close(stopRead)
	clientConn.fm.Close()

	startCloseTime := common.GetNowUpdateInSecond()
	for !p.exit && !clientConn.exit {
		now := common.GetNowUpdateInSecond()

		clientConn.fm.Update()

		sendlist := clientConn.fm.GetSendList()
		for e := sendlist.Front(); e != nil; e = e.Next() {
			f := e.Value.(*network.Frame)
			mb, err := clientConn.fm.MarshalFrame(f)
			if err != nil {
				loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err)
				continue
			}
			p.sequence++
			sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
				SEND_PROTO, RECV_PROTO, p.key,
				p.tcpmode, 0, 0, 0, 0, 0,
				0, p.cryptoConfig)
			p.sendPacket++
			p.sendPacketSize += (uint64)(len(mb))
		}

		nodatarecv := true
		if clientConn.fm.GetRecvBufferSize() > 0 {
			rr := clientConn.fm.GetRecvReadLineBuffer()
			conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
			// Best effort while closing: a failed write just leaves the data unflushed.
			n, _ := conn.Write(rr)
			if n > 0 {
				clientConn.fm.SkipRecvBuffer(n)
				nodatarecv = false
			}
		}

		diffclose := now.Sub(startCloseTime)
		if diffclose > time.Second*60 {
			loggo.Info("close conn had timeout %s %s", clientConn.id, clientConn.tcpaddr.String())
			break
		}

		remoteclosed := clientConn.fm.IsRemoteClosed()
		if remoteclosed && nodatarecv {
			loggo.Info("remote conn had closed %s %s", clientConn.id, clientConn.tcpaddr.String())
			break
		}

		time.Sleep(time.Millisecond * 100)
	}

	loggo.Info("close tcp conn %s %s", clientConn.id, clientConn.tcpaddr.String())
	conn.Close()
	p.close(clientConn)
}
// Accept is the local UDP receive loop. Every datagram read from the local
// listener is attributed to a flow keyed by its source address (a new flow is
// created on first sight, subject to maxconn) and forwarded to the server as
// a DATA message for the configured target. The loop polls with a 100ms read
// deadline until the client is stopped and always returns nil.
func (p *Client) Accept() error {
	defer common.CrashLog()

	p.workResultLock.Add(1)
	defer p.workResultLock.Done()

	loggo.Info("client waiting local accept udp")

	buf := make([]byte, 10240)
	for !p.exit {
		p.listenConn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
		n, srcaddr, err := p.listenConn.ReadFromUDP(buf)
		if err != nil {
			if nerr, isNet := err.(net.Error); !isNet || !nerr.Timeout() {
				loggo.Info("Error read udp %s", err)
				continue
			}
		}
		if n <= 0 {
			continue
		}

		now := common.GetNowUpdateInSecond()
		peer := srcaddr.String()

		flow := p.getClientConnByAddr(peer)
		if flow == nil {
			if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn {
				loggo.Info("too many connections %d, client accept new local udp fail %s", p.localIdToConnMapSize, peer)
				continue
			}
			uuid := common.UniqueId()
			flow = &ClientConn{
				exit:           false,
				ipaddr:         srcaddr,
				id:             uuid,
				tcpmode:        0,
				activeRecvTime: now,
				activeSendTime: now,
				close:          false,
			}
			p.addClientConn(uuid, peer, flow)
			loggo.Info("client accept new local udp %s %s", uuid, peer)
		}

		flow.activeSendTime = now
		sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, flow.id, (uint32)(MyMsg_DATA), buf[:n],
			SEND_PROTO, RECV_PROTO, p.key,
			flow.tcpmode, 0, 0, 0, 0, 0,
			p.timeout, p.cryptoConfig)

		p.sequence++
		p.sendPacket++
		p.sendPacketSize += (uint64)(n)
		p.touchActivity()
	}
	return nil
}
// processPacket handles one packet received from the tunnel.
//
// Packets are dropped when their Rproto is >= 0, when their key differs from
// the client's key, or (unless icmpDatagram is set) when their echo id is not
// the client's id. After that:
//
//   - PING: the payload is decoded as the timestamp sent by ping and used to
//     update rtt and pongTime. A payload that does not decode is ignored, so
//     a malformed pong can no longer produce a bogus RTT or refresh pongTime.
//   - KICK: the referenced local connection, if any, is closed.
//   - anything else: the payload is delivered to the connection named by the
//     packet id, as a frame for TCP connections or as a datagram written to
//     the local UDP peer (wrapped as a SOCKS5 UDP datagram for relay flows).
//     A packet for an unknown connection is answered with remoteError.
func (p *Client) processPacket(packet *Packet) {
	if packet.my.Rproto >= 0 {
		return
	}

	if packet.my.Key != (int32)(p.key) {
		return
	}

	if !icmpDatagram && packet.echoId != p.id {
		return
	}

	if packet.my.Type == (int32)(MyMsg_PING) {
		t := time.Time{}
		if err := t.UnmarshalBinary(packet.my.Data); err != nil {
			// Without a valid send timestamp there is nothing to measure.
			loggo.Debug("ignore pong with invalid timestamp from %s: %s", packet.src.String(), err)
			return
		}
		now := time.Now()
		d := now.Sub(t)
		loggo.Info("pong from %s %s", packet.src.String(), d.String())
		p.rtt = d
		p.pongTime = now
		return
	}

	if packet.my.Type == (int32)(MyMsg_KICK) {
		clientConn := p.getClientConnById(packet.my.Id)
		if clientConn != nil {
			p.close(clientConn)
			loggo.Info("remote kick local %s", packet.my.Id)
		}
		return
	}

	loggo.Debug("processPacket %s %s %d", packet.my.Id, packet.src.String(), len(packet.my.Data))

	clientConn := p.getClientConnById(packet.my.Id)
	if clientConn == nil {
		loggo.Debug("processPacket no conn %s ", packet.my.Id)
		p.remoteError(packet.my.Id)
		return
	}

	now := common.GetNowUpdateInSecond()
	clientConn.activeRecvTime = now

	if clientConn.tcpmode > 0 {
		f := &network.Frame{}
		err := proto.Unmarshal(packet.my.Data, f)
		if err != nil {
			loggo.Error("Unmarshal tcp Error %s", err)
			return
		}

		clientConn.fm.OnRecvFrame(f)
		// Wake the connection's loops so they pick the frame up promptly.
		notifyActivity(clientConn.activity)
	} else {
		if packet.my.Data == nil {
			return
		}
		addr := clientConn.ipaddr
		var err error
		if clientConn.udpRelayConn != nil {
			// SOCKS5 UDP flow: prefer the target named in the packet over the
			// one the flow was created for.
			udpTargetAddr := clientConn.udpTargetAddr
			if packet.my.Target != "" {
				udpTargetAddr = packet.my.Target
			}
			udpPacket, packetErr := buildSocks5UDPDatagram(udpTargetAddr, packet.my.Data)
			if packetErr != nil {
				loggo.Info("build socks5 udp datagram error %s", packetErr)
				clientConn.close = true
				return
			}
			_, err = clientConn.udpRelayConn.WriteToUDP(udpPacket, addr)
		} else {
			_, err = p.listenConn.WriteToUDP(packet.my.Data, addr)
		}
		if err != nil {
			loggo.Info("WriteToUDP Error read udp %s", err)
			// checkTimeoutConn removes flows marked this way.
			clientConn.close = true
			return
		}
	}

	p.recvPacket++
	p.recvPacketSize += (uint64)(len(packet.my.Data))
	if packet.my.Type == (int32)(MyMsg_DATA) && len(packet.my.Data) > 0 {
		p.touchActivity()
	}
}
// close marks clientConn as finished and removes it from both lookup maps.
// A nil connection or one that is already marked is left untouched.
func (p *Client) close(clientConn *ClientConn) {
	done := clientConn == nil || clientConn.exit
	if done {
		return
	}
	clientConn.exit = true

	if id := clientConn.id; id != "" {
		p.localIdToConnMap.Delete(id)
	}
	if key := clientConn.addrKey; key != "" {
		p.localAddrToConnMap.Delete(key)
	}
}
// checkTimeoutConn closes UDP flows (tcpmode == 0) that are due for removal:
// flows whose last receive or last send is more than p.timeout seconds old,
// and flows already marked with the close flag. TCP connections are skipped.
func (p *Client) checkTimeoutConn() {
	type tracked struct {
		id   string
		conn *ClientConn
	}

	// Snapshot first so the map is not modified while it is being ranged.
	var udpFlows []tracked
	p.localIdToConnMap.Range(func(key, value interface{}) bool {
		id := key.(string)
		cc := value.(*ClientConn)
		if cc.tcpmode <= 0 {
			udpFlows = append(udpFlows, tracked{id: id, conn: cc})
		}
		return true
	})

	now := common.GetNowUpdateInSecond()
	limit := time.Second * (time.Duration(p.timeout))

	// Pass 1: flag flows that have been silent in either direction.
	for _, t := range udpFlows {
		idleRecv := now.Sub(t.conn.activeRecvTime)
		idleSend := now.Sub(t.conn.activeSendTime)
		if idleRecv > limit || idleSend > limit {
			t.conn.close = true
		}
	}

	// Pass 2: close everything that is flagged.
	for _, t := range udpFlows {
		if !t.conn.close {
			continue
		}
		label := t.conn.addrKey
		if t.conn.ipaddr != nil {
			label = t.conn.ipaddr.String()
		}
		loggo.Info("close inactive conn %s %s", t.id, label)
		p.close(t.conn)
	}
}
// ping sends a PING message carrying the current time in binary form, logs
// it, and advances the sequence number. If no pong has been processed for
// more than 3 seconds the stored round-trip time is reset to 0.
func (p *Client) ping() {
	sentAt := time.Now()
	payload, _ := sentAt.MarshalBinary()

	sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, "", "", (uint32)(MyMsg_PING), payload,
		SEND_PROTO, RECV_PROTO, p.key,
		0, 0, 0, 0, 0, 0,
		0, p.cryptoConfig)
	loggo.Info("ping %s %s %d %d %d %d", p.addrServer, sentAt.String(), p.sproto, p.rproto, p.id, p.sequence)
	p.sequence++

	if silence := sentAt.Sub(p.pongTime); silence > 3*time.Second {
		p.rtt = 0
	}
}
// showNet recomputes the two connection counts, logs the traffic counters
// accumulated since the previous call, and resets those counters to zero.
//
// The address-map size is now counted from localAddrToConnMap; it used to be
// counted from localIdToConnMap, so both numbers were always identical. Each
// count is built in a local variable and stored once, so code that reads the
// sizes (for example the maxconn checks) never sees a half-finished count.
func (p *Client) showNet() {
	addrConns := 0
	p.localAddrToConnMap.Range(func(key, value interface{}) bool {
		addrConns++
		return true
	})
	p.localAddrToConnMapSize = addrConns

	idConns := 0
	p.localIdToConnMap.Range(func(key, value interface{}) bool {
		idConns++
		return true
	})
	p.localIdToConnMapSize = idConns

	loggo.Info("send %dPacket/s %dKB/s recv %dPacket/s %dKB/s %d/%dConnections",
		p.sendPacket, p.sendPacketSize/1024, p.recvPacket, p.recvPacketSize/1024, p.localAddrToConnMapSize, p.localIdToConnMapSize)

	p.sendPacket = 0
	p.recvPacket = 0
	p.sendPacketSize = 0
	p.recvPacketSize = 0
}
// AcceptSock5Conn serves one local SOCKS5 client connection: it performs the
// handshake, reads the request and dispatches on the command.
//
//   - CONNECT: success is reported immediately, then the connection is
//     tunnelled with AcceptTcpConn, or dialled directly with
//     AcceptDirectTcpConn when a filter is set and rejects the address.
//   - UDP ASSOCIATE: handled by AcceptSock5UDPConn.
//   - any other command: answered with "command not supported" and closed.
func (p *Client) AcceptSock5Conn(conn *net.TCPConn) {
	defer common.CrashLog()

	p.workResultLock.Add(1)
	defer p.workResultLock.Done()

	const unspecifiedBind = "0.0.0.0:0"

	if err := network.Sock5HandshakeBy(conn, p.sock5_user, p.sock5_pass); err != nil {
		loggo.Error("socks handshake: %s", err)
		conn.Close()
		return
	}

	req, err := readSocks5Request(conn)
	if err != nil {
		loggo.Error("error getting request: %s", err)
		writeSocks5Reply(conn, socks5ReplyGeneralFailure, unspecifiedBind)
		conn.Close()
		return
	}

	switch req.Command {
	case socks5CmdConnect:
		// Report success to the client right away. This saves a round trip
		// when setting up the SOCKS connection; if the tunnel later fails,
		// the client simply sees the connection being reset.
		if err := writeSocks5Reply(conn, socks5ReplySucceeded, unspecifiedBind); err != nil {
			loggo.Error("send connection confirmation: %s", err)
			conn.Close()
			return
		}

		loggo.Info("accept new sock5 tcp conn: %s", req.Address)

		// No filter means everything goes through the tunnel.
		tunnel := p.sock5_filter == nil || (*p.sock5_filter)(req.Address)
		if tunnel {
			p.AcceptTcpConn(conn, req.Address)
		} else {
			p.AcceptDirectTcpConn(conn, req.Address)
		}

	case socks5CmdUDPAssociate:
		p.AcceptSock5UDPConn(conn, req.Address)

	default:
		loggo.Info("unsupported sock5 command: %d", req.Command)
		writeSocks5Reply(conn, socks5ReplyCommandNotSupported, unspecifiedBind)
		conn.Close()
	}
}
// AcceptSock5UDPConn serves a SOCKS5 UDP ASSOCIATE request.
//
// It opens a UDP relay socket (bound to the IP of the local TCP listener when
// one is configured), reports the relay address to the client, and starts
// recvSock5UDP on the relay. The association lasts as long as the TCP control
// connection: the function blocks until that connection fails or closes, or
// the client is stopped, then closes both sockets, waits for the relay reader
// and drops every flow that belongs to the relay.
func (p *Client) AcceptSock5UDPConn(conn *net.TCPConn, associateAddr string) {
	var bind net.UDPAddr
	if listen := p.tcpaddr; listen != nil {
		bind.IP = listen.IP
		bind.Zone = listen.Zone
	}

	relayConn, err := net.ListenUDP("udp", &bind)
	if err != nil {
		loggo.Error("create sock5 udp relay failed: %s", err)
		writeSocks5Reply(conn, socks5ReplyGeneralFailure, "0.0.0.0:0")
		conn.Close()
		return
	}

	// Work out which address to advertise: the relay's own address, or the
	// control connection's local IP when the relay is bound to a wildcard.
	unusable := func(ip net.IP) bool { return ip == nil || ip.IsUnspecified() }

	relayAddr := copyUDPAddr(relayConn.LocalAddr().(*net.UDPAddr))
	if unusable(relayAddr.IP) {
		if ctrlLocal, isTCP := conn.LocalAddr().(*net.TCPAddr); isTCP && !unusable(ctrlLocal.IP) {
			relayAddr.IP = append(net.IP(nil), ctrlLocal.IP...)
			relayAddr.Zone = ctrlLocal.Zone
		}
	}
	if relayAddr.IP == nil {
		relayAddr.IP = net.IPv4zero
	}

	if replyErr := writeSocks5Reply(conn, socks5ReplySucceeded, relayAddr.String()); replyErr != nil {
		loggo.Error("send udp associate confirmation: %s", replyErr)
		relayConn.Close()
		conn.Close()
		return
	}

	loggo.Info("accept new sock5 udp associate: %s relay %s", associateAddr, relayAddr.String())

	expectedIP, expectedPort := parseSock5UDPAssociateHint(associateAddr)

	relayDone := make(chan struct{})
	go func() {
		defer close(relayDone)
		p.recvSock5UDP(relayConn, expectedIP, expectedPort)
	}()

	// Watch the control connection; read timeouts just mean "still alive".
	probe := make([]byte, 1)
	for !p.exit {
		conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
		if _, readErr := conn.Read(probe); readErr != nil {
			if nerr, isNet := readErr.(net.Error); !isNet || !nerr.Timeout() {
				break
			}
		}
	}

	conn.Close()
	relayConn.Close()
	<-relayDone
	p.closeSock5UDPFlows(relayConn)
	loggo.Info("close sock5 udp associate relay %s", relayAddr.String())
}
// recvSock5UDP reads SOCKS5 UDP datagrams from relayConn and forwards their
// payload to the server.
//
// Datagrams from a source that allowSock5UDPSource rejects, and datagrams
// that fail to parse, are dropped. Each (relay, source, target) combination
// gets its own flow, created on first use subject to maxconn. The loop polls
// with a 100ms read deadline and ends when the client is stopped or the relay
// socket reports a non-timeout error.
func (p *Client) recvSock5UDP(relayConn *net.UDPConn, expectedIP net.IP, expectedPort int) {
	defer common.CrashLog()

	p.workResultLock.Add(1)
	defer p.workResultLock.Done()

	// The first accepted source; allowSock5UDPSource fills it in.
	var pinnedSource *net.UDPAddr

	buf := make([]byte, 65535)
	for !p.exit {
		relayConn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
		n, srcaddr, readErr := relayConn.ReadFromUDP(buf)
		if readErr != nil {
			if nerr, isNet := readErr.(net.Error); isNet && nerr.Timeout() {
				continue
			}
			if !p.exit {
				loggo.Info("Error read sock5 udp %s", readErr)
			}
			return
		}
		if n <= 0 {
			continue
		}

		if !p.allowSock5UDPSource(srcaddr, expectedIP, expectedPort, &pinnedSource) {
			loggo.Debug("drop unexpected sock5 udp source %s", srcaddr.String())
			continue
		}

		targetAddr, payload, parseErr := parseSocks5UDPDatagram(buf[:n])
		if parseErr != nil {
			loggo.Debug("parse sock5 udp datagram failed: %s", parseErr)
			continue
		}

		now := common.GetNowUpdateInSecond()
		flowKey := p.sock5UDPConnKey(relayConn, srcaddr, targetAddr)

		flow := p.getClientConnByAddr(flowKey)
		if flow == nil {
			if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn {
				loggo.Info("too many connections %d, client accept new sock5 udp fail %s", p.localIdToConnMapSize, srcaddr.String())
				continue
			}
			uuid := common.UniqueId()
			flow = new(ClientConn)
			flow.exit = false
			flow.ipaddr = copyUDPAddr(srcaddr)
			flow.id = uuid
			flow.tcpmode = 0
			flow.activeRecvTime = now
			flow.activeSendTime = now
			flow.close = false
			flow.udpRelayConn = relayConn
			flow.udpTargetAddr = targetAddr
			p.addClientConn(uuid, flowKey, flow)
			loggo.Info("client accept new sock5 udp %s %s -> %s", uuid, srcaddr.String(), targetAddr)
		}

		flow.activeSendTime = now
		sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, flow.id, (uint32)(MyMsg_DATA), payload,
			SEND_PROTO, RECV_PROTO, p.key,
			0, 0, 0, 0, 0, 0,
			p.timeout, p.cryptoConfig)

		p.sequence++
		p.sendPacket++
		p.sendPacketSize += (uint64)(len(payload))
		p.touchActivity()
	}
}
// allowSock5UDPSource reports whether a datagram from srcaddr may use the
// relay.
//
// A non-nil expectedIP and a positive expectedPort must match srcaddr. The
// first source that passes those checks is copied into *sourceAddr; after
// that only datagrams from exactly that IP and port are accepted.
func (p *Client) allowSock5UDPSource(srcaddr *net.UDPAddr, expectedIP net.IP, expectedPort int, sourceAddr **net.UDPAddr) bool {
	wrongIP := expectedIP != nil && !expectedIP.Equal(srcaddr.IP)
	wrongPort := expectedPort > 0 && srcaddr.Port != expectedPort
	if wrongIP || wrongPort {
		return false
	}

	pinned := *sourceAddr
	if pinned == nil {
		// First acceptable sender: remember it for the rest of the association.
		*sourceAddr = copyUDPAddr(srcaddr)
		return true
	}
	return pinned.Port == srcaddr.Port && pinned.IP.Equal(srcaddr.IP)
}
// closeSock5UDPFlows closes every UDP flow (tcpmode == 0) that was created
// for the given relay socket.
func (p *Client) closeSock5UDPFlows(relayConn *net.UDPConn) {
	// Collect first, close afterwards, so the map is not modified while it
	// is being ranged.
	var doomed []*ClientConn
	p.localIdToConnMap.Range(func(_, value interface{}) bool {
		if cc := value.(*ClientConn); cc.tcpmode == 0 && cc.udpRelayConn == relayConn {
			doomed = append(doomed, cc)
		}
		return true
	})

	for i := range doomed {
		p.close(doomed[i])
	}
}
// sock5UDPConnKey builds the lookup key of a SOCKS5 UDP flow in the form
// "<relay local address>|<source address>|<target address>".
func (p *Client) sock5UDPConnKey(relayConn *net.UDPConn, srcaddr *net.UDPAddr, targetAddr string) string {
	const sep = "|"
	relay := relayConn.LocalAddr().String()
	source := srcaddr.String()
	return relay + sep + source + sep + targetAddr
}
// copyUDPAddr returns an independent copy of addr whose IP slice does not
// share storage with the original. A nil addr yields nil, and an empty IP is
// copied as a nil IP.
func copyUDPAddr(addr *net.UDPAddr) *net.UDPAddr {
	if addr == nil {
		return nil
	}
	dup := new(net.UDPAddr)
	dup.Port, dup.Zone = addr.Port, addr.Zone
	// Appending to a nil slice forces a fresh backing array.
	dup.IP = append(net.IP(nil), addr.IP...)
	return dup
}
// parseSock5UDPAssociateHint splits a "host:port" hint into an IP and a port.
//
// The IP is nil when the host is not an IP literal or is an unspecified
// address; the port is 0 when it is not a number in [0, 65535]. An address
// that cannot be split into host and port yields (nil, 0).
func parseSock5UDPAssociateHint(addr string) (net.IP, int) {
	host, portText, splitErr := net.SplitHostPort(addr)
	if splitErr != nil {
		return nil, 0
	}

	ip := net.ParseIP(host)
	if ip != nil && ip.IsUnspecified() {
		// A wildcard address carries no information about the sender.
		ip = nil
	}

	port, convErr := strconv.Atoi(portText)
	if convErr != nil || port < 0 || port > 65535 {
		port = 0
	}
	return ip, port
}
// addClientConn registers clientConn under both lookup keys: addr in
// localAddrToConnMap and uuid in localIdToConnMap. The address key is also
// remembered on the connection so that close can remove both entries.
func (p *Client) addClientConn(uuid string, addr string, clientConn *ClientConn) {
clientConn.addrKey = addr
p.localAddrToConnMap.Store(addr, clientConn)
p.localIdToConnMap.Store(uuid, clientConn)
}
// getClientConnByAddr looks a connection up by its address key and returns
// nil when no connection is registered under addr.
func (p *Client) getClientConnByAddr(addr string) *ClientConn {
	if found, present := p.localAddrToConnMap.Load(addr); present {
		return found.(*ClientConn)
	}
	return nil
}
// getClientConnById looks a connection up by its unique id and returns nil
// when no connection is registered under uuid.
func (p *Client) getClientConnById(uuid string) *ClientConn {
	if found, present := p.localIdToConnMap.Load(uuid); present {
		return found.(*ClientConn)
	}
	return nil
}
// remoteError sends a KICK message with an empty payload for the connection
// id uuid. processPacket uses it to answer packets that refer to a connection
// this client does not know. The sequence number is not advanced.
func (p *Client) remoteError(uuid string) {
	var (
		noTarget  = ""
		noPayload = []byte{}
	)
	sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, noTarget, uuid, uint32(MyMsg_KICK), noPayload,
		SEND_PROTO, RECV_PROTO, p.key,
		0, 0, 0, 0, 0, 0,
		0, p.cryptoConfig)
}
// AcceptDirectTcpConn connects a local TCP connection straight to targetAddr,
// bypassing the tunnel, and starts one transfer goroutine per direction.
//
// The function owns conn. When the target cannot be resolved or dialled the
// failure is logged and conn is closed; previously the accepted socket was
// leaked on those paths. On success both sockets are closed by transfer when
// the copy ends.
func (p *Client) AcceptDirectTcpConn(conn *net.TCPConn, targetAddr string) {
	defer common.CrashLog()

	p.workResultLock.Add(1)
	defer p.workResultLock.Done()

	tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr)

	loggo.Info("client accept new direct local tcp %s %s", tcpsrcaddr.String(), targetAddr)

	tcpaddrTarget, err := net.ResolveTCPAddr("tcp", targetAddr)
	if err != nil {
		loggo.Info("direct local tcp ResolveTCPAddr fail: %s %s", targetAddr, err.Error())
		conn.Close()
		return
	}

	targetconn, err := net.DialTCP("tcp", nil, tcpaddrTarget)
	if err != nil {
		loggo.Info("direct local tcp DialTCP fail: %s %s", targetAddr, err.Error())
		conn.Close()
		return
	}

	// One copier per direction; each closes both ends when it finishes.
	go p.transfer(conn, targetconn, conn.RemoteAddr().String(), targetconn.RemoteAddr().String())
	go p.transfer(targetconn, conn, targetconn.RemoteAddr().String(), conn.RemoteAddr().String())

	loggo.Info("client accept new direct local tcp ok %s %s", tcpsrcaddr.String(), targetAddr)
}
// transfer copies everything from source to destination until the copy ends,
// then closes both ends. dst and src are only used as labels in the begin/end
// log lines. The result of the copy is not inspected.
func (p *Client) transfer(destination io.WriteCloser, source io.ReadCloser, dst string, src string) {
	defer common.CrashLog()
	defer destination.Close()
	defer source.Close()

	announce := func(format string) {
		loggo.Info(format, src, dst)
	}

	announce("client begin transfer from %s -> %s")
	io.Copy(destination, source)
	announce("client end transfer from %s -> %s")
}