diff --git a/README.md b/README.md index c2b2d40..62b163f 100644 --- a/README.md +++ b/README.md @@ -4,22 +4,28 @@ Please be advised, NEVER think about using this in production environment. +This repository implemented most features of redis, including 5 data structures, ttl, publish/subscribe and AOF persistence. + +If you could read Chinese, you can find more details in [My Blog](https://www.cnblogs.com/Finley/category/1598973.html). + ## Running You can get runnable program in the releases of this repository, which supports Linux and Darwin system. ```bash ./godis-darwin -``` +``` -The program will try to read `redis.conf` in the working directory. If there is no such file, then the program will run with default config. +You could use redis-cli or other redis client to connect godis server, which listens on 127.0.0.1:6379 on default mode. + +The program will try to read config file path from environment variable `CONFIG`. + +If environment variable is not set, then the program try to read `redis.conf` in the working directory. + +If there is no such file, then the program will run with default config. ## Commands -This repository implemented most of features of redis, including 5 kind of data structures, ttl, publish/subscribe and AOF persistence. - -If you could read Chinese, you can find more details in [My Blog](https://www.cnblogs.com/Finley/category/1598973.html). - Supported Commands: - Keys diff --git a/go.mod b/go.mod index 4a69460..148a03b 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,8 @@ module github.com/HDT3213/godis go 1.12 -require github.com/shopspring/decimal v1.2.0 +require ( + github.com/jolestar/go-commons-pool v2.0.0+incompatible // indirect + github.com/jolestar/go-commons-pool/v2 v2.1.1 + github.com/shopspring/decimal v1.2.0 +) diff --git a/go.sum b/go.sum index 7042e94..96dfe07 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,15 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/jolestar/go-commons-pool v2.0.0+incompatible h1:uHn5uRKsLLQSf9f1J5QPY2xREWx/YH+e4bIIXcAuAaE= +github.com/jolestar/go-commons-pool v2.0.0+incompatible/go.mod h1:ChJYIbIch0DMCSU6VU0t0xhPoWDR2mMFIQek3XWU0s8= +github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA= +github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/node1.conf b/node1.conf new file mode 100644 index 0000000..cb00c26 --- /dev/null +++ b/node1.conf @@ -0,0 +1,9 @@ +bind 0.0.0.0 +port 6399 +maxclients 128 + +appendonly no +appendfilename appendonly.aof + +peers localhost:7379 +self localhost:6399 diff --git a/node2.conf b/node2.conf new file mode 100644 index 0000000..9a5330a --- /dev/null +++ b/node2.conf @@ -0,0 +1,9 @@ +bind 0.0.0.0 +port 7379 +maxclients 128 + +appendonly no +appendfilename appendonly.aof + +peers localhost:6399 +self localhost:7379 diff --git a/redis.conf b/redis.conf index 25a9a5f..6ab6208 100644 --- a/redis.conf +++ b/redis.conf @@ -4,6 +4,3 @@ maxclients 128 appendonly no appendfilename appendonly.aof - -# peers localhost:7379 -self localhost:6399 diff --git a/src/cluster/client.go b/src/cluster/client.go index 9189453..121d55d 100644 --- a/src/cluster/client.go +++ b/src/cluster/client.go @@ -1,307 +1,45 @@ package cluster import ( - "bufio" "context" "errors" - "github.com/HDT3213/godis/src/cluster/idgenerator" - "github.com/HDT3213/godis/src/interface/redis" - "github.com/HDT3213/godis/src/lib/logger" - "github.com/HDT3213/godis/src/lib/sync/wait" - "github.com/HDT3213/godis/src/redis/reply" - "io" - "net" - "strconv" - "sync" - "time" + "github.com/HDT3213/godis/src/redis/client" + "github.com/jolestar/go-commons-pool/v2" ) -const ( - timeout = 2 * time.Second - CRLF = "\r\n" -) - -type InternalClient struct { - idGen *idgenerator.IdGenerator - conn net.Conn - sendingReqs chan *AsyncRequest - ticker *time.Ticker - addr string - waitingMap *sync.Map // key -> request - - ctx context.Context - cancelFunc context.CancelFunc - writing *sync.WaitGroup +type ConnectionFactory struct { + Peer string } -type AsyncRequest struct { - id int64 - args [][]byte - reply redis.Reply - heartbeat bool - waiting *wait.Wait -} - -type AsyncMultiBulkReply struct { - Args [][]byte -} - -func MakeAsyncMultiBulkReply(args [][]byte) *AsyncMultiBulkReply { - return &AsyncMultiBulkReply{ - Args: args, - } -} - -func (r *AsyncMultiBulkReply) ToBytes() []byte { - argLen := len(r.Args) - res := "@" + strconv.Itoa(argLen) + CRLF - for _, arg := range r.Args { - if arg == nil { - res += "$-1" + CRLF - } else { - res += "$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF - } - } - return []byte(res) -} - -func MakeInternalClient(addr string, idGen *idgenerator.IdGenerator) (*InternalClient, error) { - conn, err := net.Dial("tcp", addr) +func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) { + c, err := client.MakeClient(f.Peer) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(context.Background()) - return &InternalClient{ - addr: addr, - conn: conn, - sendingReqs: make(chan *AsyncRequest, 256), - waitingMap: &sync.Map{}, - - ctx: ctx, - cancelFunc: cancel, - writing: &sync.WaitGroup{}, - idGen: idGen, - }, nil + c.Start() + return pool.NewPooledObject(c), nil } -func (client *InternalClient) Start() { - client.ticker = time.NewTicker(10 * time.Second) - go client.handleWrite() - go func() { - err := client.handleRead() - logger.Warn(err) - }() - go client.heartbeat() -} - -func (client *InternalClient) Close() { - // send stop signal - client.cancelFunc() - - // wait stop process - client.writing.Wait() - - // clean - _ = client.conn.Close() - close(client.sendingReqs) -} - -func (client *InternalClient) handleConnectionError(err error) error { - err1 := client.conn.Close() - if err1 != nil { - if opErr, ok := err1.(*net.OpError); ok { - if opErr.Err.Error() != "use of closed network connection" { - return err1 - } - } else { - return err1 - } +func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error { + c, ok := object.Object.(*client.Client) + if !ok { + return errors.New("type mismatch") } - conn, err1 := net.Dial("tcp", client.addr) - if err1 != nil { - logger.Error(err1) - return err1 - } - client.conn = conn - go func() { - _ = client.handleRead() - }() + c.Close() return nil } -func (client *InternalClient) heartbeat() { -loop: - for { - select { - case <-client.ticker.C: - client.sendingReqs <- &AsyncRequest{ - args: [][]byte{[]byte("PING")}, - heartbeat: true, - } - case <-client.ctx.Done(): - break loop - } - } +func (f *ConnectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool { + // do validate + return true } -func (client *InternalClient) handleWrite() { - client.writing.Add(1) -loop: - for { - select { - case req := <-client.sendingReqs: - client.doRequest(req) - case <-client.ctx.Done(): - break loop - } - } - client.writing.Done() +func (f *ConnectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error { + // do activate + return nil } -func (client *InternalClient) Send(args [][]byte) redis.Reply { - request := &AsyncRequest{ - id: client.idGen.NextId(), - args: args, - heartbeat: false, - waiting: &wait.Wait{}, - } - request.waiting.Add(1) - client.sendingReqs <- request - client.waitingMap.Store(request.id, request) - timeUp := request.waiting.WaitWithTimeout(timeout) - if timeUp { - client.waitingMap.Delete(request.id) - return nil - } else { - return request.reply - } -} - -func (client *InternalClient) doRequest(req *AsyncRequest) { - bytes := reply.MakeMultiBulkReply(req.args).ToBytes() - _, err := client.conn.Write(bytes) - i := 0 - for err != nil && i < 3 { - err = client.handleConnectionError(err) - if err == nil { - _, err = client.conn.Write(bytes) - } - i++ - } -} - -func (client *InternalClient) finishRequest(reply *AsyncMultiBulkReply) { - if reply == nil || reply.Args == nil || len(reply.Args) == 0 { - return - } - reqId, err := strconv.ParseInt(string(reply.Args[0]), 10, 64) - if err != nil { - logger.Warn(err) - return - } - raw, ok := client.waitingMap.Load(reqId) - if !ok { - return - } - request := raw.(*AsyncRequest) - request.reply = reply - if request.waiting != nil { - request.waiting.Done() - } -} - -func (client *InternalClient) handleRead() error { - reader := bufio.NewReader(client.conn) - downloading := false - expectedArgsCount := 0 - receivedCount := 0 - var args [][]byte - var fixedLen int64 = 0 - var err error - var msg []byte - for { - // read line - if fixedLen == 0 { // read normal line - msg, err = reader.ReadBytes('\n') - if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - logger.Info("connection close") - } else { - logger.Warn(err) - } - - return errors.New("connection closed") - } - if len(msg) == 0 || msg[len(msg)-2] != '\r' { - return errors.New("protocol error") - } - } else { // read bulk line (binary safe) - msg = make([]byte, fixedLen+2) - _, err = io.ReadFull(reader, msg) - if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - return errors.New("connection closed") - } else { - return err - } - } - if len(msg) == 0 || - msg[len(msg)-2] != '\r' || - msg[len(msg)-1] != '\n' { - return errors.New("protocol error") - } - fixedLen = 0 - } - - // parse line - if !downloading { - // receive new response - if msg[0] == '@' { // customized multi bulk response - // bulk multi msg - expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) - if err != nil { - return errors.New("protocol error: " + err.Error()) - } - if expectedLine == 0 { - client.finishRequest(nil) - } else if expectedLine > 0 { - downloading = true - expectedArgsCount = int(expectedLine) - receivedCount = 0 - args = make([][]byte, expectedLine) - } else { - return errors.New("protocol error") - } - } - } else { - // receive following part of a request - line := msg[0 : len(msg)-2] - if line[0] == '$' { - fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64) - if err != nil { - return err - } - if fixedLen <= 0 { // null bulk in multi bulks - args[receivedCount] = []byte{} - receivedCount++ - fixedLen = 0 - } - } else { - args[receivedCount] = line - receivedCount++ - } - - // if sending finished - if receivedCount == expectedArgsCount { - downloading = false // finish downloading progress - - client.finishRequest(&AsyncMultiBulkReply{Args: args}) - - // finish reply - expectedArgsCount = 0 - receivedCount = 0 - args = nil - } - } - } +func (f *ConnectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error { + // do passivate + return nil } diff --git a/src/cluster/cluster.go b/src/cluster/cluster.go index 9ec54e8..e9264c9 100644 --- a/src/cluster/cluster.go +++ b/src/cluster/cluster.go @@ -1,6 +1,8 @@ package cluster import ( + "context" + "errors" "fmt" "github.com/HDT3213/godis/src/cluster/idgenerator" "github.com/HDT3213/godis/src/config" @@ -11,6 +13,7 @@ import ( "github.com/HDT3213/godis/src/lib/logger" "github.com/HDT3213/godis/src/redis/client" "github.com/HDT3213/godis/src/redis/reply" + "github.com/jolestar/go-commons-pool/v2" "runtime/debug" "strings" ) @@ -18,8 +21,8 @@ import ( type Cluster struct { self string - peerPicker *consistenthash.Map - peers map[string]*client.Client + peerPicker *consistenthash.Map + peerConnection map[string]*pool.ObjectPool db *db.DB transactions *dict.SimpleDict // id -> Transaction @@ -36,10 +39,10 @@ func MakeCluster() *Cluster { cluster := &Cluster{ self: config.Properties.Self, - db: db.MakeDB(), - transactions: dict.MakeSimple(), - peerPicker: consistenthash.New(replicas, nil), - peers: make(map[string]*client.Client), + db: db.MakeDB(), + transactions: dict.MakeSimple(), + peerPicker: consistenthash.New(replicas, nil), + peerConnection: make(map[string]*pool.ObjectPool), idGenerator: idgenerator.MakeGenerator("godis", config.Properties.Self), } @@ -55,6 +58,12 @@ func MakeCluster() *Cluster { } peers = append(peers, config.Properties.Self) cluster.peerPicker.Add(peers...) + ctx := context.Background() + for _, peer := range peers { + cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &ConnectionFactory{ + Peer: peer, + }) + } } return cluster } @@ -90,18 +99,27 @@ func (cluster *Cluster) AfterClientClose(c redis.Connection) { } func (cluster *Cluster) getPeerClient(peer string) (*client.Client, error) { - peerClient, ok := cluster.peers[peer] - // lazy init + connectionFactory, ok := cluster.peerConnection[peer] if !ok { - var err error - peerClient, err = client.MakeClient(peer) - if err != nil { - return nil, err - } - peerClient.Start() - cluster.peers[peer] = peerClient + return nil, errors.New("connection factory not found") } - return peerClient, nil + raw, err := connectionFactory.BorrowObject(context.Background()) + if err != nil { + return nil, err + } + conn, ok := raw.(*client.Client) + if !ok { + return nil, errors.New("connection factory make wrong type") + } + return conn, nil +} + +func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client) error { + connectionFactory, ok := cluster.peerConnection[peer] + if !ok { + return errors.New("connection factory not found") + } + return connectionFactory.ReturnObject(context.Background(), peerClient) } func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { @@ -125,6 +143,9 @@ func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) re if err != nil { return reply.MakeErrReply(err.Error()) } + defer func() { + _ = cluster.returnPeerClient(peer, peerClient) + }() return peerClient.Send(args) } } diff --git a/src/cluster/mset.go b/src/cluster/mset.go index 736ba5c..73fbaf0 100644 --- a/src/cluster/mset.go +++ b/src/cluster/mset.go @@ -10,7 +10,7 @@ import ( func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) < 2 { - return reply.MakeErrReply("ERR wrong number of arguments for 'del' command") + return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command") } keys := make([]string, len(args)-1) for i := 1; i < len(args); i++ { diff --git a/src/cmd/main.go b/src/cmd/main.go index 534d6be..e1ad635 100644 --- a/src/cmd/main.go +++ b/src/cmd/main.go @@ -6,10 +6,15 @@ import ( "github.com/HDT3213/godis/src/lib/logger" RedisServer "github.com/HDT3213/godis/src/redis/server" "github.com/HDT3213/godis/src/tcp" + "os" ) func main() { - config.SetupConfig("redis.conf") + configFilename := os.Getenv("CONFIG") + if configFilename == "" { + configFilename = "redis.conf" + } + config.SetupConfig(configFilename) logger.Setup(&logger.Settings{ Path: "logs", Name: "godis",