Harden sandbox security: execution timeout, max recursion depth, output size cap

Agent-Logs-Url: https://github.com/xtekky/gpt4free/sessions/41556926-6205-4207-b36b-e10e22a8b87e

Co-authored-by: hlohaus <983577+hlohaus@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-04-04 17:43:33 +00:00
committed by GitHub
parent 00b2b70a94
commit 5616349135
3 changed files with 286 additions and 27 deletions
+80
View File
@@ -477,3 +477,83 @@ class TestSafeMode(unittest.IsolatedAsyncioTestCase):
result = await tool.execute({"code": "import math\nresult = math.factorial(5)"})
self.assertTrue(result.get("success"))
self.assertEqual(result.get("result"), 120)
class TestSecurityHardening(unittest.IsolatedAsyncioTestCase):
    """Exercise the sandbox hardening knobs: wall-clock timeout,
    recursion-depth ceiling, and stdout/stderr size caps."""

    def test_execution_timeout(self):
        """Infinite loop is interrupted by the timeout."""
        import time

        t0 = time.time()
        outcome = execute_safe_code("while True: pass", timeout=0.5)
        took = time.time() - t0
        self.assertFalse(outcome.success)
        self.assertIn("timed out", outcome.error.lower())
        self.assertLess(took, 3.0, "Should have returned within 3 s")

    def test_execution_continues_after_timeout(self):
        """The sandbox is usable again after a previous execution timed out."""
        # First call is abandoned by the timeout; a fresh call must still work.
        execute_safe_code("while True: pass", timeout=0.3)
        outcome = execute_safe_code("result = 'ok'", timeout=5.0)
        self.assertTrue(outcome.success)
        self.assertEqual(outcome.result, "ok")

    def test_recursion_depth_limit(self):
        """Deep recursion is blocked by the max_depth parameter."""
        outcome = execute_safe_code(
            "def f(n): return f(n + 1)\nf(0)",
            max_depth=50,
            timeout=5.0,
        )
        self.assertFalse(outcome.success)

    def test_output_truncation(self):
        """stdout is capped at MAX_OUTPUT_BYTES; truncation notice appears."""
        from g4f.mcp.pa_provider import MAX_OUTPUT_BYTES

        # Emit more bytes than the cap allows.
        outcome = execute_safe_code(
            f"print('A' * {MAX_OUTPUT_BYTES + 1000})",
            timeout=5.0,
        )
        self.assertTrue(outcome.success)
        self.assertLessEqual(len(outcome.stdout), MAX_OUTPUT_BYTES + 50)
        self.assertIn("truncated", outcome.stderr.lower())

    def test_timeout_none_disables_limit(self):
        """Passing timeout=None does not impose a time limit."""
        outcome = execute_safe_code("result = sum(range(100))", timeout=None)
        self.assertTrue(outcome.success)
        self.assertEqual(outcome.result, 4950)

    async def test_tool_respects_timeout_param(self):
        """PythonExecuteTool forwards timeout to execute_safe_code."""
        import time

        tool = PythonExecuteTool(safe_mode=False)
        t0 = time.time()
        payload = await tool.execute({"code": "while True: pass", "timeout": 0.5})
        took = time.time() - t0
        self.assertFalse(payload.get("success"))
        self.assertLess(took, 3.0)

    async def test_tool_safe_mode_ignores_timeout_param(self):
        """In safe mode, timeout parameter is ignored and default is used."""
        from g4f.mcp.pa_provider import MAX_EXEC_TIMEOUT

        tool = PythonExecuteTool(safe_mode=True)
        # An absurdly large timeout must be discarded in safe mode; the
        # default MAX_EXEC_TIMEOUT applies instead.
        payload = await tool.execute({
            "code": "result = 1",
            "timeout": MAX_EXEC_TIMEOUT * 100,
        })
        self.assertTrue(payload.get("success"))

    async def test_tool_safe_mode_ignores_max_depth_param(self):
        """In safe mode, max_depth parameter is ignored."""
        from g4f.mcp.pa_provider import MAX_RECURSION_DEPTH

        tool = PythonExecuteTool(safe_mode=True)
        # Even a huge requested depth is discarded; safe mode always uses
        # MAX_RECURSION_DEPTH.
        payload = await tool.execute({
            "code": "result = 1",
            "max_depth": MAX_RECURSION_DEPTH * 100,
        })
        self.assertTrue(payload.get("success"))
+168 -23
View File
@@ -26,11 +26,13 @@ The sandbox mitigates the following vectors:
* **Code injection** — ``exec``, ``eval``, ``compile``, and ``input`` are removed
from the sandbox built-ins so code in the sandbox cannot spawn secondary
execution contexts.
Known limitations: the sandbox does not bound CPU or memory usage, and a
timed-out execution cannot be killed — its daemon thread is abandoned and
may keep consuming CPU until the process exits. Callers needing hard
resource isolation should run the sandbox in a separate process.
* **Execution timeout** — code runs in a dedicated thread; if it does not
complete within :data:`MAX_EXEC_TIMEOUT` seconds the result is returned with
an error and the thread is abandoned.
* **Runaway recursion** — ``sys.setrecursionlimit`` is reduced to
:data:`MAX_RECURSION_DEPTH` for the duration of the sandboxed call.
* **Output flooding** — stdout and stderr are each capped at
:data:`MAX_OUTPUT_BYTES`; excess output is silently truncated.
Typical layout of a ``.pa.py`` file::
@@ -56,8 +58,9 @@ from __future__ import annotations
import io
import ast
import sys
import json
import contextlib
import threading
import traceback
import builtins as _builtins
from pathlib import Path
@@ -111,10 +114,85 @@ SAFE_MODULES: FrozenSet[str] = frozenset({
})
# ---------------------------------------------------------------------------
# Security limits
# ---------------------------------------------------------------------------
# These are module-wide defaults; callers of execute_safe_code may pass
# different (typically lower) per-call values via its keyword arguments.
#: Wall-clock seconds allowed for a single :func:`execute_safe_code` call.
#: Passing ``timeout=None`` to :func:`execute_safe_code` disables the limit.
MAX_EXEC_TIMEOUT: float = 30.0
#: Maximum Python call-stack depth inside the sandbox (passed to
#: ``sys.setrecursionlimit``). The default CPython limit is 1000; using a
#: lower value catches infinite-recursion attacks early.
MAX_RECURSION_DEPTH: int = 500
#: Maximum number of UTF-8 bytes captured from *each* of stdout and stderr.
#: Writes beyond this limit are silently dropped and a truncation notice is
#: appended to stderr.
MAX_OUTPUT_BYTES: int = 65_536 # 64 KiB
# ---------------------------------------------------------------------------
# Sandbox helpers
# ---------------------------------------------------------------------------
class _LimitedStringIO(io.StringIO):
    """StringIO that stops accepting writes once *max_bytes* of UTF-8 content
    have been accumulated.  Additional writes are silently discarded and
    ``truncated`` is set to ``True``.

    Args:
        max_bytes: UTF-8 byte budget shared by all writes.  ``None`` (the
            default) means :data:`MAX_OUTPUT_BYTES`, resolved at call time.
    """

    def __init__(self, max_bytes: Optional[int] = None) -> None:
        super().__init__()
        # Late-bind the default so the cap tracks the module constant.
        self._max_bytes = MAX_OUTPUT_BYTES if max_bytes is None else max_bytes
        self._bytes_written = 0
        # Flipped to True the first time any write is dropped or shortened.
        self.truncated = False

    def write(self, s: str) -> int:
        """Write *s* (possibly truncated); return the number of chars written."""
        if self._bytes_written >= self._max_bytes:
            self.truncated = True
            return 0
        encoded = s.encode("utf-8", errors="replace")
        remaining = self._max_bytes - self._bytes_written
        if len(encoded) > remaining:
            # Decode with errors="ignore" so a multibyte character that is
            # split at the cut point is dropped.  ("replace" would insert
            # U+FFFD, which re-encodes to 3 bytes and could push the total
            # past the cap.)
            s = encoded[:remaining].decode("utf-8", errors="ignore")
            encoded = s.encode("utf-8", errors="replace")
            self.truncated = True
        n = super().write(s)
        # `encoded` already reflects exactly what was written; no re-encode.
        self._bytes_written += len(encoded)
        return n
def _exec_in_thread(
    compiled: Any,
    safe_globals: Dict[str, Any],
    local_vars: Dict[str, Any],
    max_depth: int,
    exc_box: List,
) -> None:
    """Run *compiled* code with a bounded recursion depth.

    ``sys.setrecursionlimit`` is set to *max_depth* for the lifetime of this
    call and restored afterwards.  NOTE: the recursion limit is
    interpreter-global, so other threads are also affected until the
    ``finally`` block restores it.  stdout / stderr capture is handled by
    the custom ``print`` injected into the sandbox builtins — no global
    ``sys.stdout`` redirection is performed so an abandoned timeout thread
    cannot corrupt the caller's output streams.

    Any raised exception is stored in *exc_box* (a one-element list) so the
    caller can inspect it without needing to join the thread.

    This function is designed to run in a *daemon* thread so that it is
    automatically discarded when the process exits, even if the sandboxed
    code is stuck in an infinite loop.
    """
    prev = sys.getrecursionlimit()
    sys.setrecursionlimit(max_depth)
    try:
        exec(compiled, safe_globals, local_vars)  # noqa: S102
    except BaseException:  # noqa: BLE001
        # BaseException (not Exception) so sandboxed `raise SystemExit` or
        # KeyboardInterrupt is reported as a failure instead of silently
        # terminating the worker thread, which the caller would otherwise
        # interpret as a successful run (empty exc_box).
        exc_box.append(traceback.format_exc())
    finally:
        sys.setrecursionlimit(prev)
def _make_restricted_import(allowed: FrozenSet[str]):
"""Return a ``__import__`` replacement that only allows *allowed* modules."""
original = _builtins.__import__
@@ -135,7 +213,11 @@ def _make_restricted_import(allowed: FrozenSet[str]):
return _restricted_import
def _make_safe_globals(allowed: FrozenSet[str] = SAFE_MODULES) -> Dict[str, Any]:
def _make_safe_globals(
allowed: FrozenSet[str] = SAFE_MODULES,
stdout_buf: Optional[io.StringIO] = None,
stderr_buf: Optional[io.StringIO] = None,
) -> Dict[str, Any]:
"""Return a ``globals`` dict suitable for sandboxed ``exec``."""
workspace = get_workspace_dir()
@@ -168,6 +250,19 @@ def _make_safe_globals(allowed: FrozenSet[str] = SAFE_MODULES) -> Dict[str, Any]
safe_builtins["open"] = _safe_open
safe_builtins["__import__"] = _make_restricted_import(allowed)
# Override print / input so stdout/stderr stay local to this sandbox
# execution and are never written to the real sys.stdout/stderr. This
# avoids the global-state side-effect that contextlib.redirect_stdout
# would cause when the thread is abandoned after a timeout.
if stdout_buf is not None:
_real_print = _builtins.print
def _safe_print(*args, **kwargs):
kwargs.setdefault("file", stdout_buf)
_real_print(*args, **kwargs)
safe_builtins["print"] = _safe_print
return {
"__builtins__": safe_builtins,
"__name__": "__pa_provider__",
def execute_safe_code(
    code: str,
    extra_globals: Optional[Dict[str, Any]] = None,
    allowed_modules: FrozenSet[str] = SAFE_MODULES,
    timeout: Optional[float] = MAX_EXEC_TIMEOUT,
    max_depth: int = MAX_RECURSION_DEPTH,
) -> SafeExecutionResult:
    """Execute *code* inside a safe sandbox with whitelisted module imports.

    The execution runs in a dedicated thread so that a wall-clock *timeout*
    can be enforced without blocking the caller's event loop.  A custom
    ``sys.setrecursionlimit`` guards against stack-overflow attacks.  Both
    stdout and stderr are capped at :data:`MAX_OUTPUT_BYTES`.

    Args:
        code: Python source code to execute.
        extra_globals: Additional names injected into the execution globals.
        allowed_modules: Frozenset of top-level module names that may be imported.
        timeout: Wall-clock seconds before the execution is abandoned.  Pass
            ``None`` to disable.  Defaults to :data:`MAX_EXEC_TIMEOUT`.
        max_depth: Maximum recursion depth inside the sandbox.  Defaults to
            :data:`MAX_RECURSION_DEPTH`.

    Returns:
        :class:`SafeExecutionResult` containing captured stdout/stderr, any
        ``result`` variable assigned in the code, or error information.
    """
    stdout_buf = _LimitedStringIO(MAX_OUTPUT_BYTES)
    stderr_buf = _LimitedStringIO(MAX_OUTPUT_BYTES)

    def _collect_output():
        # Snapshot stdout/stderr and append the truncation notice once.
        # Shared by the timeout, error, and success paths so the notice is
        # never silently dropped on one of them.
        out = stdout_buf.getvalue()
        err = stderr_buf.getvalue()
        if stdout_buf.truncated or stderr_buf.truncated:
            err += "\n[Output truncated: size limit reached]"
        return out, err

    safe_globals = _make_safe_globals(
        allowed_modules, stdout_buf=stdout_buf, stderr_buf=stderr_buf
    )
    if extra_globals:
        safe_globals.update(extra_globals)
    local_vars: Dict[str, Any] = {}

    # Compile outside the thread so SyntaxErrors surface immediately.
    try:
        compiled = compile(code, "<pa_provider>", "exec")
    except SyntaxError:
        return SafeExecutionResult(
            success=False,
            stdout="",
            stderr="",
            error=traceback.format_exc(),
        )

    # Run in a daemon thread with timeout and recursion-depth enforcement.
    # We use a raw daemon Thread (not ThreadPoolExecutor) so that if the
    # sandboxed code runs forever the thread is discarded when the process
    # exits rather than blocking interpreter shutdown.
    exc_box: List = []
    thread = threading.Thread(
        target=_exec_in_thread,
        args=(compiled, safe_globals, local_vars, max_depth, exc_box),
        daemon=True,
        name="g4f-sandbox",
    )
    thread.start()
    thread.join(timeout=timeout)

    if thread.is_alive():
        # The thread is still running — timeout was hit.  We cannot kill it
        # but as a daemon thread it will be reaped when the process exits.
        stdout, stderr = _collect_output()
        return SafeExecutionResult(
            success=False,
            stdout=stdout,
            stderr=stderr,
            error=(
                f"Execution timed out after {timeout:.1f} s. "
                "The thread has been abandoned."
            ),
        )

    if exc_box:
        stdout, stderr = _collect_output()
        return SafeExecutionResult(
            success=False,
            stdout=stdout,
            stderr=stderr,
            error=exc_box[0],
        )

    stdout, stderr = _collect_output()
    return SafeExecutionResult(
        success=True,
        stdout=stdout,
        stderr=stderr,
        result=local_vars.get("result"),
        locals=local_vars,
    )
# ---------------------------------------------------------------------------
# .pa.py provider loader
+38 -4
View File
@@ -511,7 +511,21 @@ class PythonExecuteTool(MCPTool):
"items": {"type": "string"},
"description": (
"Optional list of additional module names to allow "
"beyond the default whitelist"
"beyond the default whitelist (ignored in safe mode)"
),
},
"timeout": {
"type": "number",
"description": (
"Wall-clock seconds to allow before aborting execution "
f"(max {30.0}s; ignored in safe mode)"
),
},
"max_depth": {
"type": "integer",
"description": (
"Maximum Python call-stack depth inside the sandbox "
f"(max {500}; ignored in safe mode)"
),
},
},
@@ -519,21 +533,41 @@ class PythonExecuteTool(MCPTool):
}
async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Run sandboxed Python code described by *arguments*.

    Args:
        arguments: Tool-call payload.  ``code`` (required) is the source to
            run; ``allowed_extra_modules``, ``timeout`` and ``max_depth``
            are honored only when ``safe_mode`` is off, and are clamped so
            they can reduce but never exceed the module defaults.

    Returns:
        The :class:`SafeExecutionResult` serialized via ``to_dict()``, or a
        ``{"error": ...}`` dict on invalid input or unexpected failure.
    """
    from .pa_provider import execute_safe_code, SAFE_MODULES, MAX_EXEC_TIMEOUT, MAX_RECURSION_DEPTH

    code = arguments.get("code", "")
    if not code:
        return {"error": "code parameter is required"}

    if self.safe_mode:
        # In safe mode the caller cannot override any security parameters
        allowed = SAFE_MODULES
        timeout = MAX_EXEC_TIMEOUT
        max_depth = MAX_RECURSION_DEPTH
    else:
        extra_names = arguments.get("allowed_extra_modules") or []
        allowed = SAFE_MODULES | frozenset(extra_names)
        # Allow callers to reduce (but not exceed) the defaults.  Malformed
        # values (e.g. "timeout": "abc") must yield an error dict rather
        # than an uncaught TypeError/ValueError escaping the tool.
        try:
            requested_timeout = arguments.get("timeout")
            timeout = (
                min(float(requested_timeout), MAX_EXEC_TIMEOUT)
                if requested_timeout is not None
                else MAX_EXEC_TIMEOUT
            )
            requested_depth = arguments.get("max_depth")
            max_depth = (
                min(int(requested_depth), MAX_RECURSION_DEPTH)
                if requested_depth is not None
                else MAX_RECURSION_DEPTH
            )
        except (TypeError, ValueError) as exc:
            return {"error": f"Invalid timeout/max_depth parameter: {exc}"}

    try:
        exec_result = execute_safe_code(
            code,
            allowed_modules=allowed,
            timeout=timeout,
            max_depth=max_depth,
        )
        return exec_result.to_dict()
    except Exception as exc:
        return {"error": f"Execution error: {exc}"}