fix: Improve error handling to avoid fd leaks and zombies

This commit is contained in:
Mike Wang
2025-10-08 04:15:22 +08:00
parent 7dffeecdc7
commit 85dbbe01d2
+242 -208
View File
@@ -252,14 +252,14 @@ class StunClient(object):
socket_type = socket.SOCK_DGRAM if self.udp else socket.SOCK_STREAM
stun_host, stun_port = self.stun_server_list[0]
sock = socket.socket(socket.AF_INET, socket_type)
socket_set_opt(
sock,
reuse = True,
bind_addr = (self.source_host, self.source_port),
interface = self.interface,
timeout = 3
)
try:
socket_set_opt(
sock,
reuse = True,
bind_addr = (self.source_host, self.source_port),
interface = self.interface,
timeout = 3
)
sock.connect((stun_host, stun_port))
inner_addr = sock.getsockname()
self.source_host, self.source_port = inner_addr
@@ -311,23 +311,27 @@ class KeepAlive(object):
def _connect(self):
sock_type = socket.SOCK_DGRAM if self.udp else socket.SOCK_STREAM
sock = socket.socket(socket.AF_INET, sock_type)
socket_set_opt(
sock,
reuse = True,
bind_addr = (self.source_host, self.source_port),
interface = self.interface,
timeout = 3
)
sock.connect((self.host, self.port))
if not self.udp:
Logger.debug("keep-alive: Connected to host %s" % (
addr_to_uri((self.host, self.port), udp=self.udp)
))
if self.reconn:
Logger.info("keep-alive: connection restored")
self.reconn = False
self.sock = sock
self.sock = socket.socket(socket.AF_INET, sock_type)
try:
socket_set_opt(
self.sock,
reuse = True,
bind_addr = (self.source_host, self.source_port),
interface = self.interface,
timeout = 3
)
self.sock.connect((self.host, self.port))
if not self.udp:
Logger.debug("keep-alive: Connected to host %s" % (
addr_to_uri((self.host, self.port), udp=self.udp)
))
if self.reconn:
Logger.info("keep-alive: connection restored")
self.reconn = False
except Exception:
self.sock.close()
self.sock = None
raise
def keep_alive(self):
if self.sock is None:
@@ -338,7 +342,7 @@ class KeepAlive(object):
self._keep_alive_tcp()
Logger.debug("keep-alive: OK")
def reset(self):
def disconnect(self):
if self.sock is not None:
self.sock.close()
self.sock = None
@@ -383,7 +387,7 @@ class KeepAlive(object):
raise ex
# fix: Keep-alive cause STUN socket timeout on Windows
if sys.platform == "win32":
self.reset()
self.disconnect()
return
@@ -398,35 +402,41 @@ class ForwardNone(object):
class ForwardTestServer(object):
def __init__(self):
self.active = False
self.sock = None
self.sock_type = None
self.buff_size = 8192
self.timeout = 3
def __del__(self):
self.stop_forward()
# Start a socket server for testing purpose
# target address is ignored
def start_forward(self, ip, port, toip, toport, udp=False):
self.sock_type = socket.SOCK_DGRAM if udp else socket.SOCK_STREAM
self.sock = socket.socket(socket.AF_INET, self.sock_type)
socket_set_opt(
self.sock,
reuse = True,
bind_addr = ("", port)
)
Logger.debug("fwd-test: Starting test server at %s" % addr_to_uri((ip, port), udp=udp))
if udp:
th = start_daemon_thread(self._test_server_run_udp)
else:
th = start_daemon_thread(self._test_server_run_http)
time.sleep(1)
if not th.is_alive():
raise OSError("Test server thread exited too quickly")
self.active = True
sock_type = socket.SOCK_DGRAM if udp else socket.SOCK_STREAM
self.sock = socket.socket(socket.AF_INET, sock_type)
try:
socket_set_opt(
self.sock,
reuse = True,
bind_addr = ("", port)
)
Logger.debug("fwd-test: Starting test server at %s" %
addr_to_uri((ip, port), udp=udp))
if udp:
th = start_daemon_thread(self._test_server_run_udp)
else:
th = start_daemon_thread(self._test_server_run_http)
time.sleep(1)
if not th.is_alive():
raise OSError("Test server thread exited too quickly")
except Exception:
self.sock.close()
self.sock = None
raise
def _test_server_run_http(self):
self.sock.listen(5)
while self.sock.fileno() != -1:
while self.sock and self.sock.fileno() != -1:
try:
conn, addr = self.sock.accept()
Logger.debug("fwd-test: got client %s" % (addr,))
@@ -454,7 +464,7 @@ class ForwardTestServer(object):
conn.close()
def _test_server_run_udp(self):
while self.sock.fileno() != -1:
while self.sock and self.sock.fileno() != -1:
try:
msg, addr = self.sock.recvfrom(self.buff_size)
Logger.debug("fwd-test: got client %s" % (addr,))
@@ -463,15 +473,15 @@ class ForwardTestServer(object):
return
def stop_forward(self):
Logger.debug("fwd-test: Stopping test server")
self.sock.close()
self.active = False
if self.sock:
Logger.debug("fwd-test: Stopping test server")
self.sock.close()
self.sock = None
class ForwardIptables(object):
def __init__(self, snat=False, sudo=False):
self.rules = []
self.active = False
self.min_ver = (1, 4, 1)
self.curr_ver = (0, 0, 0)
self.snat = snat
@@ -486,11 +496,9 @@ class ForwardIptables(object):
if self.curr_ver >= (1, 4, 20):
self.iptables_cmd += ["-w"]
self._iptables_init()
self._iptables_clean()
def __del__(self):
if self.active:
self.stop_forward()
self.stop_forward()
def _iptables_check(self):
if os.name != "posix":
@@ -548,7 +556,6 @@ class ForwardIptables(object):
)
def _iptables_clean(self):
Logger.debug("fwd-iptables: Cleaning up Natter rules")
while self.rules:
rule = self.rules.pop()
rule_rm = ["-D" if arg in ("-I", "-A") else arg for arg in rule]
@@ -557,7 +564,6 @@ class ForwardIptables(object):
self.iptables_cmd + rule_rm,
stderr=subprocess.STDOUT
)
return
except subprocess.CalledProcessError as ex:
Logger.error("fwd-iptables: Failed to execute %s: %s" % (ex.cmd, ex.output))
continue
@@ -571,43 +577,49 @@ class ForwardIptables(object):
Logger.debug("fwd-iptables: Adding rule %s forward to %s" % (
addr_to_uri((ip, port), udp=udp), addr_to_uri((toip, toport), udp=udp)
))
rule = [
"-t", "nat",
"-I", "NATTER",
"-p", proto,
"--dst", ip,
"--dport", "%d" % port,
"-j", "DNAT",
"--to-destination", "%s:%d" % (toip, toport)
]
subprocess.check_output(self.iptables_cmd + rule)
self.rules.append(rule)
if self.snat:
try:
rule = [
"-t", "nat",
"-I", "NATTER_SNAT",
"-I", "NATTER",
"-p", proto,
"--dst", toip,
"--dport", "%d" % toport,
"-j", "SNAT",
"--to-source", ip
"--dst", ip,
"--dport", "%d" % port,
"-j", "DNAT",
"--to-destination", "%s:%d" % (toip, toport)
]
subprocess.check_output(self.iptables_cmd + rule)
self.rules.append(rule)
self.active = True
if self.snat:
rule = [
"-t", "nat",
"-I", "NATTER_SNAT",
"-p", proto,
"--dst", toip,
"--dport", "%d" % toport,
"-j", "SNAT",
"--to-source", ip
]
subprocess.check_output(self.iptables_cmd + rule)
self.rules.append(rule)
except Exception:
try:
self._iptables_clean()
except Exception:
pass
raise
def stop_forward(self):
Logger.debug("fwd-iptables: Cleaning up Natter rules")
self._iptables_clean()
self.active = False
def _check_sys_forward_config(self):
fpath = "/proc/sys/net/ipv4/ip_forward"
if os.path.exists(fpath):
fin = open(fpath, "r")
buff = fin.read()
fin.close()
with open(fpath, "r") as fin:
buff = fin.read()
if buff.strip() != "1":
raise OSError("IP forwarding is not allowed. Please do `sysctl net.ipv4.ip_forward=1`")
raise OSError("IP forwarding is not allowed. "
"Please do `sysctl net.ipv4.ip_forward=1`")
else:
Logger.warning("fwd-iptables: '%s' not found" % str(fpath))
@@ -631,7 +643,6 @@ class ForwardNftables(object):
def __init__(self, snat=False, sudo=False):
self.handle = -1
self.handle_snat = -1
self.active = False
self.min_ver = (0, 9, 0)
self.snat = snat
self.sudo = sudo
@@ -642,11 +653,9 @@ class ForwardNftables(object):
if not self._nftables_check():
raise OSError("nftables >= %s not available" % str(self.min_ver))
self._nftables_init()
self._nftables_clean()
def __del__(self):
if self.active:
self.stop_forward()
self.stop_forward()
def _nftables_check(self):
if os.name != "posix":
@@ -744,11 +753,9 @@ class ForwardNftables(object):
if not m:
raise ValueError("Unknown nftables handle")
self.handle_snat = int(m.group(1))
self.active = True
def stop_forward(self):
self._nftables_clean()
self.active = False
def _check_sys_forward_config(self):
fpath = "/proc/sys/net/ipv4/ip_forward"
@@ -779,7 +786,6 @@ class ForwardSudoNftablesSnat(ForwardNftables):
class ForwardGost(object):
def __init__(self):
self.active = False
self.min_ver = (2, 3)
self.proc = None
self.udp_timeout = 60
@@ -787,8 +793,7 @@ class ForwardGost(object):
raise OSError("gost >= %s not available" % str(self.min_ver))
def __del__(self):
if self.active:
self.stop_forward()
self.stop_forward()
def _gost_check(self):
try:
@@ -806,31 +811,38 @@ class ForwardGost(object):
def start_forward(self, ip, port, toip, toport, udp=False):
if (ip, port) == (toip, toport):
raise ValueError("Cannot forward to the same address %s" % addr_to_str((ip, port)))
raise ValueError("Cannot forward to the same address %s" %
addr_to_str((ip, port)))
proto = "udp" if udp else "tcp"
Logger.debug("fwd-gost: Starting gost %s forward to %s" % (
addr_to_uri((ip, port), udp=udp), addr_to_uri((toip, toport), udp=udp)
addr_to_uri((ip, port), udp=udp),
addr_to_uri((toip, toport), udp=udp)
))
gost_arg = "-L=%s://:%d/%s:%d" % (proto, port, toip, toport)
if udp:
gost_arg += "?ttl=%ds" % self.udp_timeout
self.proc = subprocess.Popen(["gost", gost_arg])
time.sleep(1)
if self.proc.poll() is not None:
raise OSError("gost exited too quickly")
self.active = True
try:
time.sleep(1)
if self.proc.poll() is not None:
raise OSError("gost exited too quickly")
except Exception:
self.proc.kill()
self.proc.wait()
self.proc = None
raise
def stop_forward(self):
Logger.debug("fwd-gost: Stopping gost")
if self.proc and self.proc.returncode is not None:
if self.proc and self.proc.poll() is not None:
return
self.proc.terminate()
self.active = False
self.proc.wait()
self.proc = None
class ForwardSocat(object):
def __init__(self):
self.active = False
self.min_ver = (1, 7, 2)
self.proc = None
self.udp_timeout = 60
@@ -839,8 +851,7 @@ class ForwardSocat(object):
raise OSError("socat >= %s not available" % str(self.min_ver))
def __del__(self):
if self.active:
self.stop_forward()
self.stop_forward()
def _socat_check(self):
try:
@@ -871,22 +882,27 @@ class ForwardSocat(object):
"%s4-LISTEN:%d,reuseaddr,fork,max-children=%d" % (proto, port, self.max_children),
"%s4:%s:%d" % (proto, toip, toport)
])
time.sleep(1)
if self.proc.poll() is not None:
raise OSError("socat exited too quickly")
self.active = True
try:
time.sleep(1)
if self.proc.poll() is not None:
raise OSError("socat exited too quickly")
except Exception:
self.proc.kill()
self.proc.wait()
self.proc = None
raise
def stop_forward(self):
Logger.debug("fwd-socat: Stopping socat")
if self.proc and self.proc.returncode is not None:
if self.proc and self.proc.poll() is not None:
return
self.proc.terminate()
self.active = False
self.proc.wait()
self.proc = None
class ForwardSocket(object):
def __init__(self):
self.active = False
self.sock = None
self.sock_type = None
self.outbound_addr = None
@@ -895,31 +911,37 @@ class ForwardSocket(object):
self.max_threads = 128
def __del__(self):
if self.active:
self.stop_forward()
self.stop_forward()
def start_forward(self, ip, port, toip, toport, udp=False):
if (ip, port) == (toip, toport):
raise ValueError("Cannot forward to the same address %s" % addr_to_str((ip, port)))
raise ValueError("Cannot forward to the same address %s" %
addr_to_str((ip, port)))
self.sock_type = socket.SOCK_DGRAM if udp else socket.SOCK_STREAM
self.sock = socket.socket(socket.AF_INET, self.sock_type)
socket_set_opt(
self.sock,
reuse = True,
bind_addr = ("", port)
)
self.outbound_addr = toip, toport
Logger.debug("fwd-socket: Starting socket %s forward to %s" % (
addr_to_uri((ip, port), udp=udp), addr_to_uri((toip, toport), udp=udp)
))
if udp:
th = start_daemon_thread(self._socket_udp_recvfrom)
else:
th = start_daemon_thread(self._socket_tcp_listen)
time.sleep(1)
if not th.is_alive():
raise OSError("Socket thread exited too quickly")
self.active = True
try:
socket_set_opt(
self.sock,
reuse = True,
bind_addr = ("", port)
)
self.outbound_addr = toip, toport
Logger.debug("fwd-socket: Starting socket %s forward to %s" % (
addr_to_uri((ip, port), udp=udp),
addr_to_uri((toip, toport), udp=udp)
))
if udp:
th = start_daemon_thread(self._socket_udp_recvfrom)
else:
th = start_daemon_thread(self._socket_tcp_listen)
time.sleep(1)
if not th.is_alive():
raise OSError("Socket thread exited too quickly")
except Exception:
self.sock.close()
self.sock = None
self.sock_type = None
raise
def _socket_tcp_listen(self):
self.sock.listen(5)
@@ -1006,9 +1028,11 @@ class ForwardSocket(object):
return
def stop_forward(self):
Logger.debug("fwd-socket: Stopping socket")
self.sock.close()
self.active = False
if self.sock and self.sock.fileno() != -1:
Logger.debug("fwd-socket: Stopping socket")
self.sock.close()
self.sock = None
self.sock_type = None
class UPnPService(object):
@@ -1083,21 +1107,23 @@ class UPnPService(object):
"%s\r\n" % (ctl_path, ctl_hostname, ctl_port, self.service_type, content_len, content)
).encode()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_set_opt(
sock,
bind_addr = (self._bind_ip, 0) if self._bind_ip else None,
interface = self._bind_interface,
timeout = self._sock_timeout
)
sock.connect((ctl_hostname, ctl_port))
sock.sendall(data)
response = b""
while True:
buff = sock.recv(4096)
if not buff:
break
response += buff
sock.close()
try:
socket_set_opt(
sock,
bind_addr = (self._bind_ip, 0) if self._bind_ip else None,
interface = self._bind_interface,
timeout = self._sock_timeout
)
sock.connect((ctl_hostname, ctl_port))
sock.sendall(data)
response = b""
while True:
buff = sock.recv(4096)
if not buff:
break
response += buff
finally:
sock.close()
r = response.decode("utf-8", "ignore")
errno = errmsg = ""
m = re.search(r"<errorCode\s*>([^<]*?)</errorCode\s*>", r)
@@ -1145,29 +1171,31 @@ class UPnPDevice(object):
def _http_get(self, url):
hostname, port, path = split_url(url)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_set_opt(
sock,
bind_addr = (self._bind_ip, 0) if self._bind_ip else None,
interface = self._bind_interface,
timeout = self._sock_timeout
)
sock.connect((hostname, port))
data = (
"GET %s HTTP/1.1\r\n"
"Host: %s\r\n"
"User-Agent: curl/8.0.0 (Natter)\r\n"
"Accept: */*\r\n"
"Connection: close\r\n"
"\r\n" % (path, hostname)
).encode()
sock.sendall(data)
response = b""
while True:
buff = sock.recv(4096)
if not buff:
break
response += buff
sock.close()
try:
socket_set_opt(
sock,
bind_addr = (self._bind_ip, 0) if self._bind_ip else None,
interface = self._bind_interface,
timeout = self._sock_timeout
)
sock.connect((hostname, port))
data = (
"GET %s HTTP/1.1\r\n"
"Host: %s\r\n"
"User-Agent: curl/8.0.0 (Natter)\r\n"
"Accept: */*\r\n"
"Connection: close\r\n"
"\r\n" % (path, hostname)
).encode()
sock.sendall(data)
response = b""
while True:
buff = sock.recv(4096)
if not buff:
break
response += buff
finally:
sock.close()
if not response.startswith(b"HTTP/"):
raise ValueError("Invalid response from HTTP server")
s = response.split(b"\r\n\r\n", 1)
@@ -1240,50 +1268,54 @@ class UPnPClient(object):
def _discover(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
socket_set_opt(
sock,
reuse = True,
bind_addr = (self._bind_ip, 0) if self._bind_ip else None,
interface = self._bind_interface,
timeout = self._sock_timeout
)
dat01 = (
"M-SEARCH * HTTP/1.1\r\n"
"ST: ssdp:all\r\n"
"MX: 2\r\n"
"MAN: \"ssdp:discover\"\r\n"
"HOST: %s:%d\r\n"
"\r\n" % self.ssdp_addr
).encode()
try:
socket_set_opt(
sock,
reuse = True,
bind_addr = (self._bind_ip, 0) if self._bind_ip else None,
interface = self._bind_interface,
timeout = self._sock_timeout
)
dat01 = (
"M-SEARCH * HTTP/1.1\r\n"
"ST: ssdp:all\r\n"
"MX: 2\r\n"
"MAN: \"ssdp:discover\"\r\n"
"HOST: %s:%d\r\n"
"\r\n" % self.ssdp_addr
).encode()
dat02 = (
"M-SEARCH * HTTP/1.1\r\n"
"ST: upnp:rootdevice\r\n"
"MX: 2\r\n"
"MAN: \"ssdp:discover\"\r\n"
"HOST: %s:%d\r\n"
"\r\n" % self.ssdp_addr
).encode()
dat02 = (
"M-SEARCH * HTTP/1.1\r\n"
"ST: upnp:rootdevice\r\n"
"MX: 2\r\n"
"MAN: \"ssdp:discover\"\r\n"
"HOST: %s:%d\r\n"
"\r\n" % self.ssdp_addr
).encode()
sock.sendto(dat01, self.ssdp_addr)
sock.sendto(dat02, self.ssdp_addr)
sock.sendto(dat01, self.ssdp_addr)
sock.sendto(dat02, self.ssdp_addr)
upnp_urls_d = {}
while True:
try:
buff, addr = sock.recvfrom(4096)
m = re.search(r"LOCATION: *(http://[^\[]\S+)\s+", buff.decode("utf-8"))
if not m:
continue
ipaddr = addr[0]
location = m.group(1)
Logger.debug("upnp: Got URL %s" % location)
if ipaddr in upnp_urls_d:
upnp_urls_d[ipaddr].add(location)
else:
upnp_urls_d[ipaddr] = set([location])
except socket.timeout:
break
upnp_urls_d = {}
while True:
try:
buff, addr = sock.recvfrom(4096)
m = re.search(r"LOCATION: *(http://[^\[]\S+)\s+",
buff.decode("utf-8"))
if not m:
continue
ipaddr = addr[0]
location = m.group(1)
Logger.debug("upnp: Got URL %s" % location)
if ipaddr in upnp_urls_d:
upnp_urls_d[ipaddr].add(location)
else:
upnp_urls_d[ipaddr] = set([location])
except socket.timeout:
break
finally:
sock.close()
devs = []
for ipaddr, urls in upnp_urls_d.items():
@@ -1789,6 +1821,7 @@ def natter_main(show_title = True):
Logger.info("Retry after %d seconds..." % interval)
time.sleep(interval)
forwarder.stop_forward()
keep_alive.disconnect()
raise NatterRetryException("Target port is closed")
#
# Main loop
@@ -1809,6 +1842,7 @@ def natter_main(show_title = True):
_, outer_addr_curr = stun.get_mapping()
if outer_addr_curr != outer_addr:
forwarder.stop_forward()
keep_alive.disconnect()
# exit or retry
if exit_when_changed:
Logger.info("Natter is exiting because mapped address has changed")
@@ -1823,7 +1857,7 @@ def natter_main(show_title = True):
Logger.debug("keep-alive: UDP response not received: %s" % ex)
else:
Logger.error("keep-alive: connection broken: %s" % ex)
keep_alive.reset()
keep_alive.disconnect()
need_recheck = True
if upnp_ready:
try: