diff --git a/CHANGELOG.md b/CHANGELOG.md index ab4da63..b9e2ef5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # [rpcx](http://rpcx.io) ## 1.8.0 +- supports distributed rate limiter based on go-redis/redis-rate ## 1.7.0 - move etcd support to github.com/rpcxio/rpcx-etcd diff --git a/README.md b/README.md index 05a7e5c..d5a3639 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -- **stable branch**: v1.6.x +- **stable branch**: v1.7.x - **development branch**: master diff --git a/TODO.md b/TODO.md deleted file mode 100644 index 8d5748d..0000000 --- a/TODO.md +++ /dev/null @@ -1,22 +0,0 @@ -# rpcx - -Milestone 6.0 - -### Todo - -- [ ] improve message.Encode to avoid copy header and payload [#399] -- [ ] Users can customize the failure of calls [#396] -- [ ] improve rpcx-gateway 6.0 feature [#390] -- [ ] improve rust lib [#389] -- [ ] add rpc request replay [#417] -- [ ] support sub/pub [#415] -- [ ] avoid too many connects after disconnected [#411] -- [ ] re-register services if registery center has recovered [#420] - -### In Progress - - -### Done ✓ - -- [x] can't get latest nodes for nacos [#422] -- [x] add k8s deployment yaml [#355] \ No newline at end of file diff --git a/clientplugin/req_rate_limiting_redis.go b/clientplugin/req_rate_limiting_redis.go new file mode 100644 index 0000000..7a6d1c5 --- /dev/null +++ b/clientplugin/req_rate_limiting_redis.go @@ -0,0 +1,53 @@ +package clientplugin + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" + "github.com/go-redis/redis_rate/v9" + "github.com/smallnest/rpcx/client" + "github.com/smallnest/rpcx/server" +) + +var _ client.PreCallPlugin = (*RedisRateLimitingPlugin)(nil) + +// RedisRateLimitingPlugin can limit requests per unit time +type RedisRateLimitingPlugin struct { + addrs []string + limiter redis_rate.Limiter + limit redis_rate.Limit +} + +// NewRedisRateLimitingPlugin creates a new RateLimitingPlugin +func NewRedisRateLimitingPlugin(addrs []string, rate int, burst int, period time.Duration) *RedisRateLimitingPlugin { + limit := redis_rate.Limit{ + Rate: rate, + Burst: burst, + Period: period, + } + rdb := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + }) + + limiter := redis_rate.NewLimiter(rdb) + + return &RedisRateLimitingPlugin{ + addrs: addrs, + limiter: *limiter, + limit: limit, + } +} + +// PreReadRequest can limit request processing. +func (plugin *RedisRateLimitingPlugin) PreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error { + res, err := plugin.limiter.Allow(ctx, servicePath+"/"+serviceMethod, plugin.limit) + if err != nil { + return err + } + + if res.Allowed > 0 { + return nil + } + return server.ErrReqReachLimit +} diff --git a/go.mod b/go.mod index b386287..7f38f30 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 github.com/go-redis/redis/v8 v8.11.4 // indirect + github.com/go-redis/redis_rate/v9 v9.1.2 // indirect github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 diff --git a/go.sum b/go.sum index 266e194..09834cc 100644 --- a/go.sum +++ b/go.sum @@ -94,6 +94,8 @@ github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534/go.mod h1:xIFjORFzTxq github.com/go-redis/redis/v8 v8.8.2/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y= github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= +github.com/go-redis/redis_rate/v9 v9.1.2 h1:H0l5VzoAtOE6ydd38j8MCq3ABlGLnvvbA1xDSVVCHgQ= +github.com/go-redis/redis_rate/v9 v9.1.2/go.mod h1:oam2de2apSgRG8aJzwJddXbNu91Iyz1m8IKJE2vpvlQ= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= diff --git a/serverplugin/req_rate_limiting_redis.go b/serverplugin/req_rate_limiting_redis.go new file mode 100644 index 0000000..c8f67d0 --- /dev/null +++ b/serverplugin/req_rate_limiting_redis.go @@ -0,0 +1,53 @@ +package serverplugin + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" + "github.com/go-redis/redis_rate/v9" + "github.com/smallnest/rpcx/protocol" + "github.com/smallnest/rpcx/server" +) + +var _ server.PostReadRequestPlugin = (*RedisRateLimitingPlugin)(nil) + +// RedisRateLimitingPlugin can limit requests per unit time +type RedisRateLimitingPlugin struct { + addrs []string + limiter redis_rate.Limiter + limit redis_rate.Limit +} + +// NewRedisRateLimitingPlugin creates a new RateLimitingPlugin +func NewRedisRateLimitingPlugin(addrs []string, rate int, burst int, period time.Duration) *RedisRateLimitingPlugin { + limit := redis_rate.Limit{ + Rate: rate, + Burst: burst, + Period: period, + } + rdb := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + }) + + limiter := redis_rate.NewLimiter(rdb) + + return &RedisRateLimitingPlugin{ + addrs: addrs, + limiter: *limiter, + limit: limit, + } +} + +// PreReadRequest can limit request processing. +func (plugin *RedisRateLimitingPlugin) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error { + res, err := plugin.limiter.Allow(ctx, r.ServicePath+"/"+r.ServiceMethod, plugin.limit) + if err != nil { + return err + } + + if res.Allowed > 0 { + return nil + } + return server.ErrReqReachLimit +}