From c101b189697e88627a52e3e25646785c8a1a90b2 Mon Sep 17 00:00:00 2001 From: Kelvin Clement Mwinuka Date: Sat, 12 Aug 2023 05:40:35 +0800 Subject: [PATCH] Created ApplyRequest and ApplyResponse types to handle calls and responses to raft.Apply. In-Memory state between nodes is now synchronized. --- docker-compose.yaml | 98 +++++++++++++++++++++---------------------- server/main.go | 54 ++++++++++++++++-------- server/raft.go | 45 +++++++++++++++++++- server/utils/types.go | 10 +++++ 4 files changed, 139 insertions(+), 68 deletions(-) create mode 100644 server/utils/types.go diff --git a/docker-compose.yaml b/docker-compose.yaml index 7660e50..38322d2 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -19,7 +19,7 @@ services: - SERVERID=1 - PLUGINDIR=/usr/local/lib/memstore - INMEMORY=true - - TLS=true + - TLS=false - HTTP=false ports: - "7480:7480" @@ -43,7 +43,7 @@ services: - JOINADDR=node1:7946 - PLUGINDIR=/usr/local/lib/memstore - INMEMORY=true - - TLS=true + - TLS=false - HTTP=false ports: - "7481:7480" @@ -67,7 +67,7 @@ services: - JOINADDR=node1:7946 - PLUGINDIR=/usr/local/lib/memstore - INMEMORY=true - - TLS=true + - TLS=false - HTTP=false ports: - "7482:7480" @@ -76,50 +76,50 @@ services: networks: - testnet - node4: - container_name: node4 - build: - context: . - dockerfile: ./server/Dockerfile - environment: - - PORT=7480 - - RAFTPORT=8000 - - MLPORT=7946 - - KEY=/etc/ssl/certs/memstore/server.key - - CERT=/etc/ssl/certs/memstore/server.crt - - SERVERID=4 - - JOINADDR=node1:7946 - - PLUGINDIR=/usr/local/lib/memstore - - INMEMORY=true - - TLS=true - - HTTP=false - ports: - - "7483:7480" - - "7949:7946" - - "8003:8000" - networks: - - testnet + # node4: + # container_name: node4 + # build: + # context: . + # dockerfile: ./server/Dockerfile + # environment: + # - PORT=7480 + # - RAFTPORT=8000 + # - MLPORT=7946 + # - KEY=/etc/ssl/certs/memstore/server.key + # - CERT=/etc/ssl/certs/memstore/server.crt + # - SERVERID=4 + # - JOINADDR=node1:7946 + # - PLUGINDIR=/usr/local/lib/memstore + # - INMEMORY=true + # - TLS=false + # - HTTP=false + # ports: + # - "7483:7480" + # - "7949:7946" + # - "8003:8000" + # networks: + # - testnet - node5: - container_name: node5 - build: - context: . - dockerfile: ./server/Dockerfile - environment: - - PORT=7480 - - RAFTPORT=8000 - - MLPORT=7946 - - KEY=/etc/ssl/certs/memstore/server.key - - CERT=/etc/ssl/certs/memstore/server.crt - - SERVERID=5 - - JOINADDR=node1:7946 - - PLUGINDIR=/usr/local/lib/memstore - - INMEMORY=true - - TLS=true - - HTTP=false - ports: - - "7484:7480" - - "7950:7946" - - "8004:8000" - networks: - - testnet \ No newline at end of file + # node5: + # container_name: node5 + # build: + # context: . + # dockerfile: ./server/Dockerfile + # environment: + # - PORT=7480 + # - RAFTPORT=8000 + # - MLPORT=7946 + # - KEY=/etc/ssl/certs/memstore/server.key + # - CERT=/etc/ssl/certs/memstore/server.crt + # - SERVERID=5 + # - JOINADDR=node1:7946 + # - PLUGINDIR=/usr/local/lib/memstore + # - INMEMORY=true + # - TLS=false + # - HTTP=false + # ports: + # - "7484:7480" + # - "7950:7946" + # - "8004:8000" + # networks: + # - testnet \ No newline at end of file diff --git a/server/main.go b/server/main.go index 4272cb9..8ee0f7e 100644 --- a/server/main.go +++ b/server/main.go @@ -3,6 +3,7 @@ package main import ( "bufio" "crypto/tls" + "encoding/json" "fmt" "io" "log" @@ -15,6 +16,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" @@ -86,27 +88,43 @@ func (server *Server) handleConnection(conn net.Conn) { connRW.Flush() continue } else { - // Look for plugin that handles this command and trigger it - handled := false + applyRequest := utils.ApplyRequest{CMD: cmd} + b, err := json.Marshal(applyRequest) - for _, plugin := range server.plugins { - if utils.Contains[string](plugin.Commands(), strings.ToLower(cmd[0])) { - res, err := plugin.HandleCommand(cmd, server) - - if err != nil { - connRW.Write([]byte(fmt.Sprintf("-Error %s\r\n\n", err.Error()))) - } else { - connRW.Write(res) - } - - connRW.Flush() - - handled = true - } + if err != nil { + connRW.Write([]byte("-Error could not parse request\r\n\n")) + connRW.Flush() + continue } - if !handled { - connRW.Write([]byte(fmt.Sprintf("-Error %s command not supported\r\n\n", strings.ToUpper(cmd[0])))) + if server.isRaftLeader() { + applyFuture := server.raft.Apply(b, 500*time.Millisecond) + + if err := applyFuture.Error(); err != nil { + connRW.WriteString(fmt.Sprintf("-Error %s\r\n\n", err.Error())) + connRW.Flush() + continue + } + + r, ok := applyFuture.Response().(utils.ApplyResponse) + + if !ok { + connRW.WriteString(fmt.Sprintf("-Error unprocessable entity %v\r\n\n", r)) + connRW.Flush() + continue + } + + if r.Error != nil { + connRW.WriteString(fmt.Sprintf("-Error %s\r\n\n", r.Error.Error())) + connRW.Flush() + continue + } + + connRW.Write(r.Response) + connRW.Flush() + } else { + // Not Raft leader, forward message to leader and wait for a response + connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\n")) connRW.Flush() } } diff --git a/server/raft.go b/server/raft.go index 0150623..7885c5b 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "errors" "fmt" "io" @@ -8,10 +9,12 @@ import ( "net" "os" "path/filepath" + "strings" "time" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" + "github.com/kelvinmwinuka/memstore/server/utils" ) func (server *Server) RaftInit() { @@ -69,7 +72,7 @@ func (server *Server) RaftInit() { // Start raft server raftServer, err := raft.NewRaft( raftConfig, - &raft.MockFSM{}, + raft.FSM(server), logStore, stableStore, snapshotStore, @@ -101,16 +104,56 @@ func (server *Server) RaftInit() { // Implement raft.FSM interface func (server *Server) Apply(log *raft.Log) interface{} { + switch log.Type { + case raft.LogCommand: + var request utils.ApplyRequest + + err := json.Unmarshal(log.Data, &request) + if err != nil { + return utils.ApplyResponse{ + Error: err, + Response: nil, + } + } + + // Look for plugin that handles this command and trigger it + for _, plugin := range server.plugins { + if utils.Contains[string](plugin.Commands(), strings.ToLower(request.CMD[0])) { + res, err := plugin.HandleCommand(request.CMD, server) + + if err != nil { + return utils.ApplyResponse{ + Error: err, + Response: nil, + } + } + + return utils.ApplyResponse{ + Error: nil, + Response: res, + } + } + } + + return utils.ApplyResponse{ + Error: fmt.Errorf("%s command not supported", strings.ToUpper(request.CMD[0])), + Response: nil, + } + } + + os.Stderr.Write([]byte("not raft log command\n")) return nil } // Implements raft.FSM interface func (server *Server) Snapshot() (raft.FSMSnapshot, error) { + fmt.Println("SNAPSHOT METHOD CALLED") return nil, nil } // Implements raft.FSM interface func (server *Server) Restore(snapshot io.ReadCloser) error { + fmt.Println("RESTORE METHOD CALLED") return nil } diff --git a/server/utils/types.go b/server/utils/types.go new file mode 100644 index 0000000..be787cf --- /dev/null +++ b/server/utils/types.go @@ -0,0 +1,10 @@ +package utils + +type ApplyRequest struct { + CMD []string `json:"CMD"` +} + +type ApplyResponse struct { + Error error + Response []byte +}