diff --git a/.gitignore b/.gitignore index 51e15fd..88ddd84 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,6 @@ src/github.com src/bitbucket.org src/launchpad.net src/gopkg.in -src/ngrok/client/assets/ -src/ngrok/server/assets/ -.idea/ \ No newline at end of file +.idea/ +*.test +src/punching/log/ \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9ca2d7f --- /dev/null +++ b/Makefile @@ -0,0 +1,64 @@ +.PHONY : default server client proxy all_windows all_darwin windows arm darwin deps fmt clean all +export GOPATH:=$(shell pwd) + +PREFIX='' +default: all + +GOOS= +GOARCH= +GOARM= + +fmt: + go fmt punching/... + +deps: + go get -d -v punching/... + +server: deps + go install punching/main/server + +client: deps + go install punching/main/client + +proxy: deps + go install punching/main/proxy + + +server_linux: + GOOS=linux GOARCH=amd64 go install punching/main/server +client_linux: + GOOS=linux GOARCH=amd64 go install punching/main/client +proxy_linux: + GOOS=linux GOARCH=amd64 go install punching/main/proxy + +server_windows: + GOOS=windows GOARCH=amd64 go install punching/main/server +client_windows: + GOOS=windows GOARCH=amd64 go install punching/main/client +proxy_windows: + GOOS=windows GOARCH=amd64 go install punching/main/proxy + +server_darwin: + GOOS=darwin GOARCH=amd64 go install punching/main/server +client_darwin: + GOOS=darwin GOARCH=amd64 go install punching/main/client +proxy_darwin: + GOOS=darwin GOARCH=amd64 go install punching/main/proxy + +server_arm: + GOOS=linux GOARCH=arm GOARM=5 go install punching/main/server +client_arm: + GOOS=linux GOARCH=arm GOARM=5 go install punching/main/client +proxy_arm: + GOOS=linux GOARCH=arm GOARM=5 go install punching/main/proxy + + +all_darwin: fmt client_darwin server_darwin proxy_darwin +all_linux: fmt client_linux server_linux proxy_linux +all_windows: fmt client_windows server_windows proxy_windows +all_arm: fmt client_arm server_arm proxy_arm +all_platform: all_darwin all_linux all_windows all_arm +all: fmt client server proxy + +clean: + go clean -i -r punching/... diff --git a/README.md b/README.md new file mode 100644 index 0000000..7d8d438 --- /dev/null +++ b/README.md @@ -0,0 +1,64 @@ +### 使用场景 + + 有时,我们需互相访问公司和家里的电脑,如果家里或公司的网络提供商提供的是公网IP,我们仅需要在路由器里设置端口映射,就可以在别外访问到我们开启的服务。但由于Ipv4地址历史原因,可分配的公网IP越来越少,分配到私有IP的情况很多。这时候就需要使用NAT穿透技术,使用在NAT后的两端可互相访问。 + +### 项目介绍 + + + + + 如上图所示,在公网上部署一台跨网服务器,上面运行解析端(proxy), P2P客户端和服务端启动时向跨网解析端提交TCP连接请求,以便公网解析端根据请求,记录各自自连接时NAT地址,并告知对方的NAT地址。P2P客户端和服务端尝试同时连接,进行NAT穿透。在穿透成功后,P2P终端可以脱离跨网解析端独立进行TCP数据通讯,无需第三方数据转发。 + +### 如何使用 + + 1. 迁出源码 git clone https://github.com/chenboxing/punching.git + 2. 进入项目目录 src/punching/ + 3. 编译源码 + make all # 编译所在平台的所有端 + make_windows # 编译windows平台的所有端 + make_linux # 编译linux平台的所有端 + make_darwin # 编译mac os 平台的所有端 + make_arm # 编译arm嵌入式平台的所有端,arm版本基于5 + + 编译后二进制文件放在 punching/bin/目录下 + + 或你也可以访问下面链接直接下载已经编译好的文件: + + + 4. 跨网解析端和P2P端配置和使用 + + 4.1 跨网解析端部署(如没有公网服务器,此步可跳过): + + 把proxy(代理转发端)和配置文件proxy.conf 部署到公网计算机上,配置proxy.conf配置节[proxy],设置侦听端口,默认7777 + 运行解析端 + ./proxy + + 4.2 配置P2P服务端 + + + 4.3 配置P2P客户端 + + [ThirdProxy] + address = nat.move8.cn + email = xxxx@xxxxx.com + password = xxxxxxx + + 先在Nat网络一端,你需要开放访问的服务的计算机上部署server端,配置config.conf配置节[server],在listen项里设置你要开放的应用服务,如 192.168.1.45:80, proxy添写你的代理转发端公网地址和端口,比如 xxx.f3322.net:7777, 如果你需要使用本站提供的代理转发,此项请为空,参考上面填写节[ThirdProxy]信息。 + + 配置好,启动Server端: + nat_server.exe + + 配置在Client端 + 在Nat网络的另一端,部署client端,先配置config.conf,在节[client],需要先设置好要侦听的端口信息,比如listen = :8585, proxy添写你的代理转发端公网地址和端口,比如 xxx.f3322.net:7777, 如果你需要使用本站提供的代理转发,此项请为空,参考上面填写节[ThirdProxy]信息。 + + 配置好,启动Server端: + nat_client.exe + +### 当前局限 + 1. 只能提供一对一的P2P通讯(Client端,Server端),无法多人访问开启的P2P服务端服务.如果你想实现多人访问开启的服务,你可以使用tunnel(隧道)工具,但前提是需要架设一台线上服务器,部署tunnel服务端,在要开放服务的计算机上部署tunnel客户端。 + 2. 现在,项目只实现了TCP P2P穿透方案,如果后台服务需要UDP协议通讯 ,无法工作,比如vnc服务就无法访问,需要实现UDP和TCP穿透才可以工作。windows xp sp2下的平台也不支持P2P连接的TCP同时连接特性,所以无法工作。 + 3. 因为条件限制,本项目测试场景是基于非对称性NAT, 对于对称性NAT穿透可能会失败。 + +### 依赖的第三方包 + github.com/cihub/seelog: 日志记录增加包 + github.com/BurntSushi/toml: toml配置文件处理包 \ No newline at end of file diff --git a/src/punching/Makefile b/src/punching/Makefile deleted file mode 100644 index a1c0639..0000000 --- a/src/punching/Makefile +++ /dev/null @@ -1,53 +0,0 @@ -.PHONY: default server client deps fmt clean all release-all assets client-assets server-assets contributors -export GOPATH:=$(shell pwd) - -BUILDTAGS=debug -default: all - -deps: assets - go get -tags '$(BUILDTAGS)' -d -v ngrok/... - -server: deps - go install -tags '$(BUILDTAGS)' ngrok/main/ngrokd - -fmt: - go fmt ngrok/... - -client: deps - go install -tags '$(BUILDTAGS)' ngrok/main/ngrok - -assets: client-assets server-assets - -bin/go-bindata: - GOOS="" GOARCH="" go get github.com/jteeuwen/go-bindata/go-bindata - -client-assets: bin/go-bindata - bin/go-bindata -nomemcopy -pkg=assets -tags=$(BUILDTAGS) \ - -debug=$(if $(findstring debug,$(BUILDTAGS)),true,false) \ - -o=src/ngrok/client/assets/assets_$(BUILDTAGS).go \ - assets/client/... - -server-assets: bin/go-bindata - bin/go-bindata -nomemcopy -pkg=assets -tags=$(BUILDTAGS) \ - -debug=$(if $(findstring debug,$(BUILDTAGS)),true,false) \ - -o=src/ngrok/server/assets/assets_$(BUILDTAGS).go \ - assets/server/... - -release-client: BUILDTAGS=release -release-client: client - -release-server: BUILDTAGS=release -release-server: server - -release-all: fmt release-client release-server - -all: fmt client server - -clean: - go clean -i -r ngrok/... - rm -rf src/ngrok/client/assets/ src/ngrok/server/assets/ - -contributors: - echo "Contributors to ngrok, both large and small:\n" > CONTRIBUTORS - git log --raw | grep "^Author: " | sort | uniq | cut -d ' ' -f2- | sed 's/^/- /' | cut -d '<' -f1 >> CONTRIBUTORS - diff --git a/src/punching/README.md b/src/punching/README.md deleted file mode 100644 index fed37dc..0000000 --- a/src/punching/README.md +++ /dev/null @@ -1,41 +0,0 @@ -### 使用场景 - - - - -### 项目介绍 - - 本项目完全是点对点的访问,无须经由服务器转发,代理转发端只是为了获取当自的Nat网络IP和端口,为了Nat穿透,在获取各自对方的IP和端口后,就不需要服务端的干预。 - -### 如何使用 - 1. 迁出源码git clone https://github.com/chenboxing/punching.git - 2. 进入项目目录 src/punching/ - 3. 编译源码 - make (all|server|client|proxy) (windows|darwin|linux|arm) - make all windows #编译windows平台下所有 - make server|client linux #编译Windows平台Server端和Client端二进制文件 - 二进制文件编译在 punching/bin/里 - - 你也可以直接下载已经编译好的文件: - - - 4. 把proxy(代理转发端) 部署到公网计算机上,配置config.conf配置节[proxy],设置侦听端口,默认7777,如果没有公网计算机,可以使用本站设置好的域名 nat.move8.cn, 在config.conf配置节[ThridProxy]设置好相关信息,比如 - [ThirdProxy] - address = nat.move8.cn - email = xxxx@xxxxx.com - password = xxxxxxx - - 先在Nat网络一端,你需要开放访问的服务的计算机上部署server端,配置config.conf配置节[server],在listen项里设置你要开放的应用服务,如 192.168.1.45:80, proxy添写你的代理转发端公网地址和端口,比如 xxx.f3322.net:7777, 如果你需要使用本站提供的代理转发,此项请为空,参考上面填写节[ThirdProxy]信息。 - - 配置好,启动Server端: - nat_server.exe - - 配置在Client端 - 在Nat网络的另一端,部署client端,先配置config.conf,在节[client],需要先设置好要侦听的端口信息,比如listen = :8585, proxy添写你的代理转发端公网地址和端口,比如 xxx.f3322.net:7777, 如果你需要使用本站提供的代理转发,此项请为空,参考上面填写节[ThirdProxy]信息。 - - 配置好,启动Server端: - nat_client.exe - -### 局限 - 只能P2P通讯(Client端,Server端),无法多人访问开启的服务 - \ No newline at end of file diff --git a/src/client.conf b/src/punching/client.conf similarity index 63% rename from src/client.conf rename to src/punching/client.conf index bedefb8..1c70ec7 100644 --- a/src/client.conf +++ b/src/punching/client.conf @@ -1,7 +1,7 @@ [client] -proxy = ":" +proxy = "127.0.0.1:7777" listen = ":8585" -key = "" +key = "ABCD" [ThirdProxy] address = "proxy.move8.cn:7777" \ No newline at end of file diff --git a/src/punching/client/config.go b/src/punching/client/config.go index 69c51c4..d1f0748 100644 --- a/src/punching/client/config.go +++ b/src/punching/client/config.go @@ -2,6 +2,7 @@ package client import ( "fmt" + "os" "punching/util" ) @@ -15,47 +16,48 @@ key = "" address = "proxy.move8.cn:7777" */ -type ServerConfig struct { - Proxy string `toml:"proxy"` // Proxy 服务的地址 - Dial string `toml:"dial"` // 服务端提供的服务地址 - Key string `toml:"key"` // 客户端和服务端的匹配码 +type ClientConfig struct { + Proxy string `toml:"proxy"` // Proxy 服务的地址 + Listen string `toml:"listen"` // 服务端提供的服务地址 + Key string `toml:"key"` // 客户端和服务端的匹配码 } -type ThridProxyConfig struct { +type ThirdProxyConfig struct { Address string `toml:"address"` // Proxy 服务的地址 } -var Config *ServerConfig -var ThirdConfig *ThridProxyConfig +var Config ClientConfig +var ThirdConfig ThirdProxyConfig func InitConfig() (err error) { - if Config == nil { + // 加载配置信息 + fileName := "client.conf" - // 加载配置信息 - fileName := "server.conf" - sectionName1 := "server" - if err01 := util.DecodeSection(fileName, sectionName1, Config); err01 != nil { - err = fmt.Errorf("Load config file failed, error:%s", err01.Error()) - return - } + if os.Getenv("CLIENT_CONF") != "" { + fileName = os.Getenv("CLIENT_CONF") + } - sectionName2 := "ThridProxy" - if err02 := util.DecodeSection(fileName, sectionName2, ThirdConfig); err != nil { - err = fmt.Errorf("Load config file failed, error:%s", err02.Error()) - return - } + sectionName1 := "client" + if err01 := util.DecodeSection(fileName, sectionName1, &Config); err01 != nil { + err = fmt.Errorf("Load config file failed, error:%s", err01.Error()) + return + } - if Config.Key == "" { - err = fmt.Errorf("匹配码不能为空,请在client.conf配置匹配码") - return - } + sectionName2 := "ThirdProxy" + if err02 := util.DecodeSection(fileName, sectionName2, &ThirdConfig); err != nil { + err = fmt.Errorf("Load config file failed, error:%s", err02.Error()) + return + } - if Config.Proxy == "" && ThirdConfig.Address == "" { - err = fmt.Errorf("Proxy服务地址和第三方Proxy服务地址不能同时为空") - return - } + if Config.Key == "" { + err = fmt.Errorf("匹配码不能为空,请在client.conf配置匹配码") + return + } + if Config.Proxy == "" && ThirdConfig.Address == "" { + err = fmt.Errorf("Proxy服务地址和第三方Proxy服务地址不能同时为空") + return } return nil diff --git a/src/punching/client/frontend.go b/src/punching/client/frontend.go index 35c270c..15bee3a 100644 --- a/src/punching/client/frontend.go +++ b/src/punching/client/frontend.go @@ -1,13 +1,12 @@ package client import ( - "fmt" - "log" "net" . "punching/constant" + "punching/logger" + "punching/util" "sync" "time" - "punching/util" ) var ListenAcceptMap map[string]net.Conn @@ -15,37 +14,38 @@ var ExitChanMap map[string]chan bool var RWLock *sync.RWMutex - func handleClientConn(source net.Conn) { // 4 bits unique session id var sessionID string - for{ + // Unique session id + for { RWLock.Lock() sessionID = util.GenerateRandomPairKey() - if _, ok := ListenAcceptMap[sessionID]; !ok{ + _, ok := ListenAcceptMap[sessionID] + RWLock.Unlock() + if !ok { break } - RWLock.Unlock() } - log.Println("Enter handleClientConn:", sessionID) + logger.Infof("Enter handleClientConn:%s", sessionID) RWLock.Lock() ListenAcceptMap[sessionID] = source ExitChanMap[sessionID] = make(chan bool) RWLock.Unlock() - log.Println("建立Map", sessionID) + logger.Infof("建立Map,%s", sessionID) defer func() { e1 := source.Close() if e1 != nil { - log.Println("关闭Sourcer失败") + logger.Error("关闭Sourcer失败") } RWLock.Lock() delete(ListenAcceptMap, sessionID) delete(ExitChanMap, sessionID) - log.Println("删除map", sessionID) + logger.Infof("删除map:%s", sessionID) RWLock.Unlock() }() @@ -60,21 +60,22 @@ func handleClientConn(source net.Conn) { len01, err := source.Read(buf) if len01 <= 0 || err != nil { - log.Println("读取Source源连接出错,原因为:", err.Error()) + logger.Errorf("读取Source源连接出错,原因为:%s", err) //发送控制 - packQuit := util.PackageNat(PAIR_CONTROL_QUIT, [4]byte(sessionID),[]byte("") ) + packQuit := util.PackageNat(PAIR_CONTROL_QUIT, sessionID, []byte("")) Wch <- packQuit return } - controlID := PAIR_CONTROL_NORMAL + controlID := PAIR_CONTROL_NORMAL if flag == 0 { // 第一次 controlID = PAIR_CONTROL_FIRST flag = 1 } - pack := util.PackageNat(controlID, [4]byte(sessionID), buf[0:len01]) + + pack := util.PackageNat(controlID, sessionID, buf[0:len01]) Wch <- pack } @@ -83,7 +84,7 @@ func handleClientConn(source net.Conn) { select { case <-ExitChanMap[sessionID]: - log.Println("需要退出Accept") + logger.Warn("需要退出Accept") return } } @@ -95,14 +96,14 @@ func ClientListenHandle() { ExitChanMap = make(map[string]chan bool) RWLock = new(sync.RWMutex) - addrOn := Config.Dial + addrOn := Config.Listen l, err := net.Listen("tcp", addrOn) if err != nil { - fmt.Println("listen ", addrOn, " error:", err) + logger.Errorf("listen %s encountered errors %s", addrOn, err) return } - fmt.Println("server running at port", addrOn) + logger.Infof("server running at port %s", addrOn) // 全局读取来自nat源的包 go handleReadConn() @@ -111,7 +112,7 @@ func ClientListenHandle() { for { c, err := l.Accept() if err != nil { - fmt.Println("accept error:", err) + logger.Errorf("accept error: %s", err) break } go handleClientConn(c) @@ -125,31 +126,30 @@ func handleReadConn() { select { case pact := <-Rch: - log.Println(time.Now().UnixNano(), "handleReadConn准备处理") // 获取src controlID := pact.ControlID sessionID := string(pact.SessionID) data := pact.Data - log.Println("读取Nat包:handleReadConn", sessionID, "长度为", len(data)) - //退出 if controlID == PAIR_CONTROL_QUIT { if c, ok := ExitChanMap[sessionID]; ok { - log.Println("发送退出信号") + logger.Info("发送退出信号") c <- true } else { - log.Println("在ExitChanMap里找不到Key为:", sessionID) + logger.Info("在ExitChanMap里找不到Key为:", sessionID) } } else { if src, ok := ListenAcceptMap[sessionID]; ok { len2, err2 := src.Write(data) if err2 != nil || len2 <= 0 { - log.Println("源写入出错", err2.Error()) + logger.Infof("源写入出错:%s", err2) + } else { + logger.Info(time.Now().UnixNano(), "源写入:", len2) } - log.Println(time.Now().UnixNano(), "源写入:", len2) + } else { - log.Println("在Map里找不到Key为:", sessionID) + logger.Info("在Map里找不到Key为:", sessionID) } } diff --git a/src/punching/client/main.go b/src/punching/client/main.go index 5967a4d..473ea9d 100644 --- a/src/punching/client/main.go +++ b/src/punching/client/main.go @@ -1,11 +1,10 @@ package client import ( - "fmt" - "log" + "os" + "punching/logger" "punching/util" "time" - "os" ) var ( @@ -18,35 +17,57 @@ func Main() { // 加载配置信息 if err := InitConfig(); err != nil { - fmt.Println("加载配置信息出错,原因为:%s", err) + logger.Errorf("加载配置信息出错,原因为:%s", err) return } - proxyAddr := Config.Dial + proxyAddr := Config.Proxy if proxyAddr == "" { proxyAddr = ThirdConfig.Address } pairName := Config.Key - localAddr, destAddr, pairName, err := util.ClientDialProxy(proxyAddr, pairName) - if err != nil { - log.Println(err) - return - } + logger.Infof("准备连接代理解析端:%s", proxyAddr) + + tryCount := 0 + var connPeer util.NetConn + var errPeer error + + for { + + localAddr, destAddr, err := util.ClientDialProxy(proxyAddr, pairName) + + if err != nil { + logger.Errorf("连接解析端出错,%s", err) + return + } + + logger.Infof("已获取NAT地址:本地地址:%s,远程地址:%s ", localAddr, destAddr) + + tryCount += 1 + + if tryCount == 11 { + logger.Errorf("已经尝试了10次,连接还是失败,退出,请重新运行客户端") + return + } + //连接P2P服务端 + connPeer, errPeer = util.DialPeer(localAddr, destAddr) + if errPeer != nil { //无法连接上 + logger.Errorf("连接P2P服务端,出现错误,%s,第%d次", errPeer, tryCount) + time.Sleep(3 * time.Second) + continue + } else { + break + } - //连接P2P服务端 - connPeer, errPeer := util.DialPeer(localAddr, destAddr) - if errPeer != nil { //无法连接上 - log.Println(errPeer) - return } Dch = make(chan bool) Rch = make(chan util.PairPackage) Wch = make(chan []byte) - go RHandler(connPeer) //Nat端写通道 - go WHandler(connPeer) //Nat端读通道 + go RHandler(connPeer) //Nat端写通道 + go WHandler(connPeer) //Nat端读通道 // 侦听端口,开启服务,将端口输入转发到P2P端 ClientListenHandle() @@ -54,6 +75,7 @@ func Main() { // 如果P2P端通讯出错,退出 select { case <-Dch: + logger.Errorf("接收到退出信息") os.Exit(1) } } @@ -67,11 +89,11 @@ func RHandler(conn util.NetConn) { for { j, err := conn.Read(buff) if err != nil { - log.Println("读取连接数据出错,原因为:", err.Error()) + logger.Errorf("读取连接数据出错,原因为:%s", err) Dch <- true break } - log.Println("准备解包数据:", j) + logger.Info("准备解包数据:", j) // 解包 tmpBuffer = util.UnpackageNat(append(tmpBuffer, buff[:j]...), Rch) } @@ -83,10 +105,10 @@ func WHandler(conn util.NetConn) { case msg := <-Wch: l, err := conn.Write(msg) if err != nil { - log.Println("写到Nat目录连接出错:", err.Error()) + logger.Errorf("写到Nat目录连接出错:%s", err) Dch <- true } else { - log.Println(time.Now().UnixNano(), "已写入到Nat:", l) + logger.Info(time.Now().UnixNano(), "已写入到Nat:", l) } // } diff --git a/src/punching/config.conf b/src/punching/config.conf deleted file mode 100644 index daa6b35..0000000 --- a/src/punching/config.conf +++ /dev/null @@ -1,17 +0,0 @@ -[proxy] -listen = ":7777" - -[server] -proxy = ":7777" -dial = "192.168.1.168:443" -key = "amychen" - -[client] -proxy = ":7777" -listen = ":8585" -key = "amychen" - -[ThirdProxy] -address = "proxy.move8.cn:7777" -email = 368123477@qq.com -password = dingding diff --git a/src/punching/constant/pair.go b/src/punching/constant/pair.go index d35786e..72af9b6 100644 --- a/src/punching/constant/pair.go +++ b/src/punching/constant/pair.go @@ -3,27 +3,24 @@ package constant // Constant for the client and server const ( - PAIR_CONTROL_FIRST byte = 11 // 控制码 C->S第一个包 - PAIR_CONTROL_QUIT byte = 10 // 控制码 退出 - PAIR_CONTROL_NORMAL byte = 0 // 控制码 + PAIR_CONTROL_FIRST byte = 11 // 控制码 C->S第一个包 + PAIR_CONTROL_QUIT byte = 10 // 控制码 退出 + PAIR_CONTROL_NORMAL byte = 0 // 控制码 - PAIR_PACKAGE_HEAD_LENGTH = 6 // C<->S 自定义包头长度 - PAIR_PACKAGE_CONTROL_LENGTH = 1 // 包控制码长度 + PAIR_PACKAGE_HEAD_LENGTH = 6 // C<->S 自定义包头长度 + PAIR_PACKAGE_CONTROL_LENGTH = 1 // 包控制码长度 PAIR_PACKAGE_SESSIONID_LENGTH = 4 // 包会话ID长度 - PAIR_PACKAGE_DATA_LENGTH = 4 // 包数据长度 - PAIR_PACKAGE_PREFIX_LENGTH = 15 // head + control +sessionid + data length - - + PAIR_PACKAGE_DATA_LENGTH = 4 // 包数据长度 + PAIR_PACKAGE_PREFIX_LENGTH = 15 // head[6] + control[1] + sessionid[4] + data length[4] ) const ( - ROLE_SERVER byte = 1 // 点对点服务端 - ROLE_CLIENT byte = 2 // 点对点客户端 + ROLE_SERVER byte = 1 // 点对点服务端 + ROLE_CLIENT byte = 2 // 点对点客户端 ) var ( - PAIR_PACKAGE_HEAD = [6]byte{'C','B','X','N','A','T'} // C<->S 自定义包头 + PAIR_PACKAGE_HEAD string = "CBXNAT" // C<->S 自定义包头 ) - diff --git a/src/punching/constant/proxy.go b/src/punching/constant/proxy.go index 254db21..e43a080 100644 --- a/src/punching/constant/proxy.go +++ b/src/punching/constant/proxy.go @@ -1,16 +1,15 @@ package constant const ( - PROXY_PACKAGE_HEAD byte = 'H' // C<->S 自定义包头 - PROXY_CONTROL_FIRST byte = 11 // 控制ID 第一个数据包 - PROXY_CONTROL_NORMAL byte = 0 // 控制码 正常发送 - PROXY_CONTROL_ACK byte = 12 // 控制码 确认 - PROXY_CONTROL_QUIT byte = 10 // 控制码 退出 - PROXY_CONTROL_HEARTBIT byte = 13 // 控制码 心跳包 - PROXY_CONTROL_HEARTBITACK byte = 14 // 心跳包确认 - PROXY_CONTROL_ERROR_NO_SERVER byte = 201 // 服务端还没有注册 + PROXY_PACKAGE_HEAD byte = 'H' // C<->S 自定义包头 + PROXY_CONTROL_FIRST byte = 11 // 控制ID 第一个数据包 + PROXY_CONTROL_NORMAL byte = 0 // 控制码 正常发送 + PROXY_CONTROL_ACK byte = 12 // 控制码 确认 + PROXY_CONTROL_QUIT byte = 10 // 控制码 退出 + PROXY_CONTROL_HEARTBIT byte = 13 // 控制码 心跳包 + PROXY_CONTROL_HEARTBITACK byte = 14 // 心跳包确认 + PROXY_CONTROL_ERROR_NO_SERVER byte = 201 // 服务端还没有注册 PROXY_CONTROL_ERROR_CLIENT_EXIST byte = 202 // 客户端已经存在 PROXY_CONTROL_ERROR_SERVER_EXIST byte = 203 // 服务端已经存在 - -) \ No newline at end of file +) diff --git a/src/punching/logger/logger.go b/src/punching/logger/logger.go index eebfb0a..4269014 100644 --- a/src/punching/logger/logger.go +++ b/src/punching/logger/logger.go @@ -1,20 +1,26 @@ -package util +package logger import ( "fmt" "github.com/cihub/seelog" + "os" + "strings" ) // init 初始化包 func init() { // 解析服务配置文件 + logFileName := "./log/punching.log" + if os.Getenv("Log_FILE") != "" { + logFileName = os.Getenv("Log_FILE") + } xml := ` - + @@ -24,12 +30,13 @@ func init() { ` + xml = strings.Replace(xml, "#LOG_FILE_NAME", logFileName, 1) // 解析日志配置(从默认配置) - logger, err := seelog.LoggerFromConfigAsBytes([]byte(xml)) + logg, err := seelog.LoggerFromConfigAsBytes([]byte(xml)) if err != nil { panic(fmt.Errorf("log configuration parse error: %s", err.Error())) } - seelog.ReplaceLogger(logger) + seelog.ReplaceLogger(logg) } diff --git a/src/punching/main/client.go b/src/punching/main/client/client.go similarity index 76% rename from src/punching/main/client.go rename to src/punching/main/client/client.go index 6ea21bd..319d760 100644 --- a/src/punching/main/client.go +++ b/src/punching/main/client/client.go @@ -1,5 +1,7 @@ -package main +package main + import "punching/client" + func main() { client.Main() -} \ No newline at end of file +} diff --git a/src/punching/main/proxy.go b/src/punching/main/proxy/proxy.go similarity index 62% rename from src/punching/main/proxy.go rename to src/punching/main/proxy/proxy.go index d0a775d..4956cd9 100644 --- a/src/punching/main/proxy.go +++ b/src/punching/main/proxy/proxy.go @@ -1,5 +1,7 @@ package main -import "punching/proxy" + +import "punching/proxy" + func main() { proxy.Main() } diff --git a/src/punching/main/server.go b/src/punching/main/server/server.go similarity index 78% rename from src/punching/main/server.go rename to src/punching/main/server/server.go index 26f3fa7..075a6b3 100644 --- a/src/punching/main/server.go +++ b/src/punching/main/server/server.go @@ -1,5 +1,7 @@ -package main +package main + import "punching/server" + func main() { server.Main() } diff --git a/src/punching/proxy/config.go b/src/punching/proxy/config.go index 2f95d31..136f208 100644 --- a/src/punching/proxy/config.go +++ b/src/punching/proxy/config.go @@ -1,32 +1,35 @@ -package proxy +package proxy import ( "fmt" + "os" + "punching/util" ) -type ProxyConfig struct{ - Listen string `toml:"listen"` // Proxy 服务的地址 +type ProxyConfig struct { + Listen string `toml:"listen"` // Proxy 服务的地址 } -var Config *ProxyConfig +var Config ProxyConfig -func InitConfig() (err error){ - - if Config == nil { - - // 加载配置信息 - fileName = "proxy.conf" - if err01 := util.DecodeSection(fileName, sectionName, Config); err != nil { - err = fmt.Errorf("Load config file failed, error:%s", err.Error()) - return - } - - if Config.Listen == "" { - err = fmt.Errorf("侦听地址为空,请在配置文件proxy.conf配置listen值") - return - } +func InitConfig() (err error) { + // 加载配置信息 + // fileName := "/Users/chenboxing/nat/src/punching/src/punching/proxy.conf" + fileName := "proxy.conf" + if os.Getenv("PROXY_CONF") != "" { + fileName = os.Getenv("PROXY_CONF") } - - return nil -} \ No newline at end of file + sectionName := "proxy" + if err01 := util.DecodeSection(fileName, sectionName, &Config); err01 != nil { + err = fmt.Errorf("Load config file failed, error:%s", err01.Error()) + return + } + + if Config.Listen == "" { + err = fmt.Errorf("侦听地址为空,请在配置文件proxy.conf配置listen值") + return + } + + return nil +} diff --git a/src/punching/proxy/main.go b/src/punching/proxy/main.go index 69988a9..77d59a8 100644 --- a/src/punching/proxy/main.go +++ b/src/punching/proxy/main.go @@ -1,44 +1,45 @@ -package proxy +package proxy import ( - "net" - "sync" "fmt" - "punching/util" - "time" + + "net" . "punching/constant" - "log" + "punching/logger" + "punching/util" + "sync" + "time" ) // ServerConn 服务端到代理端连接 type ServerConn struct { - Rch chan []byte // 读通道 - Wch chan []byte // 写通道 - Dch chan bool // 连接退 - LocalAddr string // 客户端IP信息 - Pairname string // 匹配名称 - SyncAt int64 // 上次心跳时间 + Rch chan []byte // 读通道 + Wch chan []byte // 写通道 + Dch chan bool // 连接退 + LocalAddr string // 客户端IP信息 + Pairname string // 匹配名称 + SyncAt int64 // 上次心跳时间 } // ClientConn 客户端到代理端连接 -type ClientConn struct{ - Pairname string //匹配码 +type ClientConn struct { + Pairname string //匹配码 } // 全局变量 var ( - OnlineServerList map[string]*ServerConn // 服务端连接列表Map - OnlineClientList map[string]string // 客户端连接列表Map - RWLockClient *sync.RWMutex //读写锁 - RWLockServer *sync.RWMutex + OnlineServerList map[string]*ServerConn // 服务端连接列表Map + OnlineClientList map[string]string // 客户端连接列表Map + RWLockClient *sync.RWMutex //读写锁 + RWLockServer *sync.RWMutex ) -func Main(){ +func Main() { // 加载配置信息 - if err := InitConfig(); err != nil{ - log.Println("加载配置信息出错,原因为:%s", err) - return + if err := InitConfig(); err != nil { + logger.Errorf("加载配置信息出错,原因为:%s", err) + return } OnlineServerList = make(map[string]*ServerConn) @@ -50,16 +51,17 @@ func Main(){ listenAddr := Config.Listen tcpAddr, err := net.ResolveTCPAddr("tcp", listenAddr) if err != nil { + logger.Errorf("Resolved address failed %s", err) panic(err) } listen, err := net.ListenTCP("tcp", tcpAddr) if err != nil { - fmt.Println("监听端口失败:", err.Error()) + logger.Errorf("监听端口失败:%s", err) return } - fmt.Println("已初始化连接,等待客户端连接...") + logger.Infof("已初始化连接,正在侦听:%s, 等待客户端连接...", listenAddr) for { conn, err := listen.AcceptTCP() @@ -67,24 +69,26 @@ func Main(){ fmt.Println("连接异常:", err.Error()) continue } - fmt.Println("本地地址:", conn.LocalAddr().String(), "来自远程地址", conn.RemoteAddr().String()) + logger.Infof("本地地址:%s,来自远程地址:%s", conn.LocalAddr().String(), conn.RemoteAddr().String()) go Handler(conn) } } // processRoleClient 处理客户端连接 -func processRoleClient(conn net.Conn, pairName string ){ +func processRoleClient(conn net.Conn, pairName string) { // 判断匹配的服务端是否已经注册 - RWLockServer.RLock() - serverConn,ok := OnlineServerList[pairName] + RWLockServer.RLock() + + serverConn, ok := OnlineServerList[pairName] + RWLockServer.RUnlock() if !ok { // 客户端没有注册 packErr := util.PackageProxy(PROXY_CONTROL_ERROR_NO_SERVER, []byte("")) conn.Write(packErr) - return + return } // Check the client with the save pair name exists @@ -92,10 +96,10 @@ func processRoleClient(conn net.Conn, pairName string ){ _, ok = OnlineClientList[pairName] RWLockClient.RUnlock() - if !ok{ - packErr := util.PackageProxy(PROXY_CONTROL_ERROR_CLIENT_EXIST,[]byte("")) + if ok { + packErr := util.PackageProxy(PROXY_CONTROL_ERROR_CLIENT_EXIST, []byte("")) conn.Write(packErr) - return + return } // 添加到客户端列表 @@ -104,115 +108,124 @@ func processRoleClient(conn net.Conn, pairName string ){ RWLockClient.Unlock() // 发送Nat地址和接收确认 - toClientAddrs := serverConn.LocalAddr + "," + conn.LocalAddr().String() + toClientAddrs := serverConn.LocalAddr + "," + conn.RemoteAddr().String() pack := util.PackageProxy(PROXY_CONTROL_NORMAL, []byte(toClientAddrs)) conn.Write(pack) - buf := make([]byte, 512) - lenAck, err := conn.Read(buf) + //buf := make([]byte, 512) + //lenAck, err := conn.Read(buf) + // + //if err != nil { + // logger.Errorf("读客户端确认数据出错,%s", err) + // return + //} + // + //ackPack, err01 := util.UnpackageProxy(buf[0:lenAck]) + //if err01 != nil { + // logger.Errorf("包解析出问题") + // return + //} + //flag := 0 + //if ackPack.ControlID == PROXY_CONTROL_ACK { + // flag += 1 + //} - if err != nil { - fmt.Println("读客户端确认数据出错") - return - } - - ackPack := util.UnpackageProxy(buf[0:lenAck]) - flag := 0 - if ackPack.CotnrolID == PROXY_CONTROL_HEARTBITACK { - flag += 1 - } - - toServerAddrs := conn.LocalAddr().String() + "," + serverConn.LocalAddr + toServerAddrs := conn.RemoteAddr().String() + "," + serverConn.LocalAddr addrPack := util.PackageProxy(PROXY_CONTROL_NORMAL, []byte(toServerAddrs)) - serverConn.Wch <- addrPack - // 等待服务端的确认数据 - select { - case bufAck := <- serverConn.Rch: - pack := util.UnpackageProxy(bufAck) - if pack.CotnrolID == PROXY_CONTROL_HEARTBITACK { - flag += 1 - } - break - } + serverConn.Wch <- addrPack + + //// 等待服务端的确认数据 + //select { + //case bufAck := <-serverConn.Rch: + // pack, err02 := util.UnpackageProxy(bufAck) + // if err02 != nil { + // if pack.ControlID == PROXY_CONTROL_ACK { + // flag += 1 + // } + // } + // break + //} + // + //logger.Infof("当前的连接信息为: %d", flag) // 收到服务端的确认数据 - if flag == 2 { - RWLockServer.Lock() - serverConn.Dch <- true // 关闭服务端连接 - delete( OnlineServerList, pairName) - RWLockServer.Unlock() - } - + // if flag == 2 { + RWLockServer.Lock() + //serverConn.Dch <- true // 关闭服务端连接 + delete(OnlineServerList, pairName) + RWLockServer.Unlock() + // } RWLockClient.Lock() - delete( OnlineClientList, pairName) + delete(OnlineClientList, pairName) RWLockClient.Unlock() - - return + + return } // Handle 连接处理函数 func Handler(conn net.Conn) { - defer func() { - if r := recover(); r != nil { - fmt.Printf("连接出现问题:%s",r) - } - }() + //defer func() { + // if err := recover(); err != nil { + // logger.Errorf("连接出现问题:%s", err) + // } + //}() defer conn.Close() buf := make([]byte, 1024) var pairName string - var C *ServerConn - // 确定连接类型,判断是否是有效的连接, - // 对于客户端,需满足 - // 1. 存在对应的服务端 - // 2. 不能存在多个客户端 - // 对于服务端: - // 1. - - for { + i, err := conn.Read(buf) if err != nil { - fmt.Println("读取数据错误:", err.Error()) - return + logger.Errorf("读取数据错误:%s", err) + return } - firstPack := util.UnpackageProxy(buf[0:i]) + firstPack, err01 := util.UnpackageProxy(buf[0:i]) + + if err01 != nil { + logger.Errorf("包格式出错,%s", err01) + return + } + + // Todo 获取时间差距 服务器时间 ticks - 客户端时间 ticks + // 比如说3秒后 clientType := firstPack.Data[0] - var pairName string - if len(firstPack.Data) >1 { + + if len(firstPack.Data) > 1 { pairName = string(firstPack.Data[1:]) } // 处理客户端连接 - if clientType == ROLE_CLIENT{ + if clientType == ROLE_CLIENT { processRoleClient(conn, pairName) - return // 退出客户端连接 + return // 退出客户端连接 } break } // 下面的操作都是针对服务端连接 + logger.Info("处理P2P服务端连接") // 服务端连接允许匹配码为空,系统将随机产生唯一匹配码 - if pairName == ""{ - for{ + if pairName == "" { + for { pairName = util.GenerateRandomPairKey() RWLockServer.Lock() if _, ok := OnlineServerList[pairName]; !ok { - break; + break } RWLockServer.Unlock() } - }else{ + } else { // 是否存在pair name RWLockServer.RLock() @@ -223,49 +236,53 @@ func Handler(conn net.Conn) { if ok { errPack := util.PackageProxy(PROXY_CONTROL_ERROR_SERVER_EXIST, []byte("")) conn.Write(errPack) - fmt.Printf("服务端列表中已存在:%s", pairName) + logger.Errorf("服务端列表中已存在:%s", pairName) return } } + logger.Infof("匹配码为:%s", pairName) // 生成服务器连接对象添加到列表 RWLockServer.Lock() - serverConn := &ServerConn{Rch: make(chan []byte), Wch: make(chan []byte), Pairname: pairName, LocalAddr: conn.LocalAddr().String()} + serverConn := &ServerConn{Rch: make(chan []byte), + Wch: make(chan []byte), + Dch: make(chan bool), + Pairname: pairName, + LocalAddr: conn.RemoteAddr().String()} OnlineServerList[pairName] = serverConn RWLockServer.Unlock() + replyData := conn.RemoteAddr().String() + "," + pairName + replyPack := util.PackageProxy(PROXY_CONTROL_NORMAL, []byte(replyData)) + if _, err := conn.Write(replyPack); err != nil { + logger.Errorf("回复P2P服务端包出错", err) + return + } + // 写通道 go WHandler(conn, serverConn) // 读通道 go RHandler(conn, serverConn) - + // 等待退出通道 select { - case <-C.Dch: - fmt.Println("close handler goroutine") + case <-serverConn.Dch: + logger.Info("close handler goroutine") } } // 正常写数据 匹配端连接上来会写信息 // 定时检测 conn die => goroutine die func WHandler(conn net.Conn, C *ServerConn) { - // 读取业务Work 写入Wch的数据 - ticker := time.NewTicker(60 * time.Second) + for { select { case d := <-C.Wch: + logger.Info("通道接收到数据,准备写") conn.Write(d) - case <-ticker.C: //60秒无操作,可能连接已中断 - RWLockServer.RLock() - _, ok := OnlineServerList[C.Pairname]; - RWLockServer.RUnlock() - if !ok { - fmt.Println("conn die, close WHandler") - return - } } } } @@ -279,16 +296,20 @@ func RHandler(conn net.Conn, C *ServerConn) { for { data := make([]byte, 128) // 设置读超时 - err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)) if err != nil { - fmt.Println(err) + logger.Errorf("设置读超时失败,%s", err) } if i, derr := conn.Read(data); derr == nil { // 可能是来自客户端的消息确认 // 数据消息 - pack := util.UnpackageProxy(data[0:i]) - if pack.CotnrolID == PROXY_CONTROL_HEARTBITACK { - fmt.Println("Received hartbeat ack")//// C.Rch <- data + pack, err01 := util.UnpackageProxy(data[0:i]) + if err01 != nil { + logger.Errorf("包无法解析,%s", err01) + continue + } + if pack.ControlID == PROXY_CONTROL_HEARTBITACK { + logger.Info("Received hartbeat ack") //// C.Rch <- data } continue @@ -298,28 +319,25 @@ func RHandler(conn net.Conn, C *ServerConn) { // 写心跳包 heartPack := util.PackageProxy(PROXY_CONTROL_HEARTBIT, []byte("")) conn.Write(heartPack) - - conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) if _, herr := conn.Read(data); herr == nil { - + // fmt.Println(string(data)) // 更新心跳时间 RWLockServer.RLock() - serverConn, ok := OnlineServerList[C.Pairname]; + serverConn, ok := OnlineServerList[C.Pairname] if ok { serverConn.SyncAt = time.Now().Unix() } RWLockServer.RUnlock() - fmt.Println("resv ht packet ack") } else { + logger.Errorf("读取连接出错,%s", herr) RWLockServer.Lock() delete(OnlineServerList, C.Pairname) RWLockServer.Unlock() - fmt.Println("delete user!") + logger.Infof("删除在线P2P服务端,%s", C.Pairname) return } } } - - diff --git a/src/punching/proxy/main_test.go b/src/punching/proxy/main_test.go new file mode 100644 index 0000000..1dcd4b0 --- /dev/null +++ b/src/punching/proxy/main_test.go @@ -0,0 +1,129 @@ +package proxy_test + +import ( + "net" + "punching/client" + + "io" + // "io/ioutil" + "net/http" + "os" + "punching/logger" + "punching/proxy" + "punching/server" + "testing" + "time" +) + +// var WG sync.WaitGroup + +const ( + RENDER_FILE_PATH = "/Users/chenboxing/nat/src/punching/index.html" +) + +func runHttpWeb(addr string) { + + //第一个参数为客户端发起http请求时的接口名,第二个参数是一个func,负责处理这个请求。 + http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + + f, _ := os.Open(RENDER_FILE_PATH) + defer f.Close() + //读取页面内容 + io.Copy(w, f) + }) + + //服务器要监听的主机地址和端口号 + err := http.ListenAndServe(addr, nil) + + if err != nil { + logger.Errorf("ListenAndServe error: ", err.Error()) + } else { + logger.Infof("开启服务在:%s", addr) + } + +} + +func TestNat(t *testing.T) { + + // 开启Proxy服务 + go func() { + proxy.Main() + }() + + for { + time.Sleep(2 * time.Second) + conn, err := net.Dial("tcp", proxy.Config.Listen) + if err != nil { + time.Sleep(2 * time.Second) + } else { + conn.Close() + break + } + } + + // Server连接 + go func() { + server.Main() + }() + + server.InitConfig() + pairName := server.Config.Key + + logger.Infof("Pairname is :%s, %+v", pairName, server.Config) + // Check if the P2P server is available + for { + logger.Info(len(proxy.OnlineServerList)) + if _, ok := proxy.OnlineServerList[pairName]; !ok { + time.Sleep(2 * time.Second) + continue + } + break + } + + logger.Info("准备开启客户端") + + go func() { + client.Main() + }() + + //client.InitConfig() + //for { + // time.Sleep(2 * time.Second) + // logger.Infof("准备连接:%s", client.Config.Listen) + // conn, err := net.Dial("tcp", client.Config.Listen) + // if err != nil { + // time.Sleep(2 * time.Second) + // } else { + // conn.Close() + // break + // } + //} + // + //// 启用Web后台服务 + //server.Config.Dial = ":7779" + // + //logger.Infof("开启后台服务:%s",server.Config.Dial) + ////开启服务 + //go runHttpWeb(server.Config.Dial) + // + //url := "http://" + client.Config.Listen + //logger.Infof("获取网页内容:") + // + //if resp, err := http.Get(url); err != nil { + // t.Errorf("读取配置端口出错,%s", client.Config.Listen) + //} else { + // defer resp.Body.Close() + // arrContent, _ := ioutil.ReadAll(resp.Body) + // + // if all, err := ioutil.ReadFile(RENDER_FILE_PATH); err != nil { + // t.Errorf("读取文件出现错误, %s", err) + // } else { + // if len(all) != len(arrContent) { + // t.Errorf("文件大小不一致:%d,%d", len(all), len(arrContent)) + // } + // } + // + //} + + select {} +} diff --git a/src/punching/server.conf b/src/punching/server.conf index 0009188..694004f 100644 --- a/src/punching/server.conf +++ b/src/punching/server.conf @@ -1,7 +1,7 @@ [server] -proxy = "" -dial = "" -key = "" +proxy = "127.0.0.1:7777" +dial = "192.168.3.19:3389" +key = "ABCD" [ThirdProxy] address = "proxy.move8.cn:7777" diff --git a/src/punching/server/backend.go b/src/punching/server/backend.go index c3a594e..988f107 100644 --- a/src/punching/server/backend.go +++ b/src/punching/server/backend.go @@ -1,19 +1,18 @@ package server import ( - "log" "net" "os" - "sync" - "punching/util" . "punching/constant" + "punching/logger" + "punching/util" + "sync" ) var ExitChanMap map[string]chan bool var RWLock *sync.RWMutex var DialTargetMap map[string]net.Conn - func handleServerConn() { DialTargetMap = make(map[string]net.Conn) @@ -27,10 +26,8 @@ func handleServerConn() { //确定target是否存在,如果不存在,重新生成target - - controlID := pack.ControlID - sessionID := string(pack.SessionID) + sessionID := pack.SessionID data := pack.Data //log.Println("读取Nat接收包:handleReadConn", string(r[0:34]), "长度为", len(r)) @@ -40,32 +37,30 @@ func handleServerConn() { RWLock.RLock() if c, ok := ExitChanMap[sessionID]; ok { - log.Println("发送退出信号") + logger.Info("发送退出信号") c <- true } else { - log.Println("在ExitChanMap里找不到Key为:", sessionID) + logger.Errorf("在ExitChanMap里找不到Key为:%s", sessionID) } RWLock.RUnlock() break } //第一次 - if controlID == PAIR_CONTROL_FIRST { - log.Println("准备连接:", targetAddr) + if controlID == PAIR_CONTROL_FIRST { + logger.Info("准备连接:", targetAddr) target, err := net.Dial("tcp", targetAddr) if err != nil { - log.Println("连接目标出错", targetAddr) + logger.Errorf("连接目标出错:%s", targetAddr) break } ExitChanMap[sessionID] = make(chan bool) DialTargetMap[sessionID] = target - log.Println("连接目标成功:", targetAddr) - - _, err2 := target.Write(pack) + _, err2 := target.Write(pack.Data) if err2 != nil { - log.Println("连接成功后写目标出错", err2.Error()) + logger.Errorf("连接成功后写目标出错,%s", err2) break } go ReadFromTarget(target, sessionID) @@ -74,35 +69,36 @@ func handleServerConn() { if dialtarget, ok := DialTargetMap[sessionID]; ok { len2, err2 := dialtarget.Write(data) - log.Println("已写入:", len2) + logger.Info("已写入:", len2) if err2 != nil { - log.Println("写目标出错", targetAddr, err2.Error()) + logger.Errorf("写目标:%s,出错:%s", targetAddr, err2) //发送控制 - quitPack := util.PackageNat(PAIR_CONTROL_QUIT, [4]byte(sessionID),[]byte("")) + quitPack := util.PackageNat(PAIR_CONTROL_QUIT, sessionID, []byte("")) Wch <- quitPack break } } else { - log.Println("找不到目标Dial:") + logger.Errorf("找不到目标Dial:%s", sessionID) } } case <-Dch: //出错 - os.Exit(3) + logger.Warn("收到退出信息") + os.Exit(1) } } } // 读取目标流到源 func ReadFromTarget(target net.Conn, sessionID string) { + defer func() { target.Close() - RWLock.Lock() delete(DialTargetMap, sessionID) delete(ExitChanMap, sessionID) @@ -117,15 +113,14 @@ func ReadFromTarget(target net.Conn, sessionID string) { j, err := target.Read(buf) if err != nil || j == 0 { - log.Println("读取目标连接数据出错,原因为:", err.Error()) + logger.Errorf("读取目标连接数据出错,原因为:%s", err) - pack := util.PackageNat(PAIR_CONTROL_QUIT, [4]byte(sessionID),[]byte("")) + pack := util.PackageNat(PAIR_CONTROL_QUIT, sessionID, []byte("")) Wch <- pack - return } - - pack := util.PackageNat(PAIR_CONTROL_NORMAL,[4]byte(sessionID), buf[0:j]) + logger.Info("准备构造数据") + pack := util.PackageNat(PAIR_CONTROL_NORMAL, sessionID, buf[0:j]) Wch <- pack @@ -135,7 +130,7 @@ func ReadFromTarget(target net.Conn, sessionID string) { //接受到退出标识 select { case <-ExitChanMap[sessionID]: - log.Println("需要退出Accept") + logger.Warn("需要退出Accept") return } diff --git a/src/punching/server/config.go b/src/punching/server/config.go index 44dc301..7758acb 100644 --- a/src/punching/server/config.go +++ b/src/punching/server/config.go @@ -2,6 +2,7 @@ package server import ( "fmt" + "os" "punching/util" ) @@ -21,36 +22,35 @@ type ServerConfig struct { Key string `toml:"key"` // 客户端和服务端的匹配码 } -type ThridProxyConfig struct { +type ThirdProxyConfig struct { Address string `toml:"address"` // Proxy 服务的地址 } -var Config *ServerConfig -var ThirdConfig *ThridProxyConfig +var Config ServerConfig +var ThirdConfig ThirdProxyConfig func InitConfig() (err error) { - if Config == nil { + // 加载配置信息 + fileName := "server.conf" + if os.Getenv("SERVER_CONF") != "" { + fileName = os.Getenv("SERVER_CONF") + } + sectionName1 := "server" + if err01 := util.DecodeSection(fileName, sectionName1, &Config); err01 != nil { + err = fmt.Errorf("Load config file failed, error:%s", err01.Error()) + return + } - // 加载配置信息 - fileName := "server.conf" - sectionName1 := "server" - if err01 := util.DecodeSection(fileName, sectionName1, Config); err01 != nil { - err = fmt.Errorf("Load config file failed, error:%s", err01.Error()) - return - } - - sectionName2 := "ThridProxy" - if err02 := util.DecodeSection(fileName, sectionName2, ThirdConfig); err != nil { - err = fmt.Errorf("Load config file failed, error:%s", err02.Error()) - return - } - - if Config.Proxy == "" && ThirdConfig.Address == "" { - err = fmt.Errorf("Proxy服务地址和第三方Proxy服务地址不能同时为空") - return - } + sectionName2 := "ThirdProxy" + if err02 := util.DecodeSection(fileName, sectionName2, &ThirdConfig); err != nil { + err = fmt.Errorf("Load config file failed, error:%s", err02.Error()) + return + } + if Config.Proxy == "" && ThirdConfig.Address == "" { + err = fmt.Errorf("Proxy服务地址和第三方Proxy服务地址不能同时为空") + return } return nil diff --git a/src/punching/server/main.go b/src/punching/server/main.go index 1ad2cab..e441c8a 100644 --- a/src/punching/server/main.go +++ b/src/punching/server/main.go @@ -1,64 +1,63 @@ package server import ( - "fmt" - "log" + "punching/logger" "punching/util" "time" ) - var ( Dch chan bool Rch chan util.PairPackage Wch chan []byte ) - func Main() { // 加载配置信息 if err := InitConfig(); err != nil { - fmt.Println("加载配置信息出错,原因为:%s", err) + logger.Errorf("加载配置信息出错,原因为:%s\n", err) return } // Proxy Server Address - proxyAddr := Config.Dial + proxyAddr := Config.Proxy if proxyAddr == "" { proxyAddr = ThirdConfig.Address } pairName := Config.Key - var connPeer util.NetConn - + var connPeer util.NetConn // 如果跟Peer连接出错,要重新连接Proxy for { + logger.Infof("准备连接Proxy:%s", proxyAddr) conn, err := ServerDialProxy(proxyAddr, pairName) if err != nil { - log.Println(err) + logger.Errorf("连接到Proxy出现错误,", err) time.Sleep(5 * time.Second) continue } - localAddr, remoteAddr, _, errWait := WaitForPeer(conn) + localAddr, remoteAddr, errWait := WaitForPeer(conn) + if errWait != nil { - log.Println(errWait) + logger.Errorf("服务端在等待P2P客户端连入出错,原因为:", errWait) time.Sleep(5 * time.Second) continue } + logger.Infof("服务端:本地地址:%s,对方地址:%s,准备连接", localAddr, remoteAddr) //连接对方 var errPeer error connPeer, errPeer = util.DialPeer(localAddr, remoteAddr) if errPeer != nil { //无法连接上 - log.Println(errPeer) + logger.Errorf("无法连接对方,本地地址:%s,远程地址:%s,错误:%s", localAddr, remoteAddr, errPeer) continue } - + //已经连接上 // 连接要开启的服务 @@ -66,23 +65,21 @@ func Main() { Rch = make(chan util.PairPackage) Wch = make(chan []byte) - go RHandler(connPeer) //Nat端写通道 - go WHandler(connPeer) //Nat端读通道 + go RHandler(connPeer) //Nat端写通道 + go WHandler(connPeer) //Nat端读通道 + + // 转发到提供服务端口,并将服务端口数据转到Nat端 + handleServerConn() // 如果P2P端通讯出错,退出 select { case <-Dch: continue } - - } - - } - func RHandler(conn util.NetConn) { //声明一个临时缓冲区,用来存储被截断的数据 @@ -92,11 +89,11 @@ func RHandler(conn util.NetConn) { for { j, err := conn.Read(buff) if err != nil { - log.Println("读取连接数据出错,原因为:", err.Error()) + logger.Errorf("读取连接数据出错,原因为:", err) Dch <- true break } - log.Println("准备解包数据:", j) + logger.Info("准备解包数据:", j) // 解包 tmpBuffer = util.UnpackageNat(append(tmpBuffer, buff[:j]...), Rch) } @@ -108,12 +105,11 @@ func WHandler(conn util.NetConn) { case msg := <-Wch: l, err := conn.Write(msg) if err != nil { - log.Println("写到Nat目录连接出错:", err.Error()) + logger.Errorf("写到Nat目录连接出错:", err) Dch <- true } else { - log.Println(time.Now().UnixNano(), "已写入到Nat:", l) + logger.Info(time.Now().UnixNano(), "已写入到Nat:", l) } - // } } } diff --git a/src/punching/server/proxy.go b/src/punching/server/proxy.go index 719b27a..3859c0f 100644 --- a/src/punching/server/proxy.go +++ b/src/punching/server/proxy.go @@ -2,84 +2,93 @@ package server import ( "fmt" - "log" + + "errors" "net" - "punching/util" . "punching/constant" + "punching/logger" + "punching/util" "strings" ) + var ( ProxyDch chan util.ProxyPackage ProxyRch chan []byte ProxyWch chan []byte ) - - // 等待,直到P2P客户端连入,在此期间会一直接受Proxy解析端发来的心跳包 -func WaitForPeer(conn util.NetConn) (localAddr string, remoteAddr string, pairName string, err error) { - +func WaitForPeer(conn *util.NetConn) (localAddr string, remoteAddr string, err error) { defer conn.Close() + logger.Infof("Enter WaitForPeer") // 接收心跳和客户端连接确认 ProxyRch = make(chan []byte) ProxyWch = make(chan []byte) + ProxyDch = make(chan util.ProxyPackage) + go RProxyHandler(conn) select { case pack := <-ProxyDch: - switch pack.CotnrolID { + + switch pack.ControlID { case PROXY_CONTROL_QUIT: //无法跟proxy连接 //关闭连接 - err = error("出现错误") - break + logger.Error("收到退出包") + err = fmt.Errorf("收到退出") + return case PROXY_CONTROL_NORMAL: // 获取客户端发来信息 + logger.Info("读取到客户端发来的信息包") data := pack.Data str := string(data) - parts := strings.Split(str,",") - localAddr = parts[0] - remoteAddr = parts[1] - pairName = parts[2] - break + parts := strings.Split(str, ",") + remoteAddr = parts[0] + localAddr = parts[1] + return } - return } + + return } // ServerDialProxy P2P服务端连接Proxy解析 -func ServerDialProxy(proxyAddr string, pairName string) (retConn util.NetConn, err error) { +func ServerDialProxy(proxyAddr string, pairName string) (retConn *util.NetConn, err error) { - var conn util.NetConn + var conn = &util.NetConn{} // 不指定端口,让系统自动分配 - err = conn.Bind("tcp", "") + err = conn.Bind("") if err != nil { - log.Println("绑定出错", err.Error()) + logger.Errorf("绑定出错%s", err) return } // 连接到Proxy解析服务器 tcpAddr, err := net.ResolveTCPAddr("tcp", proxyAddr) + err = conn.Connect(util.InetAddr(tcpAddr.IP.String()), tcpAddr.Port) if err != nil { + logger.Errorf("连接服务出错:%s", proxyAddr) fmt.Println("连接服务端出错", err.Error()) return } - fmt.Println("已连接服务器,服务器地址是:%s:%d", tcpAddr.IP.String(), tcpAddr.Port) - + logger.Infof("已连接服务器,服务器地址是:%s:%d", tcpAddr.IP.String(), tcpAddr.Port) // 构造自定义包 - data := make([]byte, 4) + data := make([]byte, 0) data = append(data, []byte{ROLE_SERVER}...) data = append(data, []byte(pairName)...) packFirst := util.PackageProxy(PROXY_CONTROL_FIRST, data) _, err = conn.Write(packFirst) + if err != nil { + logger.Errorf("写入Proxy连接出错,%s", err) return } @@ -88,7 +97,7 @@ func ServerDialProxy(proxyAddr string, pairName string) (retConn util.NetConn, e // 获取返回信息 i, err := conn.Read(buff) if err != nil { - fmt.Println("读取数据出错,", err.Error()) + logger.Errorf("读取数据出错,%s", err) return } @@ -100,47 +109,52 @@ func ServerDialProxy(proxyAddr string, pairName string) (retConn util.NetConn, e localAddr := items[0] rePairName := items[1] - fmt.Printf("P2P服务端侦听地址为:%s, 匹配码为:%s", localAddr, rePairName) - + logger.Infof("P2P服务端侦听地址为:%s, 匹配码为:%s", localAddr, rePairName) break case PROXY_CONTROL_ERROR_SERVER_EXIST: - err = fmt.Errorf("错误,P2P服务端已存在") + logger.Error("错误,P2P服务端已存在") + err = errors.New("错误,P2P服务端已存在") break default: - err = fmt.Errorf("无效的控制码,%d",int(controlID)) + err = fmt.Errorf("无效的控制码,%d", int(controlID)) } - return conn, nil + return conn, err } -func RProxyHandler(conn util.NetConn) { +func RProxyHandler(conn *util.NetConn) { for { // 心跳包,回复ack data := make([]byte, 512) - i, _ := conn.Read(data) - if i == 0 { - ProxyDch <- util.ProxyPackage{CotnrolID:PROXY_CONTROL_QUIT} + i, err0 := conn.Read(data) + if err0 != nil { + logger.Errorf("读取Proxy连接出错,%s", err0) + ProxyDch <- util.ProxyPackage{ControlID: PROXY_CONTROL_QUIT} return } // Invalid package - pack := util.UnpackageProxy(data[0:i]) - if pack.Head != PROXY_PACKAGE_HEAD { - ProxyDch <- util.ProxyPackage{CotnrolID:PROXY_CONTROL_QUIT} + pack, err := util.UnpackageProxy(data[0:i]) + if err != nil { + logger.Errorf("解包错误,%s", err) + ProxyDch <- util.ProxyPackage{ControlID: PROXY_CONTROL_QUIT} return } - if pack.CotnrolID == PROXY_CONTROL_HEARTBIT { + if pack.ControlID == PROXY_CONTROL_HEARTBIT { // Received heartbeat package // 确认 ackPack := util.PackageProxy(PROXY_CONTROL_HEARTBITACK, []byte("")) conn.Write(ackPack) - } + } else { - ProxyDch <- pack + ProxyDch <- pack + return + + } } diff --git a/src/punching/util/configuration.go b/src/punching/util/configuration.go index 060a6b6..861aa21 100644 --- a/src/punching/util/configuration.go +++ b/src/punching/util/configuration.go @@ -1,14 +1,13 @@ package util import ( - + "fmt" "github.com/BurntSushi/toml" "os" - "fmt" ) // LoadTomlFile 加载配置文件 -func LoadTomlFile(fileName string)(sections map[string]toml.Primitive, m toml.MetaData, err error){ +func LoadTomlFile(fileName string) (sections map[string]toml.Primitive, m toml.MetaData, err error) { // 判断配置文件是否存在 if _, err = os.Stat(fileName); err != nil { if os.IsNotExist(err) { @@ -16,32 +15,30 @@ func LoadTomlFile(fileName string)(sections map[string]toml.Primitive, m toml.Me } else { err = fmt.Errorf("configuration file %s execption:%s\r\n", fileName, err.Error()) } - return + return } - - // 加载配置文件 var file toml.Primitive var meta toml.MetaData - if meta, err = toml.DecodeFile(fileName, &file); err != nil { - err = fmt.Errorf("load configuration file %s failed:%s", fileName, err.Error()) - }else{ + if meta, err = toml.DecodeFile(fileName, &file); err != nil { + err = fmt.Errorf("load configuration file %s failed:%s", fileName, err.Error()) + } else { - err = meta.PrimitiveDecode(file, §ions) - } + err = meta.PrimitiveDecode(file, §ions) + } m = meta - return + return } // DecodeSection 解码一个节点的配置信息 func DecodeSection(filename, name string, v interface{}) (err error) { sections, meta, err := LoadTomlFile(filename) - if err != nil{ - return + if err != nil { + return } if section, ok := sections[name]; ok { diff --git a/src/punching/util/net.go b/src/punching/util/net.go index a1e1dad..83d8a09 100644 --- a/src/punching/util/net.go +++ b/src/punching/util/net.go @@ -2,28 +2,22 @@ package util import ( "fmt" - "log" "net" - "time" . "punching/constant" + "punching/logger" "strings" ) -// 连接到Proxy代理解析端, 地址/类型/IP -// 发送第一个包->(接收到错误数据,退出,否则接收到NatAddress,->确认收到 <->完成 不断接收心跳包,确认) -// 客户端 错误失败 | nat地址 -// 服务端 错误失败 | 注册 心跑包 - -// ClientDialProxy P2P客户端连接到Proxy +// ClientDialProxy P2P客户端连接到Proxy端 // 连接成功后获取本地地址,远程地址和匹配码,否则将返回错误 -func ClientDialProxy(proxyAddr string, pairName string) (localAddr string, remoteAddr string, rePairName string, err error) { +func ClientDialProxy(proxyAddr string, pairName string) (localAddr string, remoteAddr string, err error) { var conn = NetConn{} // 不指定端口,让系统自动分配 - err = conn.Bind("tcp", "") + err = conn.Bind("") if err != nil { - fmt.Println("绑定出错", err.Error()) + logger.Errorf("绑定出错,%s", err) return } @@ -32,14 +26,14 @@ func ClientDialProxy(proxyAddr string, pairName string) (localAddr string, remot err = conn.Connect(InetAddr(tcpAddr.IP.String()), tcpAddr.Port) if err != nil { - fmt.Println("连接服务端出错", err.Error()) + logger.Errorf("连接服务端出错,%s", err) return } defer conn.Close() - fmt.Println("已连接服务器,服务器地址是:%s:%d", tcpAddr.IP.String(), tcpAddr.Port) + logger.Infof("已连接服务器,服务器地址是:%s:%d", tcpAddr.IP.String(), tcpAddr.Port) // 构造自定义包 - data := make([]byte, 4) + data := make([]byte, 0) data = append(data, []byte{ROLE_CLIENT}...) data = append(data, []byte(pairName)...) @@ -47,6 +41,7 @@ func ClientDialProxy(proxyAddr string, pairName string) (localAddr string, remot _, err = conn.Write(packFirst) if err != nil { + logger.Errorf("写入Proxy连接出错,%s", err) return } @@ -55,7 +50,7 @@ func ClientDialProxy(proxyAddr string, pairName string) (localAddr string, remot // 获取返回信息 i, err := conn.Read(buff) if err != nil { - fmt.Println("读取数据出错,", err.Error()) + logger.Errorf("读取数据出错,%s", err) return } @@ -64,22 +59,21 @@ func ClientDialProxy(proxyAddr string, pairName string) (localAddr string, remot case PROXY_CONTROL_NORMAL: retData := string(buff[2:i]) items := strings.Split(retData, ",") - localAddr = items[0] - remoteAddr = items[1] - rePairName = items[2] + remoteAddr = items[0] + localAddr = items[1] - // 发送确认 - packAck := PackageProxy(PROXY_CONTROL_ACK, []byte("")) - conn.Write(packAck) - break + //// 发送确认 + //packAck := PackageProxy(PROXY_CONTROL_ACK, []byte("")) + //conn.Write(packAck) + return case PROXY_CONTROL_ERROR_NO_SERVER: err = fmt.Errorf("错误,P2P服务端不存在") break case PROXY_CONTROL_ERROR_CLIENT_EXIST: - err = fmt.Errorf("错误,P2P服务端不存在") + err = fmt.Errorf("错误,P2P客户端已存在") break default: - err = fmt.Errorf("无效的控制码,%d",int(controlID)) + err = fmt.Errorf("无效的控制码,%d", int(controlID)) } return @@ -89,40 +83,37 @@ func DialPeer(localAddr string, remoteAddr string) (netconn NetConn, err error) remoteTCPAddr, err := net.ResolveTCPAddr("tcp", remoteAddr) if err != nil { - log.Println("The format of remote address is invalid, %s", err.Error()) + logger.Errorf("The format of remote address is invalid, %s", err) return } - var conn NetConn // 不指定端口,让系统自动分配 - err = conn.Bind("tcp", localAddr) + err = conn.Bind(localAddr) if err != nil { - log.Println("绑定出错", err.Error()) + logger.Errorf("绑定出错,%s", err) return } - log.Println("远程地址是:", remoteTCPAddr.IP.String(), remoteTCPAddr.Port) - // 有时连接一次并不成功,尝试多次连接 - tryCount := 0 - for { - tryCount += 1 - - if tryCount > 10 { - err = fmt.Errorf("Attempt to connect remote address, but failed, local addrss: %s, "+ - "remote address", localAddr, remoteAddr) - return - } - err02 := conn.Connect(InetAddr(remoteTCPAddr.IP.String()), remoteTCPAddr.Port) - if err02 != nil { - log.Printf("第%d次不能连接远程服务器:%s", tryCount, err02.Error()) - time.Sleep(1 * time.Second) - continue - } else { - log.Println("已经连接到peer: ", remoteTCPAddr.String()) - break - } + //tryCount := 0 + //for { + // tryCount += 1 + // + // if tryCount > 10 { + // err = fmt.Errorf("Attempt to connect remote address, but failed, local addrss: %s, "+ + // "remote address:%s", localAddr, remoteAddr) + // return + // } + err = conn.Connect(InetAddr(remoteTCPAddr.IP.String()), remoteTCPAddr.Port) + if err != nil { + logger.Warnf("第次不能连接远程服务器:%s", err) + //time.Sleep(1 * time.Second) + //continue + } else { + logger.Infof("已经连接到peer: %s", remoteTCPAddr.String()) + //break } - return conn, nil + //} + return conn, err } diff --git a/src/punching/util/netconn_darwin.go b/src/punching/util/netconn_darwin.go index 6a4c814..8b25646 100644 --- a/src/punching/util/netconn_darwin.go +++ b/src/punching/util/netconn_darwin.go @@ -1,34 +1,37 @@ package util import ( + "errors" "fmt" "log" "net" "os" + + "punching/logger" "syscall" "time" ) type NetConn struct { - fd int // 文件句柄 - conn *net.Conn // 连接对象 + fd int // 文件句柄 + conn net.Conn // 连接对象 } func (hole *NetConn) Close() { - if hole.conn != nil { - hole.conn.Close() - } + + //if hole.conn != nil { + logger.Info("断开连接") + hole.conn.Close() + //} } -func (hole *NetConn) Bind(proto, addr string) (err error) { +func (hole *NetConn) Bind(addr string) (err error) { - if "tcp" != proto { - log.Println("tcp != proto") - return - } + proto := "tcp" syscall.ForkLock.RLock() + var fd int if fd, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP); err != nil { syscall.ForkLock.RUnlock() return @@ -66,7 +69,8 @@ func (hole *NetConn) Bind(proto, addr string) (err error) { func (hole *NetConn) Connect(addr [4]byte, port int) (err error) { if hole.fd == 0 { - err = error{"请先调用Bind()函数"} + + err = errors.New("请先调用Bind()函数") return } @@ -76,13 +80,14 @@ func (hole *NetConn) Connect(addr [4]byte, port int) (err error) { } chConnect := make(chan error) + logger.Info(time.Now().UnixNano(), "准备连接对方") go func() { - err = syscall.Connect(fd, &addrInet4) + err = syscall.Connect(hole.fd, &addrInet4) chConnect <- err }() //有时候连接被远端抛弃的时候, syscall.Connect() 会很久才返回 - ticker := time.NewTicker(60 * time.Second) + ticker := time.NewTicker(10 * time.Second) select { case <-ticker.C: err = fmt.Errorf("Connect timeout") @@ -90,20 +95,20 @@ func (hole *NetConn) Connect(addr [4]byte, port int) (err error) { case e := <-chConnect: if e != nil { err = e - log.Println("Connect error: ", err) + logger.Errorf("Connect error: %s", err) return } } // 转为net.conn对象 var file *os.File - file = os.NewFile(uintptr(fd), fmt.Sprintf("tcpholepunching.%d", time.Now().UnixNano())) + file = os.NewFile(uintptr(hole.fd), fmt.Sprintf("tcpholepunching.%d", time.Now().UnixNano())) if conn0, err0 := net.FileConn(file); err0 != nil { log.Println("Connect error", err0) err = err0 return } else { - hole.conn = &conn0 + hole.conn = conn0 } if err = file.Close(); err != nil { @@ -116,10 +121,10 @@ func (hole *NetConn) Connect(addr [4]byte, port int) (err error) { func (hole *NetConn) Read(buffer []byte) (length int, err error) { - return hole.Read(buffer) + return hole.conn.Read(buffer) } func (hole *NetConn) Write(data []byte) (length int, err error) { - return hole.Write(data) + return hole.conn.Write(data) } diff --git a/src/punching/util/netconn_linux.go b/src/punching/util/netconn_linux.go new file mode 100644 index 0000000..4acb919 --- /dev/null +++ b/src/punching/util/netconn_linux.go @@ -0,0 +1,126 @@ +package util + +import ( + "fmt" + "log" + "net" + "os" + "punching/logger" + "syscall" + "time" +) + +type NetConn struct { + fd int // 文件句柄 + conn net.Conn // 连接对象 +} + +func (hole *NetConn) Close() { + //if hole.conn != nil { + hole.conn.Close() + //} + +} + +func (hole *NetConn) Bind(addr string) (err error) { + + proto := "tcp" + + syscall.ForkLock.RLock() + var fd int + if fd, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP); err != nil { + syscall.ForkLock.RUnlock() + return + } + syscall.ForkLock.RUnlock() + + defer func() { + if err != nil { + syscall.Close(fd) + } + }() + + if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil { + return + } + + if len(addr) > 0 { + var tcp *net.TCPAddr + tcp, err = net.ResolveTCPAddr(proto, addr) + if err != nil && tcp.IP != nil { + log.Println(err) + return + } + sockaddr := &syscall.SockaddrInet4{Port: tcp.Port} + if err = syscall.Bind(fd, sockaddr); err != nil { + return + } + } + + hole.fd = fd + + return +} + +func (hole *NetConn) Connect(addr [4]byte, port int) (err error) { + + if hole.fd == 0 { + + err = fmt.Errorf("请先调用Bind()函数") + return + } + + addrInet4 := syscall.SockaddrInet4{ + Addr: addr, + Port: port, + } + + chConnect := make(chan error) + logger.Info(time.Now().UnixNano(), "准备连接对方") + go func() { + err = syscall.Connect(hole.fd, &addrInet4) + chConnect <- err + }() + + //有时候连接被远端抛弃的时候, syscall.Connect() 会很久才返回 + ticker := time.NewTicker(60 * time.Second) + select { + case <-ticker.C: + err = fmt.Errorf("Connect timeout") + return + case e := <-chConnect: + if e != nil { + err = e + logger.Errorf("Connect error: %s", err) + return + } + } + + // 转为net.conn对象 + var file *os.File + file = os.NewFile(uintptr(hole.fd), fmt.Sprintf("tcpholepunching.%d", time.Now().UnixNano())) + if conn0, err0 := net.FileConn(file); err0 != nil { + log.Println("Connect error", err0) + err = err0 + return + } else { + hole.conn = conn0 + } + + if err = file.Close(); err != nil { + log.Println("Connect error", err) + return + } + return + +} + +func (hole *NetConn) Read(buffer []byte) (length int, err error) { + + return hole.conn.Read(buffer) +} + +func (hole *NetConn) Write(data []byte) (length int, err error) { + + return hole.conn.Write(data) +} diff --git a/src/punching/util/netconn_test.go b/src/punching/util/netconn_test.go new file mode 100644 index 0000000..010dd61 --- /dev/null +++ b/src/punching/util/netconn_test.go @@ -0,0 +1,19 @@ +package util_test + +//import ( +// "testing" +// "punching/util" +//) + +//func TestNetConn(t *testing.T){ +// var conn util.NetConn +// defer conn.Close() +// if err := conn.Bind("tcp", ""); err != nil{ +// t.Errorf("绑定出错,%s", err.Error()) +// } +// if err := conn.Connect(util.InetAddr("211.102.90.92"), 22); err != nil{ +// t.Errorf("绑定出错,%s", err.Error()) +// } +// +// +//} diff --git a/src/punching/util/netconn_windows.go b/src/punching/util/netconn_windows.go index fcd1c6b..f729d09 100644 --- a/src/punching/util/netconn_windows.go +++ b/src/punching/util/netconn_windows.go @@ -4,23 +4,20 @@ import ( "fmt" "log" "net" + "punching/logger" "syscall" "time" ) - func MAKEWORD(low, high uint8) uint32 { var ret uint16 = uint16(high)<<8 + uint16(low) return uint32(ret) } - type NetConn struct { sock syscall.Handle } - - func (hole *NetConn) Close() { syscall.WSACleanup() @@ -28,13 +25,9 @@ func (hole *NetConn) Close() { } -func (hole *NetConn) Bind(proto, addr string) (err error) { +func (hole *NetConn) Bind(addr string) (err error) { - if "tcp" != proto { - - log.Println("tcp != proto") - return - } + proto := "tcp" var wsadata syscall.WSAData @@ -80,7 +73,7 @@ func (hole *NetConn) Bind(proto, addr string) (err error) { func (hole *NetConn) Connect(addr [4]byte, port int) (err error) { if hole.sock == 0 { - err = Error{"请先执行Bind()"} + err = fmt.Errorf("请先执行Bind()") return } addrInet4 := syscall.SockaddrInet4{ @@ -89,6 +82,7 @@ func (hole *NetConn) Connect(addr [4]byte, port int) (err error) { } chConnect := make(chan error) + logger.Info(time.Now().UnixNano(), "准备连接对方") go func() { err = syscall.Connect(hole.sock, &addrInet4) chConnect <- err diff --git a/src/punching/util/package.go b/src/punching/util/package.go index 30e71c4..54d3607 100644 --- a/src/punching/util/package.go +++ b/src/punching/util/package.go @@ -2,25 +2,26 @@ package util import ( "bytes" + "errors" . "punching/constant" - "log" ) // PairPackage P2P端通讯封装包 type PairPackage struct { - Head [6]byte // 头 - ControlID byte // 控制ID - SessionID [4]byte // 会话ID - Data []byte // 数据 + Head string // 头 6位字符 + ControlID byte // 控制ID + SessionID string // 会话ID 4位字符 + Data []byte // 数据 } +// 代理解析端通讯包 type ProxyPackage struct { - Head byte // 头 - CotnrolID byte // 控制ID - Data []byte // 数据 + Head byte // 头 + ControlID byte // 控制ID + Data []byte // 数据 } -// 跟代理解析端通讯封包 +// 跟代理解析端通讯封包 func PackageProxy(control byte, data []byte) []byte { pack := bytes.NewBuffer(nil) pack.Write([]byte{PROXY_PACKAGE_HEAD}) @@ -30,8 +31,19 @@ func PackageProxy(control byte, data []byte) []byte { } // 跟代理解析端拆包 -func UnpackageProxy(pack []byte) (ProxyPackage) { - return ProxyPackage{pack[0], pack[1], pack[1:]} +func UnpackageProxy(buffer []byte) (pack ProxyPackage, err error) { + + if len(buffer) < 2 { + err = errors.New("格式不对,长度小于2") + return + } + + if buffer[0] != PROXY_PACKAGE_HEAD { + err = errors.New("包头不对") + return + } + pack = ProxyPackage{buffer[0], buffer[1], buffer[2:]} + return } // Customize P2P data package @@ -42,8 +54,7 @@ func PackageNat(control byte, sessionID string, data []byte) []byte { pack.Write([]byte(PAIR_PACKAGE_HEAD)) // Head [6]byte pack.Write([]byte{control}) pack.Write([]byte(sessionID)) - pack.Write(IntToBytes(len(data))) // length of sent data - pack.Write([]byte(sessionID)) + pack.Write(IntToBytes(len(data))) // length of sent data pack.Write(data) return pack.Bytes() @@ -54,40 +65,43 @@ func PackageNat(control byte, sessionID string, data []byte) []byte { func UnpackageNat(buffer []byte, readChan chan PairPackage) (data []byte) { length := len(buffer) - log.Println("长度为:", length) + var i int for i = 0; i < length; i = i + 1 { - if length < i + PAIR_PACKAGE_PREFIX_LENGTH { + if length < i+PAIR_PACKAGE_PREFIX_LENGTH { break } - if string(buffer[i:i+ PAIR_PACKAGE_HEAD_LENGTH]) == string(PAIR_PACKAGE_HEAD) { + if string(buffer[i:i+PAIR_PACKAGE_HEAD_LENGTH]) == PAIR_PACKAGE_HEAD { + // Length of data - dataLength := BytesToInt(buffer[i+ PAIR_PACKAGE_PREFIX_LENGTH - PAIR_PACKAGE_DATA_LENGTH: i + + dataLength := BytesToInt(buffer[i+PAIR_PACKAGE_PREFIX_LENGTH-PAIR_PACKAGE_DATA_LENGTH : i+ PAIR_PACKAGE_PREFIX_LENGTH]) - if length < i+ PAIR_PACKAGE_PREFIX_LENGTH + dataLength { + if length < i+PAIR_PACKAGE_PREFIX_LENGTH+dataLength { break } // data - data := buffer[i + PAIR_PACKAGE_PREFIX_LENGTH: i + PAIR_PACKAGE_PREFIX_LENGTH + + data := buffer[i+PAIR_PACKAGE_PREFIX_LENGTH : i+PAIR_PACKAGE_PREFIX_LENGTH+ dataLength] - controlID := buffer[i + PAIR_PACKAGE_HEAD_LENGTH : i + PAIR_PACKAGE_HEAD_LENGTH + + controlID := buffer[i+PAIR_PACKAGE_HEAD_LENGTH : i+PAIR_PACKAGE_HEAD_LENGTH+ PAIR_PACKAGE_CONTROL_LENGTH] - sessionID := string(buffer[i+ PAIR_PACKAGE_HEAD_LENGTH + PAIR_PACKAGE_CONTROL_LENGTH: - i + PAIR_PACKAGE_HEAD_LENGTH + PAIR_PACKAGE_CONTROL_LENGTH + PAIR_PACKAGE_SESSIONID_LENGTH]) - pact := PairPackage{ - Head: PAIR_PACKAGE_HEAD, + iSessionIDStartPos := PAIR_PACKAGE_HEAD_LENGTH + PAIR_PACKAGE_CONTROL_LENGTH + sessionID := string(buffer[i+iSessionIDStartPos : i+iSessionIDStartPos+PAIR_PACKAGE_SESSIONID_LENGTH]) + + pack := PairPackage{ + Head: PAIR_PACKAGE_HEAD, Data: data, ControlID: controlID[0], - SessionID: [4]byte(sessionID), + SessionID: sessionID, } - readChan <- pact - i += PAIR_PACKAGE_PREFIX_LENGTH + dataLength - 1 + readChan <- pack + + i += PAIR_PACKAGE_PREFIX_LENGTH + dataLength - 1 } } diff --git a/src/punching/util/rand.go b/src/punching/util/rand.go index 829359d..77b744e 100644 --- a/src/punching/util/rand.go +++ b/src/punching/util/rand.go @@ -5,13 +5,12 @@ import ( "time" ) - // GenerateRandomPairKey 获取4位随机匹配码 func GenerateRandomPairKey() string { //97~122 小写字母 rndNums := GenerateRandomNumber(97, 122, 4) key := "" - for num, _ := range rndNums { + for _, num := range rndNums { key = key + string(byte(num)) } return key @@ -30,7 +29,7 @@ func GenerateRandomNumber(start int, end int, count int) []int { r := rand.New(rand.NewSource(time.Now().UnixNano())) for len(nums) < count { //生成随机数 - num := r.Intn((end - start)) + start + num := r.Intn(end-start) + start //查重 exist := false for _, v := range nums { diff --git a/src/punching/util/rand_test.go b/src/punching/util/rand_test.go new file mode 100644 index 0000000..0adb777 --- /dev/null +++ b/src/punching/util/rand_test.go @@ -0,0 +1,23 @@ +package util_test + +import ( + "fmt" + "math/rand" + "punching/util" + "testing" + "time" +) + +func TestGenerateRandomPairKey(t *testing.T) { + t1 := util.GenerateRandomPairKey() + r := rand.New(rand.NewSource(time.Now().UnixNano())) + num := r.Intn(122-97) + 97 + + t.Log(string(byte(num))) + + fmt.Println("fff") + fmt.Println(t1) + if len(t1) != 4 { + t.Errorf("长度不对,%s", t1) + } +}