use connection pool within peers in cluster mode

This commit is contained in:
hdt3213 2020-10-31 23:14:52 +08:00
parent 82715bd8ea
commit 57c190dca5
10 changed files with 114 additions and 312 deletions

View File

@ -4,22 +4,28 @@
Please be advised, NEVER think about using this in production environment.
This repository implemented most features of redis, including 5 data structures, ttl, publish/subscribe and AOF persistence.
If you could read Chinese, you can find more details in [My Blog](https://www.cnblogs.com/Finley/category/1598973.html).
## Running
You can get runnable program in the releases of this repository, which supports Linux and Darwin system.
```bash
./godis-darwin
```
```
The program will try to read `redis.conf` in the working directory. If there is no such file, then the program will run with default config.
You could use redis-cli or other redis client to connect godis server, which listens on 127.0.0.1:6379 on default mode.
The program will try to read config file path from environment variable `CONFIG`.
If environment variable is not set, then the program try to read `redis.conf` in the working directory.
If there is no such file, then the program will run with default config.
## Commands
This repository implemented most of features of redis, including 5 kind of data structures, ttl, publish/subscribe and AOF persistence.
If you could read Chinese, you can find more details in [My Blog](https://www.cnblogs.com/Finley/category/1598973.html).
Supported Commands:
- Keys

6
go.mod
View File

@ -2,4 +2,8 @@ module github.com/HDT3213/godis
go 1.12
require github.com/shopspring/decimal v1.2.0
require (
github.com/jolestar/go-commons-pool v2.0.0+incompatible // indirect
github.com/jolestar/go-commons-pool/v2 v2.1.1
github.com/shopspring/decimal v1.2.0
)

13
go.sum
View File

@ -1,2 +1,15 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/jolestar/go-commons-pool v2.0.0+incompatible h1:uHn5uRKsLLQSf9f1J5QPY2xREWx/YH+e4bIIXcAuAaE=
github.com/jolestar/go-commons-pool v2.0.0+incompatible/go.mod h1:ChJYIbIch0DMCSU6VU0t0xhPoWDR2mMFIQek3XWU0s8=
github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA=
github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

9
node1.conf Normal file
View File

@ -0,0 +1,9 @@
bind 0.0.0.0
port 6399
maxclients 128
appendonly no
appendfilename appendonly.aof
peers localhost:7379
self localhost:6399

9
node2.conf Normal file
View File

@ -0,0 +1,9 @@
bind 0.0.0.0
port 7379
maxclients 128
appendonly no
appendfilename appendonly.aof
peers localhost:6399
self localhost:7379

View File

@ -4,6 +4,3 @@ maxclients 128
appendonly no
appendfilename appendonly.aof
# peers localhost:7379
self localhost:6399

View File

@ -1,307 +1,45 @@
package cluster
import (
"bufio"
"context"
"errors"
"github.com/HDT3213/godis/src/cluster/idgenerator"
"github.com/HDT3213/godis/src/interface/redis"
"github.com/HDT3213/godis/src/lib/logger"
"github.com/HDT3213/godis/src/lib/sync/wait"
"github.com/HDT3213/godis/src/redis/reply"
"io"
"net"
"strconv"
"sync"
"time"
"github.com/HDT3213/godis/src/redis/client"
"github.com/jolestar/go-commons-pool/v2"
)
const (
timeout = 2 * time.Second
CRLF = "\r\n"
)
type InternalClient struct {
idGen *idgenerator.IdGenerator
conn net.Conn
sendingReqs chan *AsyncRequest
ticker *time.Ticker
addr string
waitingMap *sync.Map // key -> request
ctx context.Context
cancelFunc context.CancelFunc
writing *sync.WaitGroup
type ConnectionFactory struct {
Peer string
}
type AsyncRequest struct {
id int64
args [][]byte
reply redis.Reply
heartbeat bool
waiting *wait.Wait
}
type AsyncMultiBulkReply struct {
Args [][]byte
}
func MakeAsyncMultiBulkReply(args [][]byte) *AsyncMultiBulkReply {
return &AsyncMultiBulkReply{
Args: args,
}
}
func (r *AsyncMultiBulkReply) ToBytes() []byte {
argLen := len(r.Args)
res := "@" + strconv.Itoa(argLen) + CRLF
for _, arg := range r.Args {
if arg == nil {
res += "$-1" + CRLF
} else {
res += "$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF
}
}
return []byte(res)
}
func MakeInternalClient(addr string, idGen *idgenerator.IdGenerator) (*InternalClient, error) {
conn, err := net.Dial("tcp", addr)
func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
c, err := client.MakeClient(f.Peer)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &InternalClient{
addr: addr,
conn: conn,
sendingReqs: make(chan *AsyncRequest, 256),
waitingMap: &sync.Map{},
ctx: ctx,
cancelFunc: cancel,
writing: &sync.WaitGroup{},
idGen: idGen,
}, nil
c.Start()
return pool.NewPooledObject(c), nil
}
func (client *InternalClient) Start() {
client.ticker = time.NewTicker(10 * time.Second)
go client.handleWrite()
go func() {
err := client.handleRead()
logger.Warn(err)
}()
go client.heartbeat()
}
func (client *InternalClient) Close() {
// send stop signal
client.cancelFunc()
// wait stop process
client.writing.Wait()
// clean
_ = client.conn.Close()
close(client.sendingReqs)
}
func (client *InternalClient) handleConnectionError(err error) error {
err1 := client.conn.Close()
if err1 != nil {
if opErr, ok := err1.(*net.OpError); ok {
if opErr.Err.Error() != "use of closed network connection" {
return err1
}
} else {
return err1
}
func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
c, ok := object.Object.(*client.Client)
if !ok {
return errors.New("type mismatch")
}
conn, err1 := net.Dial("tcp", client.addr)
if err1 != nil {
logger.Error(err1)
return err1
}
client.conn = conn
go func() {
_ = client.handleRead()
}()
c.Close()
return nil
}
func (client *InternalClient) heartbeat() {
loop:
for {
select {
case <-client.ticker.C:
client.sendingReqs <- &AsyncRequest{
args: [][]byte{[]byte("PING")},
heartbeat: true,
}
case <-client.ctx.Done():
break loop
}
}
func (f *ConnectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
// do validate
return true
}
func (client *InternalClient) handleWrite() {
client.writing.Add(1)
loop:
for {
select {
case req := <-client.sendingReqs:
client.doRequest(req)
case <-client.ctx.Done():
break loop
}
}
client.writing.Done()
func (f *ConnectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
// do activate
return nil
}
func (client *InternalClient) Send(args [][]byte) redis.Reply {
request := &AsyncRequest{
id: client.idGen.NextId(),
args: args,
heartbeat: false,
waiting: &wait.Wait{},
}
request.waiting.Add(1)
client.sendingReqs <- request
client.waitingMap.Store(request.id, request)
timeUp := request.waiting.WaitWithTimeout(timeout)
if timeUp {
client.waitingMap.Delete(request.id)
return nil
} else {
return request.reply
}
}
func (client *InternalClient) doRequest(req *AsyncRequest) {
bytes := reply.MakeMultiBulkReply(req.args).ToBytes()
_, err := client.conn.Write(bytes)
i := 0
for err != nil && i < 3 {
err = client.handleConnectionError(err)
if err == nil {
_, err = client.conn.Write(bytes)
}
i++
}
}
func (client *InternalClient) finishRequest(reply *AsyncMultiBulkReply) {
if reply == nil || reply.Args == nil || len(reply.Args) == 0 {
return
}
reqId, err := strconv.ParseInt(string(reply.Args[0]), 10, 64)
if err != nil {
logger.Warn(err)
return
}
raw, ok := client.waitingMap.Load(reqId)
if !ok {
return
}
request := raw.(*AsyncRequest)
request.reply = reply
if request.waiting != nil {
request.waiting.Done()
}
}
func (client *InternalClient) handleRead() error {
reader := bufio.NewReader(client.conn)
downloading := false
expectedArgsCount := 0
receivedCount := 0
var args [][]byte
var fixedLen int64 = 0
var err error
var msg []byte
for {
// read line
if fixedLen == 0 { // read normal line
msg, err = reader.ReadBytes('\n')
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
logger.Info("connection close")
} else {
logger.Warn(err)
}
return errors.New("connection closed")
}
if len(msg) == 0 || msg[len(msg)-2] != '\r' {
return errors.New("protocol error")
}
} else { // read bulk line (binary safe)
msg = make([]byte, fixedLen+2)
_, err = io.ReadFull(reader, msg)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return errors.New("connection closed")
} else {
return err
}
}
if len(msg) == 0 ||
msg[len(msg)-2] != '\r' ||
msg[len(msg)-1] != '\n' {
return errors.New("protocol error")
}
fixedLen = 0
}
// parse line
if !downloading {
// receive new response
if msg[0] == '@' { // customized multi bulk response
// bulk multi msg
expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
return errors.New("protocol error: " + err.Error())
}
if expectedLine == 0 {
client.finishRequest(nil)
} else if expectedLine > 0 {
downloading = true
expectedArgsCount = int(expectedLine)
receivedCount = 0
args = make([][]byte, expectedLine)
} else {
return errors.New("protocol error")
}
}
} else {
// receive following part of a request
line := msg[0 : len(msg)-2]
if line[0] == '$' {
fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return err
}
if fixedLen <= 0 { // null bulk in multi bulks
args[receivedCount] = []byte{}
receivedCount++
fixedLen = 0
}
} else {
args[receivedCount] = line
receivedCount++
}
// if sending finished
if receivedCount == expectedArgsCount {
downloading = false // finish downloading progress
client.finishRequest(&AsyncMultiBulkReply{Args: args})
// finish reply
expectedArgsCount = 0
receivedCount = 0
args = nil
}
}
}
func (f *ConnectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
// do passivate
return nil
}

View File

@ -1,6 +1,8 @@
package cluster
import (
"context"
"errors"
"fmt"
"github.com/HDT3213/godis/src/cluster/idgenerator"
"github.com/HDT3213/godis/src/config"
@ -11,6 +13,7 @@ import (
"github.com/HDT3213/godis/src/lib/logger"
"github.com/HDT3213/godis/src/redis/client"
"github.com/HDT3213/godis/src/redis/reply"
"github.com/jolestar/go-commons-pool/v2"
"runtime/debug"
"strings"
)
@ -18,8 +21,8 @@ import (
type Cluster struct {
self string
peerPicker *consistenthash.Map
peers map[string]*client.Client
peerPicker *consistenthash.Map
peerConnection map[string]*pool.ObjectPool
db *db.DB
transactions *dict.SimpleDict // id -> Transaction
@ -36,10 +39,10 @@ func MakeCluster() *Cluster {
cluster := &Cluster{
self: config.Properties.Self,
db: db.MakeDB(),
transactions: dict.MakeSimple(),
peerPicker: consistenthash.New(replicas, nil),
peers: make(map[string]*client.Client),
db: db.MakeDB(),
transactions: dict.MakeSimple(),
peerPicker: consistenthash.New(replicas, nil),
peerConnection: make(map[string]*pool.ObjectPool),
idGenerator: idgenerator.MakeGenerator("godis", config.Properties.Self),
}
@ -55,6 +58,12 @@ func MakeCluster() *Cluster {
}
peers = append(peers, config.Properties.Self)
cluster.peerPicker.Add(peers...)
ctx := context.Background()
for _, peer := range peers {
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &ConnectionFactory{
Peer: peer,
})
}
}
return cluster
}
@ -90,18 +99,27 @@ func (cluster *Cluster) AfterClientClose(c redis.Connection) {
}
func (cluster *Cluster) getPeerClient(peer string) (*client.Client, error) {
peerClient, ok := cluster.peers[peer]
// lazy init
connectionFactory, ok := cluster.peerConnection[peer]
if !ok {
var err error
peerClient, err = client.MakeClient(peer)
if err != nil {
return nil, err
}
peerClient.Start()
cluster.peers[peer] = peerClient
return nil, errors.New("connection factory not found")
}
return peerClient, nil
raw, err := connectionFactory.BorrowObject(context.Background())
if err != nil {
return nil, err
}
conn, ok := raw.(*client.Client)
if !ok {
return nil, errors.New("connection factory make wrong type")
}
return conn, nil
}
func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client) error {
connectionFactory, ok := cluster.peerConnection[peer]
if !ok {
return errors.New("connection factory not found")
}
return connectionFactory.ReturnObject(context.Background(), peerClient)
}
func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
@ -125,6 +143,9 @@ func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) re
if err != nil {
return reply.MakeErrReply(err.Error())
}
defer func() {
_ = cluster.returnPeerClient(peer, peerClient)
}()
return peerClient.Send(args)
}
}

View File

@ -10,7 +10,7 @@ import (
func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'del' command")
return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")
}
keys := make([]string, len(args)-1)
for i := 1; i < len(args); i++ {

View File

@ -6,10 +6,15 @@ import (
"github.com/HDT3213/godis/src/lib/logger"
RedisServer "github.com/HDT3213/godis/src/redis/server"
"github.com/HDT3213/godis/src/tcp"
"os"
)
func main() {
config.SetupConfig("redis.conf")
configFilename := os.Getenv("CONFIG")
if configFilename == "" {
configFilename = "redis.conf"
}
config.SetupConfig(configFilename)
logger.Setup(&logger.Settings{
Path: "logs",
Name: "godis",