From 5a9bc7fb14c84e518f00e8cd92a2863feae46acf Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Thu, 16 Apr 2026 14:15:56 +0200 Subject: [PATCH] feat: Add configurable max_requests for PHP threads (#2292) PHP-FPM recycles worker processes after a configurable number of requests (`pm.max_requests`), preventing memory leaks from accumulating over time. FrankenPHP keeps PHP threads alive indefinitely, so any leak in PHP extensions (e.g. ZTS builds of profiling tools like Blackfire) or application code compounds over hours/days. In production behind reverse proxies like Cloudflare, this can lead to gradual resource exhaustion and eventually 502 errors with no visible warnings in logs. This PR adds a `max_requests` option in the global `frankenphp` block that automatically restarts PHP threads after a given number of requests, fully cleaning up the thread's memory and state. It applies to both regular (module mode) and worker threads. When a thread reaches the limit it exits the C thread loop, triggering a full cleanup including any memory leaked by extensions. A fresh thread is then booted transparently. Other threads continue serving requests during the restart. This cannot be done from userland PHP: restarting a worker script from PHP only resets PHP-level state, not the underlying C thread-local storage where extension-level leaks accumulate. And in module mode (without workers), there is no userland loop to count requests at all. Default is `0` (unlimited), preserving existing behavior. Usage: ```caddyfile { frankenphp { max_requests 500 } } ``` Changes: - New `max_requests` Caddyfile directive in the global `frankenphp` block - New `WithMaxRequests` functional option - New `Rebooting` and `RebootReady` states in the thread state machine for restart coordination - Regular thread restart in `threadregular.go` - Worker thread restart in `threadworker.go` - Safe shutdown: `shutdown()` waits for in-flight reboots to complete before draining threads - Tests for both module and worker mode (sequential and concurrent), with debug log verification - Updated docs --- caddy/app.go | 17 ++++- docs/config.md | 20 ++++++ frankenphp.go | 7 +- frankenphp_test.go | 2 +- internal/state/state.go | 9 +++ maxrequests_regular_test.go | 69 +++++++++++++++++++ options.go | 10 +++ phpthread.go | 24 ++++++- testdata/worker-counter-persistent.php | 10 +++ threadregular.go | 26 ++++++- threadworker.go | 24 +++++++ worker_test.go | 93 ++++++++++++++++++++++++++ 12 files changed, 304 insertions(+), 7 deletions(-) create mode 100644 maxrequests_regular_test.go create mode 100644 testdata/worker-counter-persistent.php diff --git a/caddy/app.go b/caddy/app.go index 9242d870..fbe72eb6 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -57,6 +57,8 @@ type FrankenPHPApp struct { MaxWaitTime time.Duration `json:"max_wait_time,omitempty"` // The maximum amount of time an autoscaled thread may be idle before being deactivated MaxIdleTime time.Duration `json:"max_idle_time,omitempty"` + // EXPERIMENTAL: MaxRequests sets the maximum number of requests a PHP thread handles before restarting (0 = unlimited) + MaxRequests int `json:"max_requests,omitempty"` opts []frankenphp.Option metrics frankenphp.Metrics @@ -153,6 +155,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithPhpIni(f.PhpIni), frankenphp.WithMaxWaitTime(f.MaxWaitTime), frankenphp.WithMaxIdleTime(f.MaxIdleTime), + frankenphp.WithMaxRequests(f.MaxRequests), ) for _, w := range f.Workers { @@ -192,6 +195,7 @@ func (f *FrankenPHPApp) Stop() error { f.NumThreads = 0 f.MaxWaitTime = 0 f.MaxIdleTime = 0 + f.MaxRequests = 0 optionsMU.Lock() options = nil @@ -255,6 +259,17 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } f.MaxIdleTime = v + case "max_requests": + if !d.NextArg() { + return d.ArgErr() + } + + v, err := strconv.ParseUint(d.Val(), 10, 32) + if err != nil { + return d.WrapErr(err) + } + + f.MaxRequests = int(v) case "php_ini": parseIniLine := func(d *caddyfile.Dispenser) error { key := d.Val() @@ -311,7 +326,7 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { f.Workers = append(f.Workers, wc) default: - return wrongSubDirectiveError("frankenphp", "num_threads, max_threads, php_ini, worker, max_wait_time, max_idle_time", d.Val()) + return wrongSubDirectiveError("frankenphp", "num_threads, max_threads, php_ini, worker, max_wait_time, max_idle_time, max_requests", d.Val()) } } } diff --git a/docs/config.md b/docs/config.md index bfcdeb12..e16d1fa1 100644 --- a/docs/config.md +++ b/docs/config.md @@ -97,6 +97,7 @@ You can also explicitly configure FrankenPHP using the [global option](https://c max_threads # Limits the number of additional PHP threads that can be started at runtime. Default: num_threads. Can be set to 'auto'. max_wait_time # Sets the maximum time a request may wait for a free PHP thread before timing out. Default: disabled. max_idle_time # Sets the maximum time an autoscaled thread may be idle before being deactivated. Default: 5s. + max_requests # (experimental) Sets the maximum number of requests a PHP thread will handle before being restarted, useful for mitigating memory leaks. Applies to both regular and worker threads. Default: 0 (unlimited). php_ini # Set a php.ini directive. Can be used several times to set multiple directives. worker { file # Sets the path to the worker script. @@ -265,6 +266,25 @@ and otherwise forward the request to the worker matching the path pattern. } ``` +## Restarting Threads After a Number of Requests (Experimental) + +FrankenPHP can automatically restart PHP threads after they have handled a given number of requests. +When a thread reaches the limit, it is fully restarted, +cleaning up all memory and state. Other threads continue to serve requests during the restart. + +If you notice memory growing over time, the ideal fix is to report the leak +to the responsible extension or library maintainer. +But when the fix depends on a third party you don't control, +`max_requests` provides a pragmatic and hopefully temporary workaround for production: + +```caddyfile +{ + frankenphp { + max_requests 500 + } +} +``` + ## Environment Variables The following environment variables can be used to inject Caddy directives in the `Caddyfile` without modifying it: diff --git a/frankenphp.go b/frankenphp.go index d2aaa3c7..52246d01 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -66,7 +66,8 @@ var ( metrics Metrics = nullMetrics{} - maxWaitTime time.Duration + maxWaitTime time.Duration + maxRequestsPerThread int ) type ErrRejected struct { @@ -275,6 +276,7 @@ func Init(options ...Option) error { } maxWaitTime = opt.maxWaitTime + maxRequestsPerThread = opt.maxRequests if opt.maxIdleTime > 0 { maxIdleTime = opt.maxIdleTime @@ -335,7 +337,7 @@ func Init(options ...Option) error { initAutoScaling(mainThread) if globalLogger.Enabled(globalCtx, slog.LevelInfo) { - globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "FrankenPHP started 🐘", slog.String("php_version", Version().Version), slog.Int("num_threads", mainThread.numThreads), slog.Int("max_threads", mainThread.maxThreads)) + globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "FrankenPHP started 🐘", slog.String("php_version", Version().Version), slog.Int("num_threads", mainThread.numThreads), slog.Int("max_threads", mainThread.maxThreads), slog.Int("max_requests", maxRequestsPerThread)) if EmbeddedAppPath != "" { globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "embedded PHP app 📦", slog.String("path", EmbeddedAppPath)) @@ -786,5 +788,6 @@ func resetGlobals() { workersByPath = nil watcherIsEnabled = false maxIdleTime = defaultMaxIdleTime + maxRequestsPerThread = 0 globalMu.Unlock() } diff --git a/frankenphp_test.go b/frankenphp_test.go index 47e65c49..f5355784 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -23,8 +23,8 @@ import ( "os" "os/exec" "os/user" - "runtime" "path/filepath" + "runtime" "strconv" "strings" "sync" diff --git a/internal/state/state.go b/internal/state/state.go index 7bdf9c06..f8d2b3ac 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -30,6 +30,11 @@ const ( TransitionRequested TransitionInProgress TransitionComplete + + // thread is exiting the C loop for a full ZTS restart (max_requests) + Rebooting + // C thread has exited and ZTS state is cleaned up, ready to spawn a new C thread + RebootReady ) func (s State) String() string { @@ -58,6 +63,10 @@ func (s State) String() string { return "transition in progress" case TransitionComplete: return "transition complete" + case Rebooting: + return "rebooting" + case RebootReady: + return "reboot ready" default: return "unknown" } diff --git a/maxrequests_regular_test.go b/maxrequests_regular_test.go new file mode 100644 index 00000000..47a39a02 --- /dev/null +++ b/maxrequests_regular_test.go @@ -0,0 +1,69 @@ +package frankenphp_test + +import ( + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" +) + +// TestModuleMaxRequests verifies that regular (non-worker) PHP threads restart +// after reaching max_requests by checking debug logs for restart messages. +func TestModuleMaxRequests(t *testing.T) { + const maxRequests = 5 + const totalRequests = 30 + + var buf syncBuffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})) + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + for i := 0; i < totalRequests; i++ { + body, resp := testGet("http://example.com/index.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + assert.Contains(t, body, "I am by birth a Genevese") + } + + restartCount := strings.Count(buf.String(), "max requests reached, restarting thread") + t.Logf("Thread restarts observed: %d", restartCount) + assert.GreaterOrEqual(t, restartCount, 2, + "with maxRequests=%d and %d requests on 2 threads, at least 2 restarts should occur", maxRequests, totalRequests) + }, &testOptions{ + logger: logger, + initOpts: []frankenphp.Option{ + frankenphp.WithNumThreads(2), + frankenphp.WithMaxRequests(maxRequests), + }, + }) +} + +// TestModuleMaxRequestsConcurrent verifies max_requests works under concurrent load +// in module mode. All requests must succeed despite threads restarting. +func TestModuleMaxRequestsConcurrent(t *testing.T) { + const maxRequests = 10 + const totalRequests = 200 + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + var wg sync.WaitGroup + + for i := 0; i < totalRequests; i++ { + wg.Add(1) + go func() { + defer wg.Done() + body, resp := testGet("http://example.com/index.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + assert.Contains(t, body, "I am by birth a Genevese") + }() + } + wg.Wait() + }, &testOptions{ + initOpts: []frankenphp.Option{ + frankenphp.WithNumThreads(8), + frankenphp.WithMaxRequests(maxRequests), + }, + }) +} diff --git a/options.go b/options.go index 9ba1f916..a9cd2a26 100644 --- a/options.go +++ b/options.go @@ -31,6 +31,7 @@ type opt struct { phpIni map[string]string maxWaitTime time.Duration maxIdleTime time.Duration + maxRequests int } type workerOpt struct { @@ -166,6 +167,15 @@ func WithMaxIdleTime(maxIdleTime time.Duration) Option { } } +// EXPERIMENTAL: WithMaxRequests sets the default max requests before restarting a PHP thread (0 = unlimited). Applies to regular and worker threads. +func WithMaxRequests(maxRequests int) Option { + return func(o *opt) error { + o.maxRequests = maxRequests + + return nil + } +} + // WithWorkerEnv sets environment variables for the worker func WithWorkerEnv(env map[string]string) WorkerOption { return func(w *workerOpt) error { diff --git a/phpthread.go b/phpthread.go index c2660c55..a941de93 100644 --- a/phpthread.go +++ b/phpthread.go @@ -65,6 +65,24 @@ func (thread *phpThread) boot() { thread.state.WaitFor(state.Inactive) } +// reboot exits the C thread loop for full ZTS cleanup, then spawns a fresh C thread. +// Returns false if the thread is no longer in Ready state (e.g. shutting down). +func (thread *phpThread) reboot() bool { + if !thread.state.CompareAndSwap(state.Ready, state.Rebooting) { + return false + } + + go func() { + thread.state.WaitFor(state.RebootReady) + + if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) { + panic("unable to create thread") + } + }() + + return true +} + // shutdown the underlying PHP thread func (thread *phpThread) shutdown() { if !thread.state.RequestSafeStateChange(state.ShuttingDown) { @@ -189,5 +207,9 @@ func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C. func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] thread.Unpin() - thread.state.Set(state.Done) + if thread.state.Is(state.Rebooting) { + thread.state.Set(state.RebootReady) + } else { + thread.state.Set(state.Done) + } } diff --git a/testdata/worker-counter-persistent.php b/testdata/worker-counter-persistent.php new file mode 100644 index 00000000..2ac69f41 --- /dev/null +++ b/testdata/worker-counter-persistent.php @@ -0,0 +1,10 @@ + 0 && handler.requestCount >= maxRequestsPerThread { + if globalLogger.Enabled(globalCtx, slog.LevelDebug) { + globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting thread", + slog.Int("thread", handler.thread.threadIndex), + slog.Int("max_requests", maxRequestsPerThread), + ) + } + + if handler.thread.reboot() { + return "" + } + } + handler.state.MarkAsWaiting(true) var ch contextHolder @@ -89,6 +110,7 @@ func (handler *regularThread) waitForRequest() string { case ch = <-handler.thread.requestChan: } + handler.requestCount++ handler.thread.contextMu.Lock() handler.ctx = ch.ctx handler.contextHolder.frankenPHPContext = ch.frankenPHPContext diff --git a/threadworker.go b/threadworker.go index bc150024..0fb315d1 100644 --- a/threadworker.go +++ b/threadworker.go @@ -26,6 +26,7 @@ type workerThread struct { workerContext context.Context isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet failureCount int // number of consecutive startup failures + requestCount int // number of requests handled since last restart } func convertToWorkerThread(thread *phpThread, worker *worker) { @@ -62,6 +63,12 @@ func (handler *workerThread) beforeScriptExecution() string { setupWorkerScript(handler, handler.worker) return handler.worker.fileName + case state.Rebooting: + return "" + case state.RebootReady: + handler.requestCount = 0 + handler.state.Set(state.Ready) + return handler.beforeScriptExecution() case state.ShuttingDown: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) @@ -116,6 +123,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) { handler.dummyFrankenPHPContext = fc handler.dummyContext = ctx handler.isBootingScript = true + handler.requestCount = 0 if globalLogger.Enabled(ctx, slog.LevelDebug) { globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) @@ -213,6 +221,21 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { metrics.ReadyWorker(handler.worker.name) } + // max_requests reached: signal reboot for full ZTS cleanup + if maxRequestsPerThread > 0 && handler.requestCount >= maxRequestsPerThread { + if globalLogger.Enabled(globalCtx, slog.LevelDebug) { + globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting", + slog.String("worker", handler.worker.name), + slog.Int("thread", handler.thread.threadIndex), + slog.Int("max_requests", maxRequestsPerThread), + ) + } + + if handler.thread.reboot() { + return false, nil + } + } + if handler.state.Is(state.TransitionComplete) { handler.state.Set(state.Ready) } @@ -237,6 +260,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { case requestCH = <-handler.worker.requestChan: } + handler.requestCount++ handler.thread.contextMu.Lock() handler.workerContext = requestCH.ctx handler.workerFrankenPHPContext = requestCH.frankenPHPContext diff --git a/worker_test.go b/worker_test.go index 97cc80d2..3fd2d63f 100644 --- a/worker_test.go +++ b/worker_test.go @@ -5,11 +5,13 @@ import ( "fmt" "io" "log" + "log/slog" "net/http" "net/http/httptest" "net/url" "strconv" "strings" + "sync" "testing" "github.com/dunglas/frankenphp" @@ -169,3 +171,94 @@ func TestKeepRunningOnConnectionAbort(t *testing.T) { assert.Equal(t, "requests:2", body2, "should not have stopped execution after the first request was aborted") }, &testOptions{workerScript: "worker-with-counter.php", nbWorkers: 1, nbParallelRequests: 1}) } + +// TestWorkerMaxRequests verifies that a worker restarts after reaching max_requests. +func TestWorkerMaxRequests(t *testing.T) { + const maxRequests = 5 + const totalRequests = 20 + + var buf syncBuffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})) + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + instanceIDs := make(map[string]int) + + for i := 0; i < totalRequests; i++ { + body, resp := testGet("http://example.com/worker-counter-persistent.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + + parts := strings.Split(body, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID]++ + } + } + + t.Logf("Unique worker instances seen: %d (expected >= %d)", len(instanceIDs), totalRequests/maxRequests) + for id, count := range instanceIDs { + t.Logf(" instance %s: handled %d requests", id, count) + } + + assert.GreaterOrEqual(t, len(instanceIDs), totalRequests/maxRequests) + + for id, count := range instanceIDs { + assert.LessOrEqual(t, count, maxRequests, + fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) + } + + restartCount := strings.Count(buf.String(), "max requests reached, restarting") + t.Logf("Worker restarts observed: %d", restartCount) + assert.GreaterOrEqual(t, restartCount, 2) + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 1, + nbParallelRequests: 1, + logger: logger, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2), frankenphp.WithMaxRequests(maxRequests)}, + }) +} + +// TestWorkerMaxRequestsHighConcurrency verifies max_requests works under concurrent load. +func TestWorkerMaxRequestsHighConcurrency(t *testing.T) { + const maxRequests = 10 + const totalRequests = 200 + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + var ( + mu sync.Mutex + instanceIDs = make(map[string]int) + ) + var wg sync.WaitGroup + + for i := 0; i < totalRequests; i++ { + wg.Add(1) + go func() { + defer wg.Done() + body, resp := testGet("http://example.com/worker-counter-persistent.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + + mu.Lock() + parts := strings.Split(body, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID]++ + } + mu.Unlock() + }() + } + wg.Wait() + + t.Logf("instances: %d", len(instanceIDs)) + assert.Greater(t, len(instanceIDs), 4, "workers should have restarted multiple times") + + for id, count := range instanceIDs { + assert.LessOrEqual(t, count, maxRequests, + fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) + } + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 4, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(5), frankenphp.WithMaxRequests(maxRequests)}, + }) +}