Created AppendReadWriter and PreambleReadWriter interfaces for file handling in the log and preamble packages respectively.

Seek to the beginning of the preamble file after truncating.
This commit is contained in:
Kelvin Clement Mwinuka
2024-02-25 15:16:27 +08:00
parent f2e32d8b4f
commit 8d7fe0225a
6 changed files with 439 additions and 395 deletions
+204 -204
View File
@@ -43,207 +43,207 @@ services:
- ./volumes/standalone_node:/var/lib/echovault
networks:
- testnet
cluster_node_1:
container_name: cluster_node_1
build:
context: .
dockerfile: Dockerfile.dev
environment:
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- KEY=/etc/ssl/certs/echovault/server1.key
- CERT=/etc/ssl/certs/echovault/server1.crt
- SERVER_ID=1
- PLUGIN_DIR=/usr/local/lib/echovault
- DATA_DIR=/var/lib/echovault
- IN_MEMORY=false
- TLS=true
- MTLS=true
- BOOTSTRAP_CLUSTER=true
- ACL_CONFIG=/etc/config/echovault/acl.yml
- REQUIRE_PASS=false
- FORWARD_COMMAND=true
- SNAPSHOT_THRESHOLD=1000
- SNAPSHOT_INTERVAL=5m30s
- RESTORE_SNAPSHOT=false
- RESTORE_AOF=false
# List of server cert/key pairs
- CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# List of client certificate authorities
- CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
ports:
- "7480:7480"
- "7945:7946"
- "8000:8000"
volumes:
- ./config/acl.yml:/etc/config/echovault/acl.yml
- ./volumes/cluster_node_1:/var/lib/echovault
networks:
- testnet
cluster_node_2:
container_name: cluster_node_2
build:
context: .
dockerfile: Dockerfile.dev
environment:
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- KEY=/etc/ssl/certs/echovault/server1.key
- CERT=/etc/ssl/certs/echovault/server1.crt
- SERVER_ID=2
- JOIN_ADDR=cluster_node_1:7946
- PLUGIN_DIR=/usr/local/lib/echovault
- DATA_DIR=/var/lib/echovault
- IN_MEMORY=false
- TLS=true
- MTLS=true
- BOOTSTRAP_CLUSTER=false
- ACL_CONFIG=/etc/config/echovault/acl.yml
- REQUIRE_PASS=false
- FORWARD_COMMAND=true
- SNAPSHOT_THRESHOLD=1000
- SNAPSHOT_INTERVAL=5m30s
- RESTORE_SNAPSHOT=false
- RESTORE_AOF=false
# List of server cert/key pairs
- CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# List of client certificate authorities
- CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
ports:
- "7481:7480"
- "7947:7946"
- "8001:8000"
volumes:
- ./config/acl.yml:/etc/config/echovault/acl.yml
- ./volumes/cluster_node_2:/var/lib/echovault
networks:
- testnet
cluster_node_3:
container_name: cluster_node_3
build:
context: .
dockerfile: Dockerfile.dev
environment:
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- KEY=/etc/ssl/certs/echovault/server1.key
- CERT=/etc/ssl/certs/echovault/server1.crt
- SERVER_ID=3
- JOIN_ADDR=cluster_node_1:7946
- PLUGIN_DIR=/usr/local/lib/echovault
- DATA_DIR=/var/lib/echovault
- IN_MEMORY=false
- TLS=true
- MTLS=true
- BOOTSTRAP_CLUSTER=false
- ACL_CONFIG=/etc/config/echovault/acl.yml
- REQUIRE_PASS=false
- FORWARD_COMMAND=true
- SNAPSHOT_THRESHOLD=1000
- SNAPSHOT_INTERVAL=5m30s
- RESTORE_SNAPSHOT=false
- RESTORE_AOF=false
# List of server cert/key pairs
- CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# List of client certificate authorities
- CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
ports:
- "7482:7480"
- "7948:7946"
- "8002:8000"
volumes:
- ./config/acl.yml:/etc/config/echovault/acl.yml
- ./volumes/cluster_node_3:/var/lib/echovault
networks:
- testnet
cluster_node_4:
container_name: cluster_node_4
build:
context: .
dockerfile: Dockerfile.dev
environment:
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- KEY=/etc/ssl/certs/echovault/server1.key
- CERT=/etc/ssl/certs/echovault/server1.crt
- SERVER_ID=4
- JOIN_ADDR=cluster_node_1:7946
- PLUGIN_DIR=/usr/local/lib/echovault
- DATA_DIR=/var/lib/echovault
- IN_MEMORY=false
- TLS=true
- MTLS=true
- BOOTSTRAP_CLUSTER=false
- ACL_CONFIG=/etc/config/echovault/acl.yml
- REQUIRE_PASS=false
- FORWARD_COMMAND=true
- SNAPSHOT_THRESHOLD=1000
- SNAPSHOT_INTERVAL=5m30s
- RESTORE_SNAPSHOT=false
- RESTORE_AOF=false
# List of server cert/key pairs
- CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# List of client certificate authorities
- CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
ports:
- "7483:7480"
- "7949:7946"
- "8003:8000"
volumes:
- ./config/acl.yml:/etc/config/echovault/acl.yml
- ./volumes/cluster_node_4:/var/lib/echovault
networks:
- testnet
cluster_node_5:
container_name: cluster_node_5
build:
context: .
dockerfile: Dockerfile.dev
environment:
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- KEY=/etc/ssl/certs/echovault/server1.key
- CERT=/etc/ssl/certs/echovault/server1.crt
- SERVER_ID=5
- JOIN_ADDR=cluster_node_1:7946
- PLUGIN_DIR=/usr/local/lib/echovault
- DATA_DIR=/var/lib/echovault
- IN_MEMORY=false
- TLS=true
- MTLS=true
- BOOTSTRAP_CLUSTER=false
- ACL_CONFIG=/etc/config/echovault/acl.yml
- REQUIRE_PASS=false
- FORWARD_COMMAND=true
- SNAPSHOT_THRESHOLD=1000
- SNAPSHOT_INTERVAL=5m30s
- RESTORE_SNAPSHOT=false
- RESTORE_AOF=false
# List of server cert/key pairs
- CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# List of client certificate authorities
- CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
ports:
- "7484:7480"
- "7950:7946"
- "8004:8000"
volumes:
- ./config/acl.yml:/etc/config/echovault/acl.yml
- ./volumes/cluster_node_5:/var/lib/echovault
networks:
- testnet
#
# cluster_node_1:
# container_name: cluster_node_1
# build:
# context: .
# dockerfile: Dockerfile.dev
# environment:
# - PORT=7480
# - RAFT_PORT=8000
# - ML_PORT=7946
# - KEY=/etc/ssl/certs/echovault/server1.key
# - CERT=/etc/ssl/certs/echovault/server1.crt
# - SERVER_ID=1
# - PLUGIN_DIR=/usr/local/lib/echovault
# - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false
# - TLS=true
# - MTLS=true
# - BOOTSTRAP_CLUSTER=true
# - ACL_CONFIG=/etc/config/echovault/acl.yml
# - REQUIRE_PASS=false
# - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false
# # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities
# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
# ports:
# - "7480:7480"
# - "7945:7946"
# - "8000:8000"
# volumes:
# - ./config/acl.yml:/etc/config/echovault/acl.yml
# - ./volumes/cluster_node_1:/var/lib/echovault
# networks:
# - testnet
#
# cluster_node_2:
# container_name: cluster_node_2
# build:
# context: .
# dockerfile: Dockerfile.dev
# environment:
# - PORT=7480
# - RAFT_PORT=8000
# - ML_PORT=7946
# - KEY=/etc/ssl/certs/echovault/server1.key
# - CERT=/etc/ssl/certs/echovault/server1.crt
# - SERVER_ID=2
# - JOIN_ADDR=cluster_node_1:7946
# - PLUGIN_DIR=/usr/local/lib/echovault
# - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false
# - TLS=true
# - MTLS=true
# - BOOTSTRAP_CLUSTER=false
# - ACL_CONFIG=/etc/config/echovault/acl.yml
# - REQUIRE_PASS=false
# - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false
# # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities
# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
# ports:
# - "7481:7480"
# - "7947:7946"
# - "8001:8000"
# volumes:
# - ./config/acl.yml:/etc/config/echovault/acl.yml
# - ./volumes/cluster_node_2:/var/lib/echovault
# networks:
# - testnet
#
# cluster_node_3:
# container_name: cluster_node_3
# build:
# context: .
# dockerfile: Dockerfile.dev
# environment:
# - PORT=7480
# - RAFT_PORT=8000
# - ML_PORT=7946
# - KEY=/etc/ssl/certs/echovault/server1.key
# - CERT=/etc/ssl/certs/echovault/server1.crt
# - SERVER_ID=3
# - JOIN_ADDR=cluster_node_1:7946
# - PLUGIN_DIR=/usr/local/lib/echovault
# - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false
# - TLS=true
# - MTLS=true
# - BOOTSTRAP_CLUSTER=false
# - ACL_CONFIG=/etc/config/echovault/acl.yml
# - REQUIRE_PASS=false
# - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false
# # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities
# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
# ports:
# - "7482:7480"
# - "7948:7946"
# - "8002:8000"
# volumes:
# - ./config/acl.yml:/etc/config/echovault/acl.yml
# - ./volumes/cluster_node_3:/var/lib/echovault
# networks:
# - testnet
#
# cluster_node_4:
# container_name: cluster_node_4
# build:
# context: .
# dockerfile: Dockerfile.dev
# environment:
# - PORT=7480
# - RAFT_PORT=8000
# - ML_PORT=7946
# - KEY=/etc/ssl/certs/echovault/server1.key
# - CERT=/etc/ssl/certs/echovault/server1.crt
# - SERVER_ID=4
# - JOIN_ADDR=cluster_node_1:7946
# - PLUGIN_DIR=/usr/local/lib/echovault
# - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false
# - TLS=true
# - MTLS=true
# - BOOTSTRAP_CLUSTER=false
# - ACL_CONFIG=/etc/config/echovault/acl.yml
# - REQUIRE_PASS=false
# - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false
# # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities
# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
# ports:
# - "7483:7480"
# - "7949:7946"
# - "8003:8000"
# volumes:
# - ./config/acl.yml:/etc/config/echovault/acl.yml
# - ./volumes/cluster_node_4:/var/lib/echovault
# networks:
# - testnet
#
# cluster_node_5:
# container_name: cluster_node_5
# build:
# context: .
# dockerfile: Dockerfile.dev
# environment:
# - PORT=7480
# - RAFT_PORT=8000
# - ML_PORT=7946
# - KEY=/etc/ssl/certs/echovault/server1.key
# - CERT=/etc/ssl/certs/echovault/server1.crt
# - SERVER_ID=5
# - JOIN_ADDR=cluster_node_1:7946
# - PLUGIN_DIR=/usr/local/lib/echovault
# - DATA_DIR=/var/lib/echovault
# - IN_MEMORY=false
# - TLS=true
# - MTLS=true
# - BOOTSTRAP_CLUSTER=false
# - ACL_CONFIG=/etc/config/echovault/acl.yml
# - REQUIRE_PASS=false
# - FORWARD_COMMAND=true
# - SNAPSHOT_THRESHOLD=1000
# - SNAPSHOT_INTERVAL=5m30s
# - RESTORE_SNAPSHOT=false
# - RESTORE_AOF=false
# # List of server cert/key pairs
# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
# # List of client certificate authorities
# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt
# ports:
# - "7484:7480"
# - "7950:7946"
# - "8004:8000"
# volumes:
# - ./config/acl.yml:/etc/config/echovault/acl.yml
# - ./volumes/cluster_node_5:/var/lib/echovault
# networks:
# - testnet
-101
View File
@@ -1,101 +0,0 @@
package aof
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"sync"
)
type AppendStore struct {
rw io.ReadWriter
mut sync.Mutex
handleCommand func(ctx context.Context, command []byte, conn *net.Conn, replay bool) ([]byte, error)
}
func NewAppendStore() AppendStore {
return AppendStore{}
}
func (store *AppendStore) Write(command []byte) error {
store.mut.Lock()
defer store.mut.Unlock()
_, err := store.rw.Write(command)
return err
}
func (store *AppendStore) Sync() error {
store.mut.Lock()
store.mut.Unlock()
file, ok := store.rw.(*os.File)
if ok {
return file.Sync()
}
return nil
}
func (store *AppendStore) Restore(ctx context.Context) error {
store.mut.Lock()
defer store.mut.Unlock()
buf := bufio.NewReader(store.rw)
var commands [][]byte
var line []byte
for {
b, _, err := buf.ReadLine()
if err != nil && errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
if len(b) <= 0 {
line = append(line, []byte("\r\n\r\n")...)
commands = append(commands, line)
line = []byte{}
continue
}
if len(line) > 0 {
line = append(line, append([]byte("\r\n"), bytes.TrimLeft(b, "\x00")...)...)
continue
}
line = append(line, bytes.TrimLeft(b, "\x00")...)
}
for _, c := range commands {
if _, err := store.handleCommand(ctx, c, nil, true); err != nil {
return err
}
}
return nil
}
func (store *AppendStore) Truncate() error {
rw, ok := store.rw.(interface {
Truncate(size int64) error
})
if !ok {
fmt.Println("could not truncate AOF file")
}
if err := rw.Truncate(0); err != nil {
return err
}
return nil
}
func (store *AppendStore) Close() error {
store.mut.Lock()
defer store.mut.Unlock()
file, ok := store.rw.(*os.File)
if !ok {
return nil
}
return file.Close()
}
@@ -2,15 +2,12 @@ package aof
import (
"context"
logstore "github.com/echovault/echovault/src/server/aof/log"
"github.com/echovault/echovault/src/server/aof/preamble"
"github.com/echovault/echovault/src/utils"
"io"
"log"
"net"
"os"
"path"
"strings"
"sync"
"time"
)
// This package handles AOF logging in standalone mode only.
@@ -32,11 +29,11 @@ type Engine struct {
mut sync.Mutex
logChan chan []byte
logCount uint64
preambleStore *PreambleStore
appendStore AppendStore
preambleStore *preamble.PreambleStore
appendStore *logstore.AppendStore
}
func NewAOFEngine(opts Opts, appendRW io.ReadWriter, preambleRW io.ReadWriter) (*Engine, error) {
func NewAOFEngine(opts Opts, appendRW logstore.AppendReadWriter, preambleRW preamble.PreambleReadWriter) (*Engine, error) {
engine := &Engine{
options: opts,
mut: sync.Mutex{},
@@ -44,23 +41,12 @@ func NewAOFEngine(opts Opts, appendRW io.ReadWriter, preambleRW io.ReadWriter) (
logCount: 0,
}
// Obtain preamble file handler
if preambleRW == nil {
f, err := os.OpenFile(
path.Join(engine.options.Config.DataDir, "aof", "preamble.bin"),
os.O_WRONLY|os.O_CREATE|os.O_APPEND,
os.ModePerm)
if err != nil {
return nil, err
}
preambleRW = f
}
// Setup Preamble engine
engine.preambleStore = NewPreambleStore(
WithReadWriter(preambleRW),
WithGetStateFunc(opts.GetState),
WithSetValueFunc(func(key string, value interface{}) {
engine.preambleStore = preamble.NewPreambleStore(
preamble.WithDirectory(engine.options.Config.DataDir),
preamble.WithReadWriter(preambleRW),
preamble.WithGetStateFunc(opts.GetState),
preamble.WithSetValueFunc(func(key string, value interface{}) {
if _, err := engine.options.CreateKeyAndLock(context.Background(), key); err != nil {
log.Println(err)
}
@@ -69,28 +55,18 @@ func NewAOFEngine(opts Opts, appendRW io.ReadWriter, preambleRW io.ReadWriter) (
}),
)
// 1. Create AOF directory if it does not exist.
if err := os.MkdirAll(path.Join(engine.options.Config.DataDir, "aof"), os.ModePerm); err != nil {
return nil, err
}
// 2. Setup storage engine.
engine.appendStore = AppendStore{
rw: appendRW,
mut: sync.Mutex{},
}
// If out is not provided by the caller, then create/open the new AOF file based on the configuration.
if appendRW == nil {
f, err := os.OpenFile(
path.Join(engine.options.Config.DataDir, "aof", "log.aof"),
os.O_WRONLY|os.O_CREATE|os.O_APPEND,
os.ModePerm)
if err != nil {
return nil, err
}
engine.appendStore.rw = f
}
// Setup AOF log store engine
engine.appendStore = logstore.NewAppendStore(
logstore.WithDirectory(engine.options.Config.DataDir),
logstore.WithStrategy(engine.options.Config.AOFSyncStrategy),
logstore.WithReadWriter(appendRW),
logstore.WithHandleCommandFunc(func(command []byte) {
_, err := engine.options.HandleCommand(context.Background(), command, nil, true)
if err != nil {
log.Println(err)
}
}),
)
// 3. Start the goroutine to pick up queued commands in order to write them to the file.
// LogCommand will get the open file handler from the struct top perform the AOF operation.
@@ -100,27 +76,9 @@ func NewAOFEngine(opts Opts, appendRW io.ReadWriter, preambleRW io.ReadWriter) (
if err := engine.appendStore.Write(c); err != nil {
log.Println(err)
}
if strings.EqualFold(engine.options.Config.AOFSyncStrategy, "always") {
if err := engine.appendStore.Sync(); err != nil {
log.Println(err)
}
}
}
}()
// 4. Start another goroutine that takes handles syncing the content to the file system.
// No need to start this goroutine if sync strategy is anything other than 'everysec'.
if strings.EqualFold(engine.options.Config.AOFSyncStrategy, "everysec") {
go func() {
for {
if err := engine.appendStore.Sync(); err != nil {
log.Println(err)
}
<-time.After(1 * time.Second)
}
}()
}
return engine, nil
}
+164
View File
@@ -0,0 +1,164 @@
package log
import (
"bufio"
"bytes"
"context"
"errors"
"io"
"log"
"os"
"path"
"strings"
"sync"
"time"
)
type AppendReadWriter interface {
io.ReadWriter
io.Closer
Truncate(size int64) error
Sync() error
}
type AppendStore struct {
strategy string // Append file sync strategy. Can only be "always", "everysec", or "no
mut sync.Mutex // Store mutex
rw AppendReadWriter // The ReadWriter used to persist and load the log
directory string // The directory for the AOF file if we must create one
handleCommand func(command []byte) // Function to handle command read from AOF log after restore
}
func WithStrategy(strategy string) func(store *AppendStore) {
return func(store *AppendStore) {
store.strategy = strategy
}
}
func WithReadWriter(rw AppendReadWriter) func(store *AppendStore) {
return func(store *AppendStore) {
store.rw = rw
}
}
func WithDirectory(directory string) func(store *AppendStore) {
return func(store *AppendStore) {
store.directory = directory
}
}
func WithHandleCommandFunc(f func(command []byte)) func(store *AppendStore) {
return func(store *AppendStore) {
store.handleCommand = f
}
}
func NewAppendStore(options ...func(store *AppendStore)) *AppendStore {
store := &AppendStore{
directory: "",
strategy: "everysec",
rw: nil,
mut: sync.Mutex{},
handleCommand: func(command []byte) {
// No-Op
},
}
for _, option := range options {
option(store)
}
// If rw is nil, use a default file at the provided directory
if store.rw == nil {
f, err := os.OpenFile(path.Join(store.directory, "aof", "log.aof"), os.O_RDWR|os.O_CREATE|os.O_APPEND, os.ModePerm)
if err != nil {
log.Println(err)
}
store.rw = f
}
// Start another goroutine that takes handles syncing the content to the file system.
// No need to start this goroutine if sync strategy is anything other than 'everysec'.
if strings.EqualFold(store.strategy, "everysec") {
go func() {
for {
if err := store.Sync(); err != nil {
log.Println(err)
}
<-time.After(1 * time.Second)
}
}()
}
return store
}
func (store *AppendStore) Write(command []byte) error {
store.mut.Lock()
defer store.mut.Unlock()
if _, err := store.rw.Write(command); err != nil {
return err
}
if strings.EqualFold(store.strategy, "always") {
if err := store.Sync(); err != nil {
return err
}
}
return nil
}
func (store *AppendStore) Sync() error {
store.mut.Lock()
store.mut.Unlock()
return store.rw.Sync()
}
func (store *AppendStore) Restore(ctx context.Context) error {
store.mut.Lock()
defer store.mut.Unlock()
buf := bufio.NewReader(store.rw)
var commands [][]byte
var line []byte
for {
b, _, err := buf.ReadLine()
if err != nil && errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
if len(b) <= 0 {
line = append(line, []byte("\r\n\r\n")...)
commands = append(commands, line)
line = []byte{}
continue
}
if len(line) > 0 {
line = append(line, append([]byte("\r\n"), bytes.TrimLeft(b, "\x00")...)...)
continue
}
line = append(line, bytes.TrimLeft(b, "\x00")...)
}
for _, c := range commands {
store.handleCommand(c)
}
return nil
}
func (store *AppendStore) Truncate() error {
store.mut.Lock()
defer store.mut.Unlock()
if err := store.rw.Truncate(0); err != nil {
return err
}
return nil
}
func (store *AppendStore) Close() error {
store.mut.Lock()
defer store.mut.Unlock()
return store.rw.Close()
}
@@ -1,23 +1,30 @@
package aof
package preamble
import (
"context"
"encoding/json"
"errors"
"io"
"log"
"os"
"path"
"sync"
)
type PreambleStore struct {
rw io.ReadWriter
mut sync.Mutex
getState func() map[string]interface{}
setValue func(key string, value interface{})
type PreambleReadWriter interface {
io.ReadWriteSeeker
io.Closer
Truncate(size int64) error
Sync() error
}
func WithReadWriter(rw io.ReadWriter) func(store *PreambleStore) {
type PreambleStore struct {
rw PreambleReadWriter
mut sync.Mutex
directory string
getState func() map[string]interface{}
setValue func(key string, value interface{})
}
func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.rw = rw
}
@@ -35,10 +42,17 @@ func WithSetValueFunc(f func(key string, value interface{})) func(store *Preambl
}
}
func WithDirectory(directory string) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.directory = directory
}
}
func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
store := &PreambleStore{
rw: nil,
mut: sync.Mutex{},
rw: nil,
mut: sync.Mutex{},
directory: "",
getState: func() map[string]interface{} {
// No-Op by default
return nil
@@ -52,6 +66,15 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
option(store)
}
// If rw is nil, create the default
if store.rw == nil {
f, err := os.OpenFile(path.Join(store.directory, "aof", "preamble.bin"), os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
log.Println(err)
}
store.rw = f
}
return store
}
@@ -67,14 +90,11 @@ func (store *PreambleStore) CreatePreamble() error {
}
// Truncate the preamble first
rw, ok := store.rw.(interface {
Truncate(size int64) error
})
if !ok {
return errors.New("could not truncate preamble file")
if err = store.rw.Truncate(0); err != nil {
return err
}
if err = rw.Truncate(0); err != nil {
// Seek to the beginning of the file after truncating
if _, err = store.rw.Seek(0, 0); err != nil {
return err
}
@@ -82,18 +102,15 @@ func (store *PreambleStore) CreatePreamble() error {
return err
}
// If the rw is a file, sync it immediately
file, ok := store.rw.(*os.File)
if ok {
if err = file.Sync(); err != nil {
log.Println(err)
}
// Sync the changes
if err = store.rw.Sync(); err != nil {
return err
}
return nil
}
func (store *PreambleStore) Restore(ctx context.Context) error {
func (store *PreambleStore) Restore() error {
if store.rw == nil {
return nil
}
@@ -115,3 +132,9 @@ func (store *PreambleStore) Restore(ctx context.Context) error {
return nil
}
func (store *PreambleStore) Close() error {
store.mut.Lock()
defer store.mut.Unlock()
return store.rw.Close()
}
+1 -1
View File
@@ -105,7 +105,7 @@ func NewServer(opts Opts) *Server {
KeyUnlock: server.KeyUnlock,
SetValue: server.SetValue,
HandleCommand: server.handleCommand,
}, nil)
}, nil, nil)
if err != nil {
log.Println(err)
}