feat: support network: h2 for masque outbound

This commit is contained in:
wwqgtxx
2026-04-10 10:35:19 +08:00
parent ab3746a1d8
commit 885244a9a0
4 changed files with 441 additions and 40 deletions
+77 -39
View File
@@ -7,6 +7,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"net"
"net/netip"
"strconv"
@@ -16,6 +17,7 @@ import (
"github.com/metacubex/mihomo/common/atomic"
"github.com/metacubex/mihomo/common/contextutils"
"github.com/metacubex/mihomo/common/httputils"
"github.com/metacubex/mihomo/common/pool"
"github.com/metacubex/mihomo/component/dialer"
"github.com/metacubex/mihomo/component/resolver"
@@ -25,7 +27,7 @@ import (
"github.com/metacubex/mihomo/transport/masque"
"github.com/metacubex/mihomo/transport/tuic/common"
connectip "github.com/metacubex/connect-ip-go"
"github.com/metacubex/http"
"github.com/metacubex/quic-go"
wireguard "github.com/metacubex/sing-wireguard"
M "github.com/metacubex/sing/common/metadata"
@@ -34,11 +36,12 @@ import (
type Masque struct {
*Base
tlsConfig *tls.Config
quicConfig *quic.Config
tunDevice wireguard.Device
resolver resolver.Resolver
uri string
tlsConfig *tls.Config
quicConfig *quic.Config
tunDevice wireguard.Device
resolver resolver.Resolver
uri string
h2Transport *http.Http2Transport
runCtx context.Context
runCancel context.CancelFunc
@@ -51,17 +54,19 @@ type Masque struct {
type MasqueOption struct {
BasicOption
Name string `proxy:"name"`
Server string `proxy:"server"`
Port int `proxy:"port"`
PrivateKey string `proxy:"private-key"`
PublicKey string `proxy:"public-key"`
Ip string `proxy:"ip,omitempty"`
Ipv6 string `proxy:"ipv6,omitempty"`
URI string `proxy:"uri,omitempty"`
SNI string `proxy:"sni,omitempty"`
MTU int `proxy:"mtu,omitempty"`
UDP bool `proxy:"udp,omitempty"`
Name string `proxy:"name"`
Server string `proxy:"server"`
Port int `proxy:"port"`
PrivateKey string `proxy:"private-key"`
PublicKey string `proxy:"public-key"`
Ip string `proxy:"ip,omitempty"`
Ipv6 string `proxy:"ipv6,omitempty"`
URI string `proxy:"uri,omitempty"`
SNI string `proxy:"sni,omitempty"`
MTU int `proxy:"mtu,omitempty"`
UDP bool `proxy:"udp,omitempty"`
SkipCertVerify bool `proxy:"skip-cert-verify,omitempty"`
Network string `proxy:"network,omitempty"`
CongestionController string `proxy:"congestion-controller,omitempty"`
CWND int `proxy:"cwnd,omitempty"`
@@ -150,12 +155,25 @@ func NewMasque(option MasqueOption) (*Masque, error) {
sni = masque.ConnectSNI
}
tlsConfig, err := masque.PrepareTlsConfig(privKey, ecPubKey, sni)
tlsConfig, err := masque.PrepareTlsConfig(privKey, ecPubKey, sni, option.SkipCertVerify)
if err != nil {
return nil, fmt.Errorf("failed to prepare TLS config: %v\n", err)
}
outbound.tlsConfig = tlsConfig
if option.Network == "h2" {
tlsConfig.NextProtos = []string{"h2"}
outbound.h2Transport = &http.Http2Transport{
DialTLSContext: func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) {
c, err := outbound.dialer.DialContext(ctx, "tcp", outbound.addr)
if err != nil {
return nil, err
}
return tls.Client(c, tlsConfig), nil
},
}
}
outbound.quicConfig = &quic.Config{
EnableDatagrams: true,
InitialPacketSize: 1242,
@@ -229,27 +247,40 @@ func (w *Masque) run(ctx context.Context) error {
w.runDevice.Store(true)
}
udpAddr, err := resolveUDPAddr(ctx, "udp", w.addr, w.prefer)
if err != nil {
return err
}
var pc net.PacketConn
var tr io.Closer
var ipConn masque.IpConn
var err error
if w.h2Transport != nil {
ipConn, err = masque.ConnectTunnelH2(ctx, &http.Client{Transport: w.h2Transport}, w.uri)
if err != nil {
return err
}
} else {
var udpAddr *net.UDPAddr
udpAddr, err = resolveUDPAddr(ctx, "udp", w.addr, w.prefer)
if err != nil {
return err
}
pc, err := w.dialer.ListenPacket(ctx, "udp", "", udpAddr.AddrPort())
if err != nil {
return err
}
pc, err = w.dialer.ListenPacket(ctx, "udp", "", udpAddr.AddrPort())
if err != nil {
return err
}
quicConn, err := quic.Dial(ctx, pc, udpAddr, w.tlsConfig, w.quicConfig)
if err != nil {
return err
}
var quicConn *quic.Conn
quicConn, err = quic.Dial(ctx, pc, udpAddr, w.tlsConfig, w.quicConfig)
if err != nil {
return err
}
common.SetCongestionController(quicConn, w.option.CongestionController, w.option.CWND)
common.SetCongestionController(quicConn, w.option.CongestionController, w.option.CWND)
tr, ipConn, err := masque.ConnectTunnel(ctx, quicConn, w.uri)
if err != nil {
_ = pc.Close()
return err
tr, ipConn, err = masque.ConnectTunnel(ctx, quicConn, w.uri)
if err != nil {
_ = pc.Close()
return err
}
}
w.running.Store(true)
@@ -258,8 +289,12 @@ func (w *Masque) run(ctx context.Context) error {
contextutils.AfterFunc(runCtx, func() {
w.running.Store(false)
_ = ipConn.Close()
_ = tr.Close()
_ = pc.Close()
if tr != nil {
_ = tr.Close()
}
if pc != nil {
_ = pc.Close()
}
})
go func() {
@@ -276,7 +311,7 @@ func (w *Masque) run(ctx context.Context) error {
}
icmp, err := ipConn.WritePacket(buf[:sizes[0]])
if err != nil {
if errors.As(err, new(*connectip.CloseError)) {
if errors.Is(err, net.ErrClosed) {
log.Errorln("[Masque](%s) connection closed while writing to IP connection: %v", w.name, err)
return
}
@@ -299,7 +334,7 @@ func (w *Masque) run(ctx context.Context) error {
for runCtx.Err() == nil {
n, err := ipConn.ReadPacket(buf)
if err != nil {
if errors.As(err, new(*connectip.CloseError)) {
if errors.Is(err, net.ErrClosed) {
log.Errorln("[Masque](%s) connection closed while writing to IP connection: %v", w.name, err)
return
}
@@ -322,6 +357,9 @@ func (w *Masque) Close() error {
if w.tunDevice != nil {
w.tunDevice.Close()
}
if w.h2Transport != nil {
httputils.CloseTransport(w.h2Transport)
}
return nil
}
+17
View File
@@ -1080,6 +1080,23 @@ proxies: # socks5
# dns: [ 1.1.1.1, 8.8.8.8 ] # 仅在 remote-dns-resolve 为 true 时生效
# congestion-controller: bbr # 默认不开启
# masque-h2
- name: "masque-h2"
type: masque
server: 162.159.198.2
port: 443
private-key: MHcCAQEEILI1eOtnbEIh89Fj4yNDuFR6UjayCKI3NdLl3DhetimWoAoGCCqGSM49AwEHoUQDQgAEgyXrE8v+hHsHy3ewSb3WcRjYgCrM9T9hiE0Uv6k2DZ1+4kefrDT9v1Q/8wdRigTf6t6gGNUV8W+IUMdrfUt+9g==
public-key: MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIaU7MToJm9NKp8YfGxR6r+/h4mcG7SxI8tsW8OR1A5tv/zCzVbCRRh2t87/kxnP6lAy0lkr7qYwu+ox+k3dr6w==
ip: 172.16.0.2
ipv6: 2606:4700:110:84c0:163a:4914:a0ad:3342
mtu: 1280
udp: true
network: h2
# 一个出站代理的标识。当值不为空时,将使用指定的 proxy 发出连接
# dialer-proxy: "ss1"
# remote-dns-resolve: true # 强制 dns 远程解析,默认值为 false
# dns: [ 1.1.1.1, 8.8.8.8 ] # 仅在 remote-dns-resolve 为 true 时生效
# tuic
- name: tuic
server: www.example.com
+337
View File
@@ -0,0 +1,337 @@
package masque
// copy and modify from: https://github.com/Diniboy1123/connect-ip-go/blob/8d7bb0a858a2674046a7cb5538749e4c826c3538/client_h2.go
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"net/url"
"strings"
"sync"
"github.com/metacubex/mihomo/log"
"github.com/metacubex/http"
"github.com/metacubex/quic-go/quicvarint"
"github.com/yosida95/uritemplate/v3"
)
const h2DatagramCapsuleType uint64 = 0
const (
ipv4HeaderLen = 20
ipv6HeaderLen = 40
)
func ConnectTunnelH2(ctx context.Context, h2Client *http.Client, connectUri string) (IpConn, error) {
additionalHeaders := http.Header{
"User-Agent": []string{""},
}
template := uritemplate.MustNew(connectUri)
h2Headers := additionalHeaders.Clone()
h2Headers.Set("cf-connect-proto", "cf-connect-ip")
// TODO: support PQC
h2Headers.Set("pq-enabled", "false")
ipConn, rsp, err := dialH2(ctx, h2Client, template, h2Headers)
if err != nil {
if strings.Contains(err.Error(), "tls: access denied") {
return nil, errors.New("login failed! Please double-check if your tls key and cert is enrolled in the Cloudflare Access service")
}
return nil, fmt.Errorf("failed to dial connect-ip over HTTP/2: %w", err)
}
if rsp.StatusCode != http.StatusOK {
_ = ipConn.Close()
return nil, fmt.Errorf("failed to dial connect-ip: %v", rsp.Status)
}
return ipConn, nil
}
// dialH2 dials a proxied connection over HTTP/2 CONNECT-IP.
//
// This transport carries proxied packets inside HTTP capsule DATAGRAM frames.
func dialH2(ctx context.Context, client *http.Client, template *uritemplate.Template, additionalHeaders http.Header) (*h2IpConn, *http.Response, error) {
if len(template.Varnames()) > 0 {
return nil, nil, errors.New("connect-ip: IP flow forwarding not supported")
}
u, err := url.Parse(template.Raw())
if err != nil {
return nil, nil, fmt.Errorf("connect-ip: failed to parse URI: %w", err)
}
pr, pw := io.Pipe()
req, err := http.NewRequestWithContext(ctx, http.MethodConnect, u.String(), pr)
if err != nil {
_ = pr.Close()
_ = pw.Close()
return nil, nil, fmt.Errorf("connect-ip: failed to create request: %w", err)
}
req.Host = authorityFromURL(u)
req.ContentLength = -1
req.Header = make(http.Header)
for k, v := range additionalHeaders {
req.Header[k] = v
}
rsp, err := client.Do(req)
if err != nil {
_ = pr.Close()
_ = pw.Close()
return nil, nil, fmt.Errorf("connect-ip: failed to send request: %w", err)
}
if rsp.StatusCode < 200 || rsp.StatusCode > 299 {
_ = pr.Close()
_ = pw.Close()
_ = rsp.Body.Close()
return nil, rsp, fmt.Errorf("connect-ip: server responded with %d", rsp.StatusCode)
}
stream := &h2DatagramStream{
requestBody: pw,
responseBody: rsp.Body,
recvBuf: make([]byte, 0, 4096),
}
return &h2IpConn{
str: stream,
closeChan: make(chan struct{}),
}, rsp, nil
}
func authorityFromURL(u *url.URL) string {
if u.Port() != "" {
return u.Host
}
host := u.Hostname()
if host == "" {
return u.Host
}
return host + ":443"
}
type h2IpConn struct {
str *h2DatagramStream
mu sync.Mutex
closeChan chan struct{}
closeErr error
}
func (c *h2IpConn) ReadPacket(b []byte) (n int, err error) {
start:
data, err := c.str.ReceiveDatagram(context.Background())
if err != nil {
select {
case <-c.closeChan:
return 0, c.closeErr
default:
return 0, err
}
}
if err := c.handleIncomingProxiedPacket(data); err != nil {
log.Debugln("dropping proxied packet: %s", err)
goto start
}
return copy(b, data), nil
}
func (c *h2IpConn) handleIncomingProxiedPacket(data []byte) error {
if len(data) == 0 {
return errors.New("connect-ip: empty packet")
}
switch v := ipVersion(data); v {
default:
return fmt.Errorf("connect-ip: unknown IP versions: %d", v)
case 4:
if len(data) < ipv4HeaderLen {
return fmt.Errorf("connect-ip: malformed datagram: too short")
}
case 6:
if len(data) < ipv6HeaderLen {
return fmt.Errorf("connect-ip: malformed datagram: too short")
}
}
return nil
}
// WritePacket writes an IP packet to the stream.
// If sending the packet fails, it might return an ICMP packet.
// It is the caller's responsibility to send the ICMP packet to the sender.
func (c *h2IpConn) WritePacket(b []byte) (icmp []byte, err error) {
data, err := c.composeDatagram(b)
if err != nil {
log.Debugln("dropping proxied packet (%d bytes) that can't be proxied: %s", len(b), err)
return nil, nil
}
if err := c.str.SendDatagram(data); err != nil {
select {
case <-c.closeChan:
return nil, c.closeErr
default:
return nil, err
}
}
return nil, nil
}
func (c *h2IpConn) composeDatagram(b []byte) ([]byte, error) {
// TODO: implement src, dst and ipproto checks
if len(b) == 0 {
return nil, nil
}
switch v := ipVersion(b); v {
default:
return nil, fmt.Errorf("connect-ip: unknown IP versions: %d", v)
case 4:
if len(b) < ipv4HeaderLen {
return nil, fmt.Errorf("connect-ip: IPv4 packet too short")
}
ttl := b[8]
if ttl <= 1 {
return nil, fmt.Errorf("connect-ip: datagram TTL too small: %d", ttl)
}
b[8]-- // decrement TTL
// recalculate the checksum
binary.BigEndian.PutUint16(b[10:12], calculateIPv4Checksum(([ipv4HeaderLen]byte)(b[:ipv4HeaderLen])))
case 6:
if len(b) < ipv6HeaderLen {
return nil, fmt.Errorf("connect-ip: IPv6 packet too short")
}
hopLimit := b[7]
if hopLimit <= 1 {
return nil, fmt.Errorf("connect-ip: datagram Hop Limit too small: %d", hopLimit)
}
b[7]-- // Decrement Hop Limit
}
return b, nil
}
func (c *h2IpConn) Close() error {
c.mu.Lock()
if c.closeErr == nil {
c.closeErr = net.ErrClosed
close(c.closeChan)
}
c.mu.Unlock()
err := c.str.Close()
return err
}
func ipVersion(b []byte) uint8 { return b[0] >> 4 }
func calculateIPv4Checksum(header [ipv4HeaderLen]byte) uint16 {
// add every 16-bit word in the header, skipping the checksum field (bytes 10 and 11)
var sum uint32
for i := 0; i < len(header); i += 2 {
if i == 10 {
continue // skip checksum field
}
sum += uint32(binary.BigEndian.Uint16(header[i : i+2]))
}
for (sum >> 16) > 0 {
sum = (sum & 0xffff) + (sum >> 16)
}
return ^uint16(sum)
}
type h2DatagramStream struct {
requestBody *io.PipeWriter
responseBody io.ReadCloser
readMu sync.Mutex
writeMu sync.Mutex
recvBuf []byte
}
func (s *h2DatagramStream) ReceiveDatagram(_ context.Context) ([]byte, error) {
s.readMu.Lock()
defer s.readMu.Unlock()
for {
capsuleType, payload, consumed, ok, err := parseCapsule(s.recvBuf)
if err != nil {
return nil, err
}
if ok {
s.recvBuf = s.recvBuf[consumed:]
if capsuleType != h2DatagramCapsuleType {
continue
}
return payload, nil
}
buf := make([]byte, 4096)
n, readErr := s.responseBody.Read(buf)
if n > 0 {
s.recvBuf = append(s.recvBuf, buf[:n]...)
continue
}
if readErr != nil {
return nil, readErr
}
}
}
func (s *h2DatagramStream) SendDatagram(data []byte) error {
frame := make([]byte, 0, 2*quicvarint.Len(0)+len(data))
frame = quicvarint.Append(frame, h2DatagramCapsuleType)
frame = quicvarint.Append(frame, uint64(len(data)))
frame = append(frame, data...)
s.writeMu.Lock()
defer s.writeMu.Unlock()
_, err := s.requestBody.Write(frame)
if err != nil {
return fmt.Errorf("connect-ip: failed to send datagram capsule: %w", err)
}
return nil
}
func (s *h2DatagramStream) Close() error {
_ = s.requestBody.Close()
return s.responseBody.Close()
}
func parseCapsule(buf []byte) (capsuleType uint64, payload []byte, consumed int, ok bool, err error) {
capsuleType, typeLen, ok := parseVarint(buf)
if !ok {
return 0, nil, 0, false, nil
}
payloadLen, payloadLenLen, ok := parseVarint(buf[typeLen:])
if !ok {
return 0, nil, 0, false, nil
}
headerLen := typeLen + payloadLenLen
totalLen := headerLen + int(payloadLen)
if totalLen < headerLen {
return 0, nil, 0, false, errors.New("connect-ip: malformed capsule length")
}
if len(buf) < totalLen {
return 0, nil, 0, false, nil
}
return capsuleType, buf[headerLen:totalLen], totalLen, true, nil
}
func parseVarint(buf []byte) (v uint64, n int, ok bool) {
if len(buf) == 0 {
return 0, 0, false
}
prefix := buf[0] >> 6
n = 1 << prefix
if len(buf) < n {
return 0, 0, false
}
v = uint64(buf[0] & 0x3f)
for i := 1; i < n; i++ {
v = (v << 8) | uint64(buf[i])
}
return v, n, true
}
+10 -1
View File
@@ -27,9 +27,15 @@ const (
ConnectURI = "https://cloudflareaccess.com"
)
type IpConn interface {
ReadPacket(b []byte) (n int, err error)
WritePacket(b []byte) (icmp []byte, err error)
Close() error
}
// PrepareTlsConfig creates a TLS configuration using the provided certificate and SNI (Server Name Indication).
// It also verifies the peer's public key against the provided public key.
func PrepareTlsConfig(privKey *ecdsa.PrivateKey, peerPubKey *ecdsa.PublicKey, sni string) (*tls.Config, error) {
func PrepareTlsConfig(privKey *ecdsa.PrivateKey, peerPubKey *ecdsa.PublicKey, sni string, insecure bool) (*tls.Config, error) {
verfiyCert := func(cert *x509.Certificate) error {
if _, ok := cert.PublicKey.(*ecdsa.PublicKey); !ok {
// we only support ECDSA
@@ -77,6 +83,9 @@ func PrepareTlsConfig(privKey *ecdsa.PrivateKey, peerPubKey *ecdsa.PublicKey, sn
return err
},
}
if insecure {
tlsConfig.VerifyConnection = nil
}
return tlsConfig, nil
}