From 96b0ecea6b3cd03223ed012bec9ed464c59c2838 Mon Sep 17 00:00:00 2001 From: mouxin <494624263qq@gmail.com> Date: Fri, 20 Mar 2026 10:51:37 +0800 Subject: [PATCH] [Feature] Update Counter Release (#6943) --- docs/online_serving/router.md | 2 +- docs/zh/online_serving/router.md | 2 +- .../config/config.example.yaml | 2 +- .../config/config.example.yaml | 2 +- .../internal/scheduler/common/counter.go | 12 +- .../internal/scheduler/handler/handler.go | 68 ++++-- .../scheduler/handler/handler_test.go | 213 +++++++++++++++--- 7 files changed, 251 insertions(+), 50 deletions(-) diff --git a/docs/online_serving/router.md b/docs/online_serving/router.md index d90c3574ca..cbb0661e9f 100644 --- a/docs/online_serving/router.md +++ b/docs/online_serving/router.md @@ -192,7 +192,7 @@ server: scheduler: policy: "power_of_two" # Scheduling policy (optional): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, remote_cache_aware, fd_metrics_score, fd_remote_metrics_score prefill-policy: "cache_aware" # Prefill scheduling policy in PD mode - decode-policy: "fd_metrics_score" # Decode scheduling policy in PD mode + decode-policy: "request_num" # Decode scheduling policy in PD mode eviction-interval-secs: 60 # Cache eviction interval for CacheAware scheduling balance-abs-threshold: 1 # Absolute threshold for CacheAware balancing balance-rel-threshold: 0.2 # Relative threshold for CacheAware balancing diff --git a/docs/zh/online_serving/router.md b/docs/zh/online_serving/router.md index eaf50722c7..f806dd64d3 100644 --- a/docs/zh/online_serving/router.md +++ b/docs/zh/online_serving/router.md @@ -192,7 +192,7 @@ server: scheduler: policy: "power_of_two" # 调度策略(可选): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, remote_cache_aware, fd_metrics_score, fd_remote_metrics_score; 默认: request_num prefill-policy: "cache_aware" # pd分离模式下prefill节点调度策略; 默认: process_tokens - decode-policy: "fd_metrics_score" # pd分离模式下decode节点调度策略; 默认: request_num + decode-policy: "request_num" # pd分离模式下decode节点调度策略; 默认: request_num eviction-interval-secs: 60 # cache-aware策略清理过期cache的间隔时间 balance-abs-threshold: 1 # cache-aware策略绝对阈值 balance-rel-threshold: 0.2 # cache-aware策略相对阈值 diff --git a/fastdeploy/golang_router/examples/run_with_config/config/config.example.yaml b/fastdeploy/golang_router/examples/run_with_config/config/config.example.yaml index 712b2bcee7..ba1a51acc3 100644 --- a/fastdeploy/golang_router/examples/run_with_config/config/config.example.yaml +++ b/fastdeploy/golang_router/examples/run_with_config/config/config.example.yaml @@ -7,7 +7,7 @@ server: scheduler: policy: "power_of_two" prefill-policy: "cache_aware" - decode-policy: "fd_metrics_score" + decode-policy: "request_num" eviction-interval-secs: 60 balance-abs-threshold: 1 balance-rel-threshold: 0.2 diff --git a/fastdeploy/golang_router/examples/run_with_default_workers/config/config.example.yaml b/fastdeploy/golang_router/examples/run_with_default_workers/config/config.example.yaml index 712b2bcee7..ba1a51acc3 100644 --- a/fastdeploy/golang_router/examples/run_with_default_workers/config/config.example.yaml +++ b/fastdeploy/golang_router/examples/run_with_default_workers/config/config.example.yaml @@ -7,7 +7,7 @@ server: scheduler: policy: "power_of_two" prefill-policy: "cache_aware" - decode-policy: "fd_metrics_score" + decode-policy: "request_num" eviction-interval-secs: 60 balance-abs-threshold: 1 balance-rel-threshold: 0.2 diff --git a/fastdeploy/golang_router/internal/scheduler/common/counter.go b/fastdeploy/golang_router/internal/scheduler/common/counter.go index bfe436bc00..a780c83f9a 100644 --- a/fastdeploy/golang_router/internal/scheduler/common/counter.go +++ b/fastdeploy/golang_router/internal/scheduler/common/counter.go @@ -12,8 +12,16 @@ func (c *Counter) Inc() { c.count.Add(1) } -func (c *Counter) Dec() { - c.count.Add(^uint64(0)) +func (c *Counter) Dec() bool { + for { + old := c.count.Load() + if old == 0 { + return false + } + if c.count.CompareAndSwap(old, old-1) { + return true + } + } } func (c *Counter) Get() uint64 { diff --git a/fastdeploy/golang_router/internal/scheduler/handler/handler.go b/fastdeploy/golang_router/internal/scheduler/handler/handler.go index 76009285ec..0d62346de5 100644 --- a/fastdeploy/golang_router/internal/scheduler/handler/handler.go +++ b/fastdeploy/golang_router/internal/scheduler/handler/handler.go @@ -139,13 +139,22 @@ func SelectWorker(ctx context.Context, workers []string, message string, workerT return selectWorkerURL, nil } -// Release decreases the counter for the specified worker URL +// Release decreases the counter for the specified worker URL. +// Uses GetCounter (not GetOrCreateCounter) to avoid creating ghost entries +// when the counter has already been cleaned up. func Release(ctx context.Context, url string) { if DefaultScheduler == nil { return } - counter := GetOrCreateCounter(ctx, url) - counter.Dec() + counter, exists := GetCounter(ctx, url) + if !exists { + logger.Warn(ctx, "release worker: %s skipped, counter already cleaned up", url) + return + } + if !counter.Dec() { + logger.Warn(ctx, "release worker: %s skipped, counter already zero (possible double-release)", url) + return + } logger.Info(ctx, "release worker: %s, count: %d", url, counter.Get()) } @@ -174,7 +183,9 @@ func GetOrCreateCounter(ctx context.Context, url string) *scheduler_common.Count return newCounter } -// CleanupUnhealthyCounter removes counters for unhealthy worker URLs +// CleanupUnhealthyCounter removes counters for unhealthy worker URLs only +// when the counter has reached zero (no inflight requests). If there are +// still inflight requests, the counter is preserved so Dec() works correctly. func CleanupUnhealthyCounter(ctx context.Context, unhealthyRootURL string) { if unhealthyRootURL == "" { return @@ -187,12 +198,27 @@ func CleanupUnhealthyCounter(ctx context.Context, unhealthyRootURL string) { DefaultScheduler.mu.Lock() defer DefaultScheduler.mu.Unlock() - delete(DefaultScheduler.IdCounterMap, unhealthyRootURL) - delete(DefaultScheduler.tokenMap, unhealthyRootURL) - logger.Info(ctx, "cleanup unhealthy worker counter: %s", unhealthyRootURL) + if counter, exists := DefaultScheduler.IdCounterMap[unhealthyRootURL]; exists { + if counter.Get() > 0 { + logger.Info(ctx, "unhealthy worker counter preserved (inflight requests): %s, count: %d", unhealthyRootURL, counter.Get()) + } else { + delete(DefaultScheduler.IdCounterMap, unhealthyRootURL) + logger.Info(ctx, "cleanup unhealthy worker counter: %s", unhealthyRootURL) + } + } + + if tokenCounter, exists := DefaultScheduler.tokenMap[unhealthyRootURL]; exists { + if tokenCounter.Get() > 0 { + logger.Info(ctx, "unhealthy worker token counter preserved (inflight requests): %s, tokens: %d", unhealthyRootURL, tokenCounter.Get()) + } else { + delete(DefaultScheduler.tokenMap, unhealthyRootURL) + logger.Info(ctx, "cleanup unhealthy worker token counter: %s", unhealthyRootURL) + } + } } // CleanupInvalidCounters removes counters for invalid or unreachable workers +// only when their counter has reached zero (no inflight requests). func CleanupInvalidCounters(ctx context.Context) { if DefaultScheduler == nil { return @@ -214,22 +240,32 @@ func CleanupInvalidCounters(ctx context.Context) { defer DefaultScheduler.mu.Unlock() var removed []string - for rootURL := range DefaultScheduler.IdCounterMap { + var preserved []string + for rootURL, counter := range DefaultScheduler.IdCounterMap { if _, exists := healthyMap[rootURL]; !exists { - delete(DefaultScheduler.IdCounterMap, rootURL) - removed = append(removed, rootURL) + if counter.Get() > 0 { + preserved = append(preserved, rootURL) + } else { + delete(DefaultScheduler.IdCounterMap, rootURL) + removed = append(removed, rootURL) + } } } - for rootURL := range DefaultScheduler.tokenMap { + for rootURL, tokenCounter := range DefaultScheduler.tokenMap { if _, exists := healthyMap[rootURL]; !exists { - delete(DefaultScheduler.tokenMap, rootURL) + if tokenCounter.Get() == 0 { + delete(DefaultScheduler.tokenMap, rootURL) + } } } if len(removed) > 0 { logger.Info(ctx, "removed counters for %d unhealthy workers: %v", len(removed), removed) } + if len(preserved) > 0 { + logger.Info(ctx, "preserved counters for %d workers with inflight requests: %v", len(preserved), preserved) + } } // StartBackupCleanupTask starts a background task for cleaning up invalid counters @@ -282,12 +318,16 @@ func estimateTokens(message string) uint64 { return uint64(runeCount * 2) } -// ReleasePrefillTokens releases the corresponding token load when request ends +// ReleasePrefillTokens releases the corresponding token load when request ends. +// Uses GetTokenCounter (not GetOrCreateTokenCounter) to avoid creating ghost entries. func ReleasePrefillTokens(ctx context.Context, url, message string) { if DefaultScheduler == nil || url == "" || message == "" { return } - tokenCounter := GetOrCreateTokenCounter(ctx, url) + tokenCounter, exists := GetTokenCounter(ctx, url) + if !exists { + return + } tokenCounter.Sub(estimateTokens(message)) logger.Info(ctx, "release prefill tokens: %s, tokens: %d", url, tokenCounter.Get()) } diff --git a/fastdeploy/golang_router/internal/scheduler/handler/handler_test.go b/fastdeploy/golang_router/internal/scheduler/handler/handler_test.go index 7b120cf49a..6a540b4e77 100644 --- a/fastdeploy/golang_router/internal/scheduler/handler/handler_test.go +++ b/fastdeploy/golang_router/internal/scheduler/handler/handler_test.go @@ -97,7 +97,16 @@ func TestCounterOperations(t *testing.T) { counter.Inc() assert.Equal(t, uint64(1), counter.Get()) - counter.Dec() + ok := counter.Dec() + assert.True(t, ok) + assert.Equal(t, uint64(0), counter.Get()) + }) + + t.Run("counter underflow protection", func(t *testing.T) { + counter := GetOrCreateCounter(ctx, "test-underflow") + assert.Equal(t, uint64(0), counter.Get()) + ok := counter.Dec() + assert.False(t, ok) assert.Equal(t, uint64(0), counter.Get()) }) @@ -117,28 +126,49 @@ func TestCleanupInvalidCounters(t *testing.T) { ctx := context.Background() Init(&config.Config{}, &mockManagerAPI{}) - // Add some counters - c1 := GetOrCreateCounter(ctx, "worker1") - c1.Inc() - GetOrCreateCounter(ctx, "invalid-worker") // Should be cleaned up + t.Run("idle invalid counter deleted", func(t *testing.T) { + // Add some counters + c1 := GetOrCreateCounter(ctx, "worker1") + c1.Inc() + GetOrCreateCounter(ctx, "invalid-worker") // idle, should be cleaned up - tc1 := GetOrCreateTokenCounter(ctx, "worker1") - tc1.Add(100) - GetOrCreateTokenCounter(ctx, "invalid-worker") // Should be cleaned up + tc1 := GetOrCreateTokenCounter(ctx, "worker1") + tc1.Add(100) + GetOrCreateTokenCounter(ctx, "invalid-worker") // idle, should be cleaned up - CleanupInvalidCounters(ctx) + CleanupInvalidCounters(ctx) - // Verify counters - _, exists := GetCounter(ctx, "worker1") - assert.True(t, exists) - _, exists = GetCounter(ctx, "invalid-worker") - assert.False(t, exists) + // Healthy worker counters remain + _, exists := GetCounter(ctx, "worker1") + assert.True(t, exists) + _, exists = GetTokenCounter(ctx, "worker1") + assert.True(t, exists) - // Verify token counters - _, exists = GetTokenCounter(ctx, "worker1") - assert.True(t, exists) - _, exists = GetTokenCounter(ctx, "invalid-worker") - assert.False(t, exists) + // Idle invalid worker counters deleted + _, exists = GetCounter(ctx, "invalid-worker") + assert.False(t, exists) + _, exists = GetTokenCounter(ctx, "invalid-worker") + assert.False(t, exists) + }) + + t.Run("inflight invalid counter preserved", func(t *testing.T) { + Init(&config.Config{}, &mockManagerAPI{}) + + inflightCounter := GetOrCreateCounter(ctx, "inflight-invalid-worker") + inflightCounter.Inc() // simulate inflight request + inflightTC := GetOrCreateTokenCounter(ctx, "inflight-invalid-worker") + inflightTC.Add(50) + + CleanupInvalidCounters(ctx) + + // Inflight invalid worker counters preserved + _, exists := GetCounter(ctx, "inflight-invalid-worker") + assert.True(t, exists) + _, exists = GetTokenCounter(ctx, "inflight-invalid-worker") + assert.True(t, exists) + assert.Equal(t, uint64(1), inflightCounter.Get()) + assert.Equal(t, uint64(50), inflightTC.Get()) + }) } func TestEstimateTokens(t *testing.T) { @@ -182,19 +212,35 @@ func TestCleanupUnhealthyCounter(t *testing.T) { ctx := context.Background() Init(&config.Config{}, nil) - // Add counters - c := GetOrCreateCounter(ctx, "unhealthy-worker") - c.Inc() - tc := GetOrCreateTokenCounter(ctx, "unhealthy-worker") - tc.Add(100) + t.Run("counter preserved when inflight requests exist", func(t *testing.T) { + c := GetOrCreateCounter(ctx, "unhealthy-worker-inflight") + c.Inc() + tc := GetOrCreateTokenCounter(ctx, "unhealthy-worker-inflight") + tc.Add(100) - CleanupUnhealthyCounter(ctx, "unhealthy-worker") + CleanupUnhealthyCounter(ctx, "unhealthy-worker-inflight") - // Verify cleanup - _, exists := GetCounter(ctx, "unhealthy-worker") - assert.False(t, exists) - _, exists = GetTokenCounter(ctx, "unhealthy-worker") - assert.False(t, exists) + // Counter should be preserved (inflight requests) + _, exists := GetCounter(ctx, "unhealthy-worker-inflight") + assert.True(t, exists) + _, exists = GetTokenCounter(ctx, "unhealthy-worker-inflight") + assert.True(t, exists) + assert.Equal(t, uint64(1), c.Get()) + assert.Equal(t, uint64(100), tc.Get()) + }) + + t.Run("counter deleted when no inflight requests", func(t *testing.T) { + GetOrCreateCounter(ctx, "unhealthy-worker-idle") + GetOrCreateTokenCounter(ctx, "unhealthy-worker-idle") + + CleanupUnhealthyCounter(ctx, "unhealthy-worker-idle") + + // Counter should be deleted (no inflight requests) + _, exists := GetCounter(ctx, "unhealthy-worker-idle") + assert.False(t, exists) + _, exists = GetTokenCounter(ctx, "unhealthy-worker-idle") + assert.False(t, exists) + }) } func TestStartBackupCleanupTask(t *testing.T) { @@ -215,3 +261,110 @@ func TestStartBackupCleanupTask(t *testing.T) { _, exists := GetCounter(ctx, "invalid-worker") assert.False(t, exists) } + +func TestCounterLifecycle_UnhealthyAndReregister(t *testing.T) { + ctx := context.Background() + Init(&config.Config{}, &mockManagerAPI{}) + + url := "http://10.0.0.1:8080" + + // 1. Simulate request arrival: Inc + counter := GetOrCreateCounter(ctx, url) + counter.Inc() + assert.Equal(t, uint64(1), counter.Get()) + + tokenCounter := GetOrCreateTokenCounter(ctx, url) + tokenCounter.Add(100) + assert.Equal(t, uint64(100), tokenCounter.Get()) + + // 2. Instance becomes unhealthy → CleanupUnhealthyCounter (counter preserved due to inflight) + CleanupUnhealthyCounter(ctx, url) + + // Counter still exists, value unchanged + sameCounter := GetOrCreateCounter(ctx, url) + assert.Equal(t, counter, sameCounter) // same object + assert.Equal(t, uint64(1), sameCounter.Get()) + + // 3. Inflight request completes → Release + Release(ctx, url) + assert.Equal(t, uint64(0), counter.Get()) + + ReleasePrefillTokens(ctx, url, "dummy message with 10 chars") + + // 4. Another Release does not underflow + Release(ctx, url) + assert.Equal(t, uint64(0), counter.Get()) // stays 0, no underflow + + // 5. Instance re-registers → new request Inc + counter.Inc() + assert.Equal(t, uint64(1), counter.Get()) + + // 6. Request completes → Release + Release(ctx, url) + assert.Equal(t, uint64(0), counter.Get()) // back to zero + + // 7. Multiple concurrent requests full cycle + counter.Inc() + counter.Inc() + counter.Inc() + assert.Equal(t, uint64(3), counter.Get()) + Release(ctx, url) + Release(ctx, url) + Release(ctx, url) + assert.Equal(t, uint64(0), counter.Get()) // back to zero +} + +func TestCounterLifecycle_CleanupBeforeRelease(t *testing.T) { + ctx := context.Background() + Init(&config.Config{}, &mockManagerAPI{}) + + url := "http://10.0.0.2:8080" + + t.Run("cleanup deletes counter then release is no-op", func(t *testing.T) { + // 1. Request arrives → counter=1 + counter := GetOrCreateCounter(ctx, url) + counter.Inc() + assert.Equal(t, uint64(1), counter.Get()) + + tc := GetOrCreateTokenCounter(ctx, url) + tc.Add(200) + + // 2. Request finishes → Release → counter=0 + Release(ctx, url) + assert.Equal(t, uint64(0), counter.Get()) + + // 3. Cleanup runs, sees counter=0, deletes it + CleanupUnhealthyCounter(ctx, url) + _, exists := GetCounter(ctx, url) + assert.False(t, exists) // counter deleted + + // 4. A late/duplicate Release after cleanup should NOT create ghost counter + Release(ctx, url) + + // Verify no ghost counter was created + _, exists = GetCounter(ctx, url) + assert.False(t, exists, "Release should not create ghost counter after cleanup") + }) + + t.Run("cleanup deletes token counter then ReleasePrefillTokens is no-op", func(t *testing.T) { + Init(&config.Config{}, &mockManagerAPI{}) + tokenURL := "http://10.0.0.3:8080" + + tc := GetOrCreateTokenCounter(ctx, tokenURL) + tc.Add(200) + + // Sub all tokens so counter=0 + tc.Sub(200) + assert.Equal(t, uint64(0), tc.Get()) + + // Cleanup deletes the token counter + CleanupUnhealthyCounter(ctx, tokenURL) + _, exists := GetTokenCounter(ctx, tokenURL) + assert.False(t, exists) + + // Late ReleasePrefillTokens should not create ghost token counter + ReleasePrefillTokens(ctx, tokenURL, "hello world") + _, exists = GetTokenCounter(ctx, tokenURL) + assert.False(t, exists, "ReleasePrefillTokens should not create ghost token counter after cleanup") + }) +}