mirror of
https://github.com/dunglas/frankenphp.git
synced 2026-04-22 16:27:12 +08:00
feat: Add configurable max_requests for PHP threads (#2292)
PHP-FPM recycles worker processes after a configurable number of
requests (`pm.max_requests`), preventing memory leaks from accumulating
over time. FrankenPHP keeps PHP threads alive indefinitely, so any leak
in PHP extensions (e.g. ZTS builds of profiling tools like Blackfire) or
application code compounds over hours/days. In production behind reverse
proxies like Cloudflare, this can lead to gradual resource exhaustion
and eventually 502 errors with no visible warnings in logs.
This PR adds a `max_requests` option in the global `frankenphp` block
that automatically restarts PHP threads after a given number of
requests, fully cleaning up the thread's memory and state. It applies to
both regular (module mode) and worker threads.
When a thread reaches the limit it exits the C thread loop, triggering a
full cleanup including any memory leaked by extensions. A fresh thread
is then booted transparently. Other threads continue serving requests
during the restart.
This cannot be done from userland PHP: restarting a worker script from
PHP only resets PHP-level state, not the underlying C thread-local
storage where extension-level leaks accumulate. And in module mode
(without workers), there is no userland loop to count requests at all.
Default is `0` (unlimited), preserving existing behavior.
Usage:
```caddyfile
{
frankenphp {
max_requests 500
}
}
```
Changes:
- New `max_requests` Caddyfile directive in the global `frankenphp`
block
- New `WithMaxRequests` functional option
- New `Rebooting` and `RebootReady` states in the thread state machine
for restart coordination
- Regular thread restart in `threadregular.go`
- Worker thread restart in `threadworker.go`
- Safe shutdown: `shutdown()` waits for in-flight reboots to complete
before draining threads
- Tests for both module and worker mode (sequential and concurrent),
with debug log verification
- Updated docs
This commit is contained in:
+16
-1
@@ -57,6 +57,8 @@ type FrankenPHPApp struct {
|
||||
MaxWaitTime time.Duration `json:"max_wait_time,omitempty"`
|
||||
// The maximum amount of time an autoscaled thread may be idle before being deactivated
|
||||
MaxIdleTime time.Duration `json:"max_idle_time,omitempty"`
|
||||
// EXPERIMENTAL: MaxRequests sets the maximum number of requests a PHP thread handles before restarting (0 = unlimited)
|
||||
MaxRequests int `json:"max_requests,omitempty"`
|
||||
|
||||
opts []frankenphp.Option
|
||||
metrics frankenphp.Metrics
|
||||
@@ -153,6 +155,7 @@ func (f *FrankenPHPApp) Start() error {
|
||||
frankenphp.WithPhpIni(f.PhpIni),
|
||||
frankenphp.WithMaxWaitTime(f.MaxWaitTime),
|
||||
frankenphp.WithMaxIdleTime(f.MaxIdleTime),
|
||||
frankenphp.WithMaxRequests(f.MaxRequests),
|
||||
)
|
||||
|
||||
for _, w := range f.Workers {
|
||||
@@ -192,6 +195,7 @@ func (f *FrankenPHPApp) Stop() error {
|
||||
f.NumThreads = 0
|
||||
f.MaxWaitTime = 0
|
||||
f.MaxIdleTime = 0
|
||||
f.MaxRequests = 0
|
||||
|
||||
optionsMU.Lock()
|
||||
options = nil
|
||||
@@ -255,6 +259,17 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
|
||||
}
|
||||
|
||||
f.MaxIdleTime = v
|
||||
case "max_requests":
|
||||
if !d.NextArg() {
|
||||
return d.ArgErr()
|
||||
}
|
||||
|
||||
v, err := strconv.ParseUint(d.Val(), 10, 32)
|
||||
if err != nil {
|
||||
return d.WrapErr(err)
|
||||
}
|
||||
|
||||
f.MaxRequests = int(v)
|
||||
case "php_ini":
|
||||
parseIniLine := func(d *caddyfile.Dispenser) error {
|
||||
key := d.Val()
|
||||
@@ -311,7 +326,7 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
|
||||
|
||||
f.Workers = append(f.Workers, wc)
|
||||
default:
|
||||
return wrongSubDirectiveError("frankenphp", "num_threads, max_threads, php_ini, worker, max_wait_time, max_idle_time", d.Val())
|
||||
return wrongSubDirectiveError("frankenphp", "num_threads, max_threads, php_ini, worker, max_wait_time, max_idle_time, max_requests", d.Val())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,6 +97,7 @@ You can also explicitly configure FrankenPHP using the [global option](https://c
|
||||
max_threads <num_threads> # Limits the number of additional PHP threads that can be started at runtime. Default: num_threads. Can be set to 'auto'.
|
||||
max_wait_time <duration> # Sets the maximum time a request may wait for a free PHP thread before timing out. Default: disabled.
|
||||
max_idle_time <duration> # Sets the maximum time an autoscaled thread may be idle before being deactivated. Default: 5s.
|
||||
max_requests <num> # (experimental) Sets the maximum number of requests a PHP thread will handle before being restarted, useful for mitigating memory leaks. Applies to both regular and worker threads. Default: 0 (unlimited).
|
||||
php_ini <key> <value> # Set a php.ini directive. Can be used several times to set multiple directives.
|
||||
worker {
|
||||
file <path> # Sets the path to the worker script.
|
||||
@@ -265,6 +266,25 @@ and otherwise forward the request to the worker matching the path pattern.
|
||||
}
|
||||
```
|
||||
|
||||
## Restarting Threads After a Number of Requests (Experimental)
|
||||
|
||||
FrankenPHP can automatically restart PHP threads after they have handled a given number of requests.
|
||||
When a thread reaches the limit, it is fully restarted,
|
||||
cleaning up all memory and state. Other threads continue to serve requests during the restart.
|
||||
|
||||
If you notice memory growing over time, the ideal fix is to report the leak
|
||||
to the responsible extension or library maintainer.
|
||||
But when the fix depends on a third party you don't control,
|
||||
`max_requests` provides a pragmatic and hopefully temporary workaround for production:
|
||||
|
||||
```caddyfile
|
||||
{
|
||||
frankenphp {
|
||||
max_requests 500
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Environment Variables
|
||||
|
||||
The following environment variables can be used to inject Caddy directives in the `Caddyfile` without modifying it:
|
||||
|
||||
+5
-2
@@ -66,7 +66,8 @@ var (
|
||||
|
||||
metrics Metrics = nullMetrics{}
|
||||
|
||||
maxWaitTime time.Duration
|
||||
maxWaitTime time.Duration
|
||||
maxRequestsPerThread int
|
||||
)
|
||||
|
||||
type ErrRejected struct {
|
||||
@@ -275,6 +276,7 @@ func Init(options ...Option) error {
|
||||
}
|
||||
|
||||
maxWaitTime = opt.maxWaitTime
|
||||
maxRequestsPerThread = opt.maxRequests
|
||||
|
||||
if opt.maxIdleTime > 0 {
|
||||
maxIdleTime = opt.maxIdleTime
|
||||
@@ -335,7 +337,7 @@ func Init(options ...Option) error {
|
||||
initAutoScaling(mainThread)
|
||||
|
||||
if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
|
||||
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "FrankenPHP started 🐘", slog.String("php_version", Version().Version), slog.Int("num_threads", mainThread.numThreads), slog.Int("max_threads", mainThread.maxThreads))
|
||||
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "FrankenPHP started 🐘", slog.String("php_version", Version().Version), slog.Int("num_threads", mainThread.numThreads), slog.Int("max_threads", mainThread.maxThreads), slog.Int("max_requests", maxRequestsPerThread))
|
||||
|
||||
if EmbeddedAppPath != "" {
|
||||
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "embedded PHP app 📦", slog.String("path", EmbeddedAppPath))
|
||||
@@ -786,5 +788,6 @@ func resetGlobals() {
|
||||
workersByPath = nil
|
||||
watcherIsEnabled = false
|
||||
maxIdleTime = defaultMaxIdleTime
|
||||
maxRequestsPerThread = 0
|
||||
globalMu.Unlock()
|
||||
}
|
||||
|
||||
+1
-1
@@ -23,8 +23,8 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/user"
|
||||
"runtime"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
@@ -30,6 +30,11 @@ const (
|
||||
TransitionRequested
|
||||
TransitionInProgress
|
||||
TransitionComplete
|
||||
|
||||
// thread is exiting the C loop for a full ZTS restart (max_requests)
|
||||
Rebooting
|
||||
// C thread has exited and ZTS state is cleaned up, ready to spawn a new C thread
|
||||
RebootReady
|
||||
)
|
||||
|
||||
func (s State) String() string {
|
||||
@@ -58,6 +63,10 @@ func (s State) String() string {
|
||||
return "transition in progress"
|
||||
case TransitionComplete:
|
||||
return "transition complete"
|
||||
case Rebooting:
|
||||
return "rebooting"
|
||||
case RebootReady:
|
||||
return "reboot ready"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
package frankenphp_test
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/dunglas/frankenphp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestModuleMaxRequests verifies that regular (non-worker) PHP threads restart
|
||||
// after reaching max_requests by checking debug logs for restart messages.
|
||||
func TestModuleMaxRequests(t *testing.T) {
|
||||
const maxRequests = 5
|
||||
const totalRequests = 30
|
||||
|
||||
var buf syncBuffer
|
||||
logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}))
|
||||
|
||||
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) {
|
||||
for i := 0; i < totalRequests; i++ {
|
||||
body, resp := testGet("http://example.com/index.php", handler, t)
|
||||
assert.Equal(t, 200, resp.StatusCode)
|
||||
assert.Contains(t, body, "I am by birth a Genevese")
|
||||
}
|
||||
|
||||
restartCount := strings.Count(buf.String(), "max requests reached, restarting thread")
|
||||
t.Logf("Thread restarts observed: %d", restartCount)
|
||||
assert.GreaterOrEqual(t, restartCount, 2,
|
||||
"with maxRequests=%d and %d requests on 2 threads, at least 2 restarts should occur", maxRequests, totalRequests)
|
||||
}, &testOptions{
|
||||
logger: logger,
|
||||
initOpts: []frankenphp.Option{
|
||||
frankenphp.WithNumThreads(2),
|
||||
frankenphp.WithMaxRequests(maxRequests),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// TestModuleMaxRequestsConcurrent verifies max_requests works under concurrent load
|
||||
// in module mode. All requests must succeed despite threads restarting.
|
||||
func TestModuleMaxRequestsConcurrent(t *testing.T) {
|
||||
const maxRequests = 10
|
||||
const totalRequests = 200
|
||||
|
||||
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < totalRequests; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
body, resp := testGet("http://example.com/index.php", handler, t)
|
||||
assert.Equal(t, 200, resp.StatusCode)
|
||||
assert.Contains(t, body, "I am by birth a Genevese")
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}, &testOptions{
|
||||
initOpts: []frankenphp.Option{
|
||||
frankenphp.WithNumThreads(8),
|
||||
frankenphp.WithMaxRequests(maxRequests),
|
||||
},
|
||||
})
|
||||
}
|
||||
+10
@@ -31,6 +31,7 @@ type opt struct {
|
||||
phpIni map[string]string
|
||||
maxWaitTime time.Duration
|
||||
maxIdleTime time.Duration
|
||||
maxRequests int
|
||||
}
|
||||
|
||||
type workerOpt struct {
|
||||
@@ -166,6 +167,15 @@ func WithMaxIdleTime(maxIdleTime time.Duration) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// EXPERIMENTAL: WithMaxRequests sets the default max requests before restarting a PHP thread (0 = unlimited). Applies to regular and worker threads.
|
||||
func WithMaxRequests(maxRequests int) Option {
|
||||
return func(o *opt) error {
|
||||
o.maxRequests = maxRequests
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithWorkerEnv sets environment variables for the worker
|
||||
func WithWorkerEnv(env map[string]string) WorkerOption {
|
||||
return func(w *workerOpt) error {
|
||||
|
||||
+23
-1
@@ -65,6 +65,24 @@ func (thread *phpThread) boot() {
|
||||
thread.state.WaitFor(state.Inactive)
|
||||
}
|
||||
|
||||
// reboot exits the C thread loop for full ZTS cleanup, then spawns a fresh C thread.
|
||||
// Returns false if the thread is no longer in Ready state (e.g. shutting down).
|
||||
func (thread *phpThread) reboot() bool {
|
||||
if !thread.state.CompareAndSwap(state.Ready, state.Rebooting) {
|
||||
return false
|
||||
}
|
||||
|
||||
go func() {
|
||||
thread.state.WaitFor(state.RebootReady)
|
||||
|
||||
if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) {
|
||||
panic("unable to create thread")
|
||||
}
|
||||
}()
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// shutdown the underlying PHP thread
|
||||
func (thread *phpThread) shutdown() {
|
||||
if !thread.state.RequestSafeStateChange(state.ShuttingDown) {
|
||||
@@ -189,5 +207,9 @@ func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C.
|
||||
func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) {
|
||||
thread := phpThreads[threadIndex]
|
||||
thread.Unpin()
|
||||
thread.state.Set(state.Done)
|
||||
if thread.state.Is(state.Rebooting) {
|
||||
thread.state.Set(state.RebootReady)
|
||||
} else {
|
||||
thread.state.Set(state.Done)
|
||||
}
|
||||
}
|
||||
|
||||
+10
@@ -0,0 +1,10 @@
|
||||
<?php
|
||||
// Worker that tracks total requests handled across restarts.
|
||||
// Uses a unique instance ID per worker script execution.
|
||||
$instanceId = bin2hex(random_bytes(8));
|
||||
$counter = 0;
|
||||
|
||||
while (frankenphp_handle_request(function () use (&$counter, $instanceId) {
|
||||
$counter++;
|
||||
echo "instance:$instanceId,count:$counter";
|
||||
})) {}
|
||||
+24
-2
@@ -2,6 +2,7 @@ package frankenphp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -15,8 +16,9 @@ import (
|
||||
type regularThread struct {
|
||||
contextHolder
|
||||
|
||||
state *state.ThreadState
|
||||
thread *phpThread
|
||||
state *state.ThreadState
|
||||
thread *phpThread
|
||||
requestCount int
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -50,6 +52,11 @@ func (handler *regularThread) beforeScriptExecution() string {
|
||||
case state.Ready:
|
||||
return handler.waitForRequest()
|
||||
|
||||
case state.RebootReady:
|
||||
handler.requestCount = 0
|
||||
handler.state.Set(state.Ready)
|
||||
return handler.waitForRequest()
|
||||
|
||||
case state.ShuttingDown:
|
||||
detachRegularThread(handler.thread)
|
||||
// signal to stop
|
||||
@@ -77,6 +84,20 @@ func (handler *regularThread) name() string {
|
||||
}
|
||||
|
||||
func (handler *regularThread) waitForRequest() string {
|
||||
// max_requests reached: restart the thread to clean up all ZTS state
|
||||
if maxRequestsPerThread > 0 && handler.requestCount >= maxRequestsPerThread {
|
||||
if globalLogger.Enabled(globalCtx, slog.LevelDebug) {
|
||||
globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting thread",
|
||||
slog.Int("thread", handler.thread.threadIndex),
|
||||
slog.Int("max_requests", maxRequestsPerThread),
|
||||
)
|
||||
}
|
||||
|
||||
if handler.thread.reboot() {
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
handler.state.MarkAsWaiting(true)
|
||||
|
||||
var ch contextHolder
|
||||
@@ -89,6 +110,7 @@ func (handler *regularThread) waitForRequest() string {
|
||||
case ch = <-handler.thread.requestChan:
|
||||
}
|
||||
|
||||
handler.requestCount++
|
||||
handler.thread.contextMu.Lock()
|
||||
handler.ctx = ch.ctx
|
||||
handler.contextHolder.frankenPHPContext = ch.frankenPHPContext
|
||||
|
||||
@@ -26,6 +26,7 @@ type workerThread struct {
|
||||
workerContext context.Context
|
||||
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
|
||||
failureCount int // number of consecutive startup failures
|
||||
requestCount int // number of requests handled since last restart
|
||||
}
|
||||
|
||||
func convertToWorkerThread(thread *phpThread, worker *worker) {
|
||||
@@ -62,6 +63,12 @@ func (handler *workerThread) beforeScriptExecution() string {
|
||||
setupWorkerScript(handler, handler.worker)
|
||||
|
||||
return handler.worker.fileName
|
||||
case state.Rebooting:
|
||||
return ""
|
||||
case state.RebootReady:
|
||||
handler.requestCount = 0
|
||||
handler.state.Set(state.Ready)
|
||||
return handler.beforeScriptExecution()
|
||||
case state.ShuttingDown:
|
||||
if handler.worker.onThreadShutdown != nil {
|
||||
handler.worker.onThreadShutdown(handler.thread.threadIndex)
|
||||
@@ -116,6 +123,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) {
|
||||
handler.dummyFrankenPHPContext = fc
|
||||
handler.dummyContext = ctx
|
||||
handler.isBootingScript = true
|
||||
handler.requestCount = 0
|
||||
|
||||
if globalLogger.Enabled(ctx, slog.LevelDebug) {
|
||||
globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex))
|
||||
@@ -213,6 +221,21 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
|
||||
metrics.ReadyWorker(handler.worker.name)
|
||||
}
|
||||
|
||||
// max_requests reached: signal reboot for full ZTS cleanup
|
||||
if maxRequestsPerThread > 0 && handler.requestCount >= maxRequestsPerThread {
|
||||
if globalLogger.Enabled(globalCtx, slog.LevelDebug) {
|
||||
globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting",
|
||||
slog.String("worker", handler.worker.name),
|
||||
slog.Int("thread", handler.thread.threadIndex),
|
||||
slog.Int("max_requests", maxRequestsPerThread),
|
||||
)
|
||||
}
|
||||
|
||||
if handler.thread.reboot() {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
if handler.state.Is(state.TransitionComplete) {
|
||||
handler.state.Set(state.Ready)
|
||||
}
|
||||
@@ -237,6 +260,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
|
||||
case requestCH = <-handler.worker.requestChan:
|
||||
}
|
||||
|
||||
handler.requestCount++
|
||||
handler.thread.contextMu.Lock()
|
||||
handler.workerContext = requestCH.ctx
|
||||
handler.workerFrankenPHPContext = requestCH.frankenPHPContext
|
||||
|
||||
@@ -5,11 +5,13 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/dunglas/frankenphp"
|
||||
@@ -169,3 +171,94 @@ func TestKeepRunningOnConnectionAbort(t *testing.T) {
|
||||
assert.Equal(t, "requests:2", body2, "should not have stopped execution after the first request was aborted")
|
||||
}, &testOptions{workerScript: "worker-with-counter.php", nbWorkers: 1, nbParallelRequests: 1})
|
||||
}
|
||||
|
||||
// TestWorkerMaxRequests verifies that a worker restarts after reaching max_requests.
|
||||
func TestWorkerMaxRequests(t *testing.T) {
|
||||
const maxRequests = 5
|
||||
const totalRequests = 20
|
||||
|
||||
var buf syncBuffer
|
||||
logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug}))
|
||||
|
||||
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) {
|
||||
instanceIDs := make(map[string]int)
|
||||
|
||||
for i := 0; i < totalRequests; i++ {
|
||||
body, resp := testGet("http://example.com/worker-counter-persistent.php", handler, t)
|
||||
assert.Equal(t, 200, resp.StatusCode)
|
||||
|
||||
parts := strings.Split(body, ",")
|
||||
if len(parts) == 2 {
|
||||
instanceID := strings.TrimPrefix(parts[0], "instance:")
|
||||
instanceIDs[instanceID]++
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("Unique worker instances seen: %d (expected >= %d)", len(instanceIDs), totalRequests/maxRequests)
|
||||
for id, count := range instanceIDs {
|
||||
t.Logf(" instance %s: handled %d requests", id, count)
|
||||
}
|
||||
|
||||
assert.GreaterOrEqual(t, len(instanceIDs), totalRequests/maxRequests)
|
||||
|
||||
for id, count := range instanceIDs {
|
||||
assert.LessOrEqual(t, count, maxRequests,
|
||||
fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests))
|
||||
}
|
||||
|
||||
restartCount := strings.Count(buf.String(), "max requests reached, restarting")
|
||||
t.Logf("Worker restarts observed: %d", restartCount)
|
||||
assert.GreaterOrEqual(t, restartCount, 2)
|
||||
}, &testOptions{
|
||||
workerScript: "worker-counter-persistent.php",
|
||||
nbWorkers: 1,
|
||||
nbParallelRequests: 1,
|
||||
logger: logger,
|
||||
initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2), frankenphp.WithMaxRequests(maxRequests)},
|
||||
})
|
||||
}
|
||||
|
||||
// TestWorkerMaxRequestsHighConcurrency verifies max_requests works under concurrent load.
|
||||
func TestWorkerMaxRequestsHighConcurrency(t *testing.T) {
|
||||
const maxRequests = 10
|
||||
const totalRequests = 200
|
||||
|
||||
runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) {
|
||||
var (
|
||||
mu sync.Mutex
|
||||
instanceIDs = make(map[string]int)
|
||||
)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < totalRequests; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
body, resp := testGet("http://example.com/worker-counter-persistent.php", handler, t)
|
||||
assert.Equal(t, 200, resp.StatusCode)
|
||||
|
||||
mu.Lock()
|
||||
parts := strings.Split(body, ",")
|
||||
if len(parts) == 2 {
|
||||
instanceID := strings.TrimPrefix(parts[0], "instance:")
|
||||
instanceIDs[instanceID]++
|
||||
}
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
t.Logf("instances: %d", len(instanceIDs))
|
||||
assert.Greater(t, len(instanceIDs), 4, "workers should have restarted multiple times")
|
||||
|
||||
for id, count := range instanceIDs {
|
||||
assert.LessOrEqual(t, count, maxRequests,
|
||||
fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests))
|
||||
}
|
||||
}, &testOptions{
|
||||
workerScript: "worker-counter-persistent.php",
|
||||
nbWorkers: 4,
|
||||
nbParallelRequests: 1,
|
||||
initOpts: []frankenphp.Option{frankenphp.WithNumThreads(5), frankenphp.WithMaxRequests(maxRequests)},
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user