mirror of
https://github.com/dunglas/frankenphp.git
synced 2026-04-23 00:37:20 +08:00
5a9bc7fb14
PHP-FPM recycles worker processes after a configurable number of
requests (`pm.max_requests`), preventing memory leaks from accumulating
over time. FrankenPHP keeps PHP threads alive indefinitely, so any leak
in PHP extensions (e.g. ZTS builds of profiling tools like Blackfire) or
application code compounds over hours/days. In production behind reverse
proxies like Cloudflare, this can lead to gradual resource exhaustion
and eventually 502 errors with no visible warnings in logs.
This PR adds a `max_requests` option in the global `frankenphp` block
that automatically restarts PHP threads after a given number of
requests, fully cleaning up the thread's memory and state. It applies to
both regular (module mode) and worker threads.
When a thread reaches the limit it exits the C thread loop, triggering a
full cleanup including any memory leaked by extensions. A fresh thread
is then booted transparently. Other threads continue serving requests
during the restart.
This cannot be done from userland PHP: restarting a worker script from
PHP only resets PHP-level state, not the underlying C thread-local
storage where extension-level leaks accumulate. And in module mode
(without workers), there is no userland loop to count requests at all.
Default is `0` (unlimited), preserving existing behavior.
Usage:
```caddyfile
{
frankenphp {
max_requests 500
}
}
```
Changes:
- New `max_requests` Caddyfile directive in the global `frankenphp`
block
- New `WithMaxRequests` functional option
- New `Rebooting` and `RebootReady` states in the thread state machine
for restart coordination
- Regular thread restart in `threadregular.go`
- Worker thread restart in `threadworker.go`
- Safe shutdown: `shutdown()` waits for in-flight reboots to complete
before draining threads
- Tests for both module and worker mode (sequential and concurrent),
with debug log verification
- Updated docs
354 lines
12 KiB
Go
354 lines
12 KiB
Go
package frankenphp
|
|
|
|
// #include "frankenphp.h"
|
|
import "C"
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"path/filepath"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/dunglas/frankenphp/internal/state"
|
|
)
|
|
|
|
// representation of a thread assigned to a worker script
|
|
// executes the PHP worker script in a loop
|
|
// implements the threadHandler interface
|
|
type workerThread struct {
|
|
state *state.ThreadState
|
|
thread *phpThread
|
|
worker *worker
|
|
dummyFrankenPHPContext *frankenPHPContext
|
|
dummyContext context.Context
|
|
workerFrankenPHPContext *frankenPHPContext
|
|
workerContext context.Context
|
|
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
|
|
failureCount int // number of consecutive startup failures
|
|
requestCount int // number of requests handled since last restart
|
|
}
|
|
|
|
func convertToWorkerThread(thread *phpThread, worker *worker) {
|
|
thread.setHandler(&workerThread{
|
|
state: thread.state,
|
|
thread: thread,
|
|
worker: worker,
|
|
})
|
|
worker.attachThread(thread)
|
|
}
|
|
|
|
// beforeScriptExecution returns the name of the script or an empty string on shutdown
|
|
func (handler *workerThread) beforeScriptExecution() string {
|
|
switch handler.state.Get() {
|
|
case state.TransitionRequested:
|
|
if handler.worker.onThreadShutdown != nil {
|
|
handler.worker.onThreadShutdown(handler.thread.threadIndex)
|
|
}
|
|
handler.worker.detachThread(handler.thread)
|
|
return handler.thread.transitionToNewHandler()
|
|
case state.Restarting:
|
|
if handler.worker.onThreadShutdown != nil {
|
|
handler.worker.onThreadShutdown(handler.thread.threadIndex)
|
|
}
|
|
handler.state.Set(state.Yielding)
|
|
handler.state.WaitFor(state.Ready, state.ShuttingDown)
|
|
return handler.beforeScriptExecution()
|
|
case state.Ready, state.TransitionComplete:
|
|
handler.thread.updateContext(true)
|
|
if handler.worker.onThreadReady != nil {
|
|
handler.worker.onThreadReady(handler.thread.threadIndex)
|
|
}
|
|
|
|
setupWorkerScript(handler, handler.worker)
|
|
|
|
return handler.worker.fileName
|
|
case state.Rebooting:
|
|
return ""
|
|
case state.RebootReady:
|
|
handler.requestCount = 0
|
|
handler.state.Set(state.Ready)
|
|
return handler.beforeScriptExecution()
|
|
case state.ShuttingDown:
|
|
if handler.worker.onThreadShutdown != nil {
|
|
handler.worker.onThreadShutdown(handler.thread.threadIndex)
|
|
}
|
|
handler.worker.detachThread(handler.thread)
|
|
|
|
// signal to stop
|
|
return ""
|
|
}
|
|
|
|
panic("unexpected state: " + handler.state.Name())
|
|
}
|
|
|
|
func (handler *workerThread) afterScriptExecution(exitStatus int) {
|
|
tearDownWorkerScript(handler, exitStatus)
|
|
}
|
|
|
|
func (handler *workerThread) frankenPHPContext() *frankenPHPContext {
|
|
if handler.workerFrankenPHPContext != nil {
|
|
return handler.workerFrankenPHPContext
|
|
}
|
|
|
|
return handler.dummyFrankenPHPContext
|
|
}
|
|
func (handler *workerThread) context() context.Context {
|
|
if handler.workerContext != nil {
|
|
return handler.workerContext
|
|
}
|
|
|
|
return handler.dummyContext
|
|
}
|
|
|
|
func (handler *workerThread) name() string {
|
|
return "Worker PHP Thread - " + handler.worker.fileName
|
|
}
|
|
|
|
func setupWorkerScript(handler *workerThread, worker *worker) {
|
|
metrics.StartWorker(worker.name)
|
|
|
|
// Create a dummy request to set up the worker
|
|
fc, err := newDummyContext(
|
|
filepath.Base(worker.fileName),
|
|
worker.requestOptions...,
|
|
)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ctx := context.WithValue(globalCtx, contextKey, fc)
|
|
|
|
fc.worker = worker
|
|
handler.dummyFrankenPHPContext = fc
|
|
handler.dummyContext = ctx
|
|
handler.isBootingScript = true
|
|
handler.requestCount = 0
|
|
|
|
if globalLogger.Enabled(ctx, slog.LevelDebug) {
|
|
globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
|
|
}
|
|
}
|
|
|
|
func tearDownWorkerScript(handler *workerThread, exitStatus int) {
|
|
worker := handler.worker
|
|
handler.dummyFrankenPHPContext = nil
|
|
handler.dummyContext = nil
|
|
|
|
// if the worker request is not nil, the script might have crashed
|
|
// make sure to close the worker request context
|
|
if handler.workerFrankenPHPContext != nil {
|
|
handler.workerFrankenPHPContext.closeContext()
|
|
handler.thread.contextMu.Lock()
|
|
handler.workerFrankenPHPContext = nil
|
|
handler.workerContext = nil
|
|
handler.thread.contextMu.Unlock()
|
|
}
|
|
|
|
// on exit status 0 we just run the worker script again
|
|
if exitStatus == 0 && !handler.isBootingScript {
|
|
metrics.StopWorker(worker.name, StopReasonRestart)
|
|
|
|
if globalLogger.Enabled(globalCtx, slog.LevelDebug) {
|
|
globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// worker has thrown a fatal error or has not reached frankenphp_handle_request
|
|
if handler.isBootingScript {
|
|
metrics.StopWorker(worker.name, StopReasonBootFailure)
|
|
} else {
|
|
metrics.StopWorker(worker.name, StopReasonCrash)
|
|
}
|
|
|
|
if !handler.isBootingScript {
|
|
// fatal error (could be due to exit(1), timeouts, etc.)
|
|
if globalLogger.Enabled(globalCtx, slog.LevelDebug) {
|
|
globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "restarting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
if worker.maxConsecutiveFailures >= 0 && startupFailChan != nil && !watcherIsEnabled && handler.failureCount >= worker.maxConsecutiveFailures {
|
|
startupFailChan <- fmt.Errorf("too many consecutive failures: worker %s has not reached frankenphp_handle_request()", worker.fileName)
|
|
handler.thread.state.Set(state.ShuttingDown)
|
|
return
|
|
}
|
|
|
|
if watcherIsEnabled {
|
|
// worker script has probably failed due to script changes while watcher is enabled
|
|
if globalLogger.Enabled(globalCtx, slog.LevelError) {
|
|
globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "(watcher enabled) worker script has not reached frankenphp_handle_request()", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
|
|
}
|
|
} else {
|
|
// rare case where worker script has failed on a restart during normal operation
|
|
// this can happen if startup success depends on external resources
|
|
if globalLogger.Enabled(globalCtx, slog.LevelWarn) {
|
|
globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "worker script has failed on restart", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.failureCount))
|
|
}
|
|
}
|
|
|
|
// wait a bit and try again (exponential backoff)
|
|
backoffDuration := time.Duration(handler.failureCount*handler.failureCount*100) * time.Millisecond
|
|
if backoffDuration > time.Second {
|
|
backoffDuration = time.Second
|
|
}
|
|
handler.failureCount++
|
|
time.Sleep(backoffDuration)
|
|
}
|
|
|
|
// waitForWorkerRequest is called during frankenphp_handle_request in the php worker script.
|
|
func (handler *workerThread) waitForWorkerRequest() (bool, any) {
|
|
// unpin any memory left over from previous requests
|
|
handler.thread.Unpin()
|
|
|
|
if globalLogger.Enabled(globalCtx, slog.LevelDebug) {
|
|
globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "waiting for request", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
|
|
}
|
|
|
|
// Clear the first dummy request created to initialize the worker
|
|
if handler.isBootingScript {
|
|
handler.isBootingScript = false
|
|
handler.failureCount = 0
|
|
if !C.frankenphp_shutdown_dummy_request() {
|
|
panic("Not in CGI context")
|
|
}
|
|
|
|
// worker is truly ready only after reaching frankenphp_handle_request()
|
|
metrics.ReadyWorker(handler.worker.name)
|
|
}
|
|
|
|
// max_requests reached: signal reboot for full ZTS cleanup
|
|
if maxRequestsPerThread > 0 && handler.requestCount >= maxRequestsPerThread {
|
|
if globalLogger.Enabled(globalCtx, slog.LevelDebug) {
|
|
globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting",
|
|
slog.String("worker", handler.worker.name),
|
|
slog.Int("thread", handler.thread.threadIndex),
|
|
slog.Int("max_requests", maxRequestsPerThread),
|
|
)
|
|
}
|
|
|
|
if handler.thread.reboot() {
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
if handler.state.Is(state.TransitionComplete) {
|
|
handler.state.Set(state.Ready)
|
|
}
|
|
|
|
handler.state.MarkAsWaiting(true)
|
|
|
|
var requestCH contextHolder
|
|
select {
|
|
case <-handler.thread.drainChan:
|
|
if globalLogger.Enabled(globalCtx, slog.LevelDebug) {
|
|
globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "shutting down", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
|
|
}
|
|
|
|
// flush the opcache when restarting due to watcher or admin api
|
|
// note: this is done right before frankenphp_handle_request() returns 'false'
|
|
if handler.state.Is(state.Restarting) {
|
|
C.frankenphp_reset_opcache()
|
|
}
|
|
|
|
return false, nil
|
|
case requestCH = <-handler.thread.requestChan:
|
|
case requestCH = <-handler.worker.requestChan:
|
|
}
|
|
|
|
handler.requestCount++
|
|
handler.thread.contextMu.Lock()
|
|
handler.workerContext = requestCH.ctx
|
|
handler.workerFrankenPHPContext = requestCH.frankenPHPContext
|
|
handler.thread.contextMu.Unlock()
|
|
handler.state.MarkAsWaiting(false)
|
|
|
|
if globalLogger.Enabled(requestCH.ctx, slog.LevelDebug) {
|
|
if handler.workerFrankenPHPContext.request == nil {
|
|
globalLogger.LogAttrs(requestCH.ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
|
|
} else {
|
|
globalLogger.LogAttrs(requestCH.ctx, slog.LevelDebug, "request handling started", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex), slog.String("url", handler.workerFrankenPHPContext.request.RequestURI))
|
|
}
|
|
}
|
|
|
|
return true, handler.workerFrankenPHPContext.handlerParameters
|
|
}
|
|
|
|
// go_frankenphp_worker_handle_request_start is called at the start of every php request served.
|
|
//
|
|
//export go_frankenphp_worker_handle_request_start
|
|
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) (C.bool, unsafe.Pointer) {
|
|
handler := phpThreads[threadIndex].handler.(*workerThread)
|
|
hasRequest, parameters := handler.waitForWorkerRequest()
|
|
|
|
if parameters != nil {
|
|
var ptr unsafe.Pointer
|
|
|
|
switch p := parameters.(type) {
|
|
case unsafe.Pointer:
|
|
ptr = p
|
|
|
|
default:
|
|
ptr = PHPValue(p)
|
|
}
|
|
handler.thread.Pin(ptr)
|
|
|
|
return C.bool(hasRequest), ptr
|
|
}
|
|
|
|
return C.bool(hasRequest), nil
|
|
}
|
|
|
|
// go_frankenphp_finish_worker_request is called at the end of every php request served.
|
|
//
|
|
//export go_frankenphp_finish_worker_request
|
|
func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval) {
|
|
thread := phpThreads[threadIndex]
|
|
ctx := thread.context()
|
|
fc := ctx.Value(contextKey).(*frankenPHPContext)
|
|
|
|
if retval != nil {
|
|
r, err := GoValue[any](unsafe.Pointer(retval))
|
|
if err != nil && globalLogger.Enabled(ctx, slog.LevelError) {
|
|
globalLogger.LogAttrs(ctx, slog.LevelError, "cannot convert return value", slog.Any("error", err), slog.Int("thread", thread.threadIndex))
|
|
}
|
|
|
|
fc.handlerReturn = r
|
|
}
|
|
|
|
thread.requestCount.Add(1)
|
|
|
|
fc.closeContext()
|
|
thread.contextMu.Lock()
|
|
thread.handler.(*workerThread).workerFrankenPHPContext = nil
|
|
thread.handler.(*workerThread).workerContext = nil
|
|
thread.contextMu.Unlock()
|
|
|
|
if globalLogger.Enabled(ctx, slog.LevelDebug) {
|
|
if fc.request == nil {
|
|
fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.String("worker", fc.worker.name), slog.Int("thread", thread.threadIndex))
|
|
} else {
|
|
fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.String("worker", fc.worker.name), slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI))
|
|
}
|
|
}
|
|
}
|
|
|
|
// when frankenphp_finish_request() is directly called from PHP
|
|
//
|
|
//export go_frankenphp_finish_php_request
|
|
func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
|
|
thread := phpThreads[threadIndex]
|
|
fc := thread.frankenPHPContext()
|
|
|
|
fc.closeContext()
|
|
|
|
ctx := thread.context()
|
|
if fc.logger.Enabled(ctx, slog.LevelDebug) {
|
|
fc.logger.LogAttrs(ctx, slog.LevelDebug, "request handling finished", slog.Int("thread", thread.threadIndex), slog.String("url", fc.request.RequestURI))
|
|
}
|
|
}
|