Created ApplyRequest and ApplyResponse types to handle calls and responses to raft.Apply.

In-Memory state between nodes is now synchronized.
This commit is contained in:
Kelvin Clement Mwinuka
2023-08-12 05:40:35 +08:00
parent bc4ce05753
commit c101b18969
4 changed files with 139 additions and 68 deletions
+49 -49
View File
@@ -19,7 +19,7 @@ services:
- SERVERID=1
- PLUGINDIR=/usr/local/lib/memstore
- INMEMORY=true
- TLS=true
- TLS=false
- HTTP=false
ports:
- "7480:7480"
@@ -43,7 +43,7 @@ services:
- JOINADDR=node1:7946
- PLUGINDIR=/usr/local/lib/memstore
- INMEMORY=true
- TLS=true
- TLS=false
- HTTP=false
ports:
- "7481:7480"
@@ -67,7 +67,7 @@ services:
- JOINADDR=node1:7946
- PLUGINDIR=/usr/local/lib/memstore
- INMEMORY=true
- TLS=true
- TLS=false
- HTTP=false
ports:
- "7482:7480"
@@ -76,50 +76,50 @@ services:
networks:
- testnet
node4:
container_name: node4
build:
context: .
dockerfile: ./server/Dockerfile
environment:
- PORT=7480
- RAFTPORT=8000
- MLPORT=7946
- KEY=/etc/ssl/certs/memstore/server.key
- CERT=/etc/ssl/certs/memstore/server.crt
- SERVERID=4
- JOINADDR=node1:7946
- PLUGINDIR=/usr/local/lib/memstore
- INMEMORY=true
- TLS=true
- HTTP=false
ports:
- "7483:7480"
- "7949:7946"
- "8003:8000"
networks:
- testnet
# node4:
# container_name: node4
# build:
# context: .
# dockerfile: ./server/Dockerfile
# environment:
# - PORT=7480
# - RAFTPORT=8000
# - MLPORT=7946
# - KEY=/etc/ssl/certs/memstore/server.key
# - CERT=/etc/ssl/certs/memstore/server.crt
# - SERVERID=4
# - JOINADDR=node1:7946
# - PLUGINDIR=/usr/local/lib/memstore
# - INMEMORY=true
# - TLS=false
# - HTTP=false
# ports:
# - "7483:7480"
# - "7949:7946"
# - "8003:8000"
# networks:
# - testnet
node5:
container_name: node5
build:
context: .
dockerfile: ./server/Dockerfile
environment:
- PORT=7480
- RAFTPORT=8000
- MLPORT=7946
- KEY=/etc/ssl/certs/memstore/server.key
- CERT=/etc/ssl/certs/memstore/server.crt
- SERVERID=5
- JOINADDR=node1:7946
- PLUGINDIR=/usr/local/lib/memstore
- INMEMORY=true
- TLS=true
- HTTP=false
ports:
- "7484:7480"
- "7950:7946"
- "8004:8000"
networks:
- testnet
# node5:
# container_name: node5
# build:
# context: .
# dockerfile: ./server/Dockerfile
# environment:
# - PORT=7480
# - RAFTPORT=8000
# - MLPORT=7946
# - KEY=/etc/ssl/certs/memstore/server.key
# - CERT=/etc/ssl/certs/memstore/server.crt
# - SERVERID=5
# - JOINADDR=node1:7946
# - PLUGINDIR=/usr/local/lib/memstore
# - INMEMORY=true
# - TLS=false
# - HTTP=false
# ports:
# - "7484:7480"
# - "7950:7946"
# - "8004:8000"
# networks:
# - testnet
+36 -18
View File
@@ -3,6 +3,7 @@ package main
import (
"bufio"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
@@ -15,6 +16,7 @@ import (
"strings"
"sync"
"syscall"
"time"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
@@ -86,27 +88,43 @@ func (server *Server) handleConnection(conn net.Conn) {
connRW.Flush()
continue
} else {
// Look for plugin that handles this command and trigger it
handled := false
applyRequest := utils.ApplyRequest{CMD: cmd}
b, err := json.Marshal(applyRequest)
for _, plugin := range server.plugins {
if utils.Contains[string](plugin.Commands(), strings.ToLower(cmd[0])) {
res, err := plugin.HandleCommand(cmd, server)
if err != nil {
connRW.Write([]byte(fmt.Sprintf("-Error %s\r\n\n", err.Error())))
} else {
connRW.Write(res)
}
connRW.Flush()
handled = true
}
if err != nil {
connRW.Write([]byte("-Error could not parse request\r\n\n"))
connRW.Flush()
continue
}
if !handled {
connRW.Write([]byte(fmt.Sprintf("-Error %s command not supported\r\n\n", strings.ToUpper(cmd[0]))))
if server.isRaftLeader() {
applyFuture := server.raft.Apply(b, 500*time.Millisecond)
if err := applyFuture.Error(); err != nil {
connRW.WriteString(fmt.Sprintf("-Error %s\r\n\n", err.Error()))
connRW.Flush()
continue
}
r, ok := applyFuture.Response().(utils.ApplyResponse)
if !ok {
connRW.WriteString(fmt.Sprintf("-Error unprocessable entity %v\r\n\n", r))
connRW.Flush()
continue
}
if r.Error != nil {
connRW.WriteString(fmt.Sprintf("-Error %s\r\n\n", r.Error.Error()))
connRW.Flush()
continue
}
connRW.Write(r.Response)
connRW.Flush()
} else {
// Not Raft leader, forward message to leader and wait for a response
connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\n"))
connRW.Flush()
}
}
+44 -1
View File
@@ -1,6 +1,7 @@
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
@@ -8,10 +9,12 @@ import (
"net"
"os"
"path/filepath"
"strings"
"time"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/kelvinmwinuka/memstore/server/utils"
)
func (server *Server) RaftInit() {
@@ -69,7 +72,7 @@ func (server *Server) RaftInit() {
// Start raft server
raftServer, err := raft.NewRaft(
raftConfig,
&raft.MockFSM{},
raft.FSM(server),
logStore,
stableStore,
snapshotStore,
@@ -101,16 +104,56 @@ func (server *Server) RaftInit() {
// Implement raft.FSM interface
func (server *Server) Apply(log *raft.Log) interface{} {
switch log.Type {
case raft.LogCommand:
var request utils.ApplyRequest
err := json.Unmarshal(log.Data, &request)
if err != nil {
return utils.ApplyResponse{
Error: err,
Response: nil,
}
}
// Look for plugin that handles this command and trigger it
for _, plugin := range server.plugins {
if utils.Contains[string](plugin.Commands(), strings.ToLower(request.CMD[0])) {
res, err := plugin.HandleCommand(request.CMD, server)
if err != nil {
return utils.ApplyResponse{
Error: err,
Response: nil,
}
}
return utils.ApplyResponse{
Error: nil,
Response: res,
}
}
}
return utils.ApplyResponse{
Error: fmt.Errorf("%s command not supported", strings.ToUpper(request.CMD[0])),
Response: nil,
}
}
os.Stderr.Write([]byte("not raft log command\n"))
return nil
}
// Implements raft.FSM interface
func (server *Server) Snapshot() (raft.FSMSnapshot, error) {
fmt.Println("SNAPSHOT METHOD CALLED")
return nil, nil
}
// Implements raft.FSM interface
func (server *Server) Restore(snapshot io.ReadCloser) error {
fmt.Println("RESTORE METHOD CALLED")
return nil
}
+10
View File
@@ -0,0 +1,10 @@
package utils
type ApplyRequest struct {
CMD []string `json:"CMD"`
}
type ApplyResponse struct {
Error error
Response []byte
}