mirror of
https://github.com/HDT3213/godis.git
synced 2024-08-04 22:48:50 +08:00
194 lines
5.2 KiB
Go
194 lines
5.2 KiB
Go
package godis
|
|
|
|
import (
|
|
"github.com/hdt3213/godis/datastruct/set"
|
|
"github.com/hdt3213/godis/interface/redis"
|
|
"github.com/hdt3213/godis/redis/reply"
|
|
"strings"
|
|
)
|
|
|
|
var forbiddenInMulti = set.Make(
|
|
"flushdb",
|
|
"flushall",
|
|
)
|
|
|
|
// Watch set watching keys
|
|
func Watch(db *DB, conn redis.Connection, args [][]byte) redis.Reply {
|
|
watching := conn.GetWatching()
|
|
for _, bkey := range args {
|
|
key := string(bkey)
|
|
watching[key] = db.GetVersion(key)
|
|
}
|
|
return reply.MakeOkReply()
|
|
}
|
|
|
|
func execGetVersion(db *DB, args [][]byte) redis.Reply {
|
|
key := string(args[0])
|
|
ver := db.GetVersion(key)
|
|
return reply.MakeIntReply(int64(ver))
|
|
}
|
|
|
|
func init() {
|
|
RegisterCommand("GetVer", execGetVersion, readAllKeys, nil, 2)
|
|
}
|
|
|
|
// invoker should lock watching keys
|
|
func isWatchingChanged(db *DB, watching map[string]uint32) bool {
|
|
for key, ver := range watching {
|
|
currentVersion := db.GetVersion(key)
|
|
if ver != currentVersion {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// StartMulti starts multi-command-transaction
|
|
func StartMulti(conn redis.Connection) redis.Reply {
|
|
if conn.InMultiState() {
|
|
return reply.MakeErrReply("ERR MULTI calls can not be nested")
|
|
}
|
|
conn.SetMultiState(true)
|
|
return reply.MakeOkReply()
|
|
}
|
|
|
|
// EnqueueCmd puts command line into `multi` pending queue
|
|
func EnqueueCmd(conn redis.Connection, cmdLine [][]byte) redis.Reply {
|
|
cmdName := strings.ToLower(string(cmdLine[0]))
|
|
cmd, ok := cmdTable[cmdName]
|
|
if !ok {
|
|
return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
|
|
}
|
|
if forbiddenInMulti.Has(cmdName) {
|
|
return reply.MakeErrReply("ERR command '" + cmdName + "' cannot be used in MULTI")
|
|
}
|
|
if cmd.prepare == nil {
|
|
return reply.MakeErrReply("ERR command '" + cmdName + "' cannot be used in MULTI")
|
|
}
|
|
if !validateArity(cmd.arity, cmdLine) {
|
|
// difference with redis: we won't enqueue command line with wrong arity
|
|
return reply.MakeArgNumErrReply(cmdName)
|
|
}
|
|
conn.EnqueueCmd(cmdLine)
|
|
return reply.MakeQueuedReply()
|
|
}
|
|
|
|
func execMulti(db *DB, conn redis.Connection) redis.Reply {
|
|
if !conn.InMultiState() {
|
|
return reply.MakeErrReply("ERR EXEC without MULTI")
|
|
}
|
|
defer conn.SetMultiState(false)
|
|
cmdLines := conn.GetQueuedCmdLine()
|
|
return db.ExecMulti(conn, conn.GetWatching(), cmdLines)
|
|
}
|
|
|
|
// ExecMulti executes multi commands transaction Atomically and Isolated
|
|
func (db *DB) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply {
|
|
// prepare
|
|
writeKeys := make([]string, 0) // may contains duplicate
|
|
readKeys := make([]string, 0)
|
|
for _, cmdLine := range cmdLines {
|
|
cmdName := strings.ToLower(string(cmdLine[0]))
|
|
cmd := cmdTable[cmdName]
|
|
prepare := cmd.prepare
|
|
write, read := prepare(cmdLine[1:])
|
|
writeKeys = append(writeKeys, write...)
|
|
readKeys = append(readKeys, read...)
|
|
}
|
|
// set watch
|
|
watchingKeys := make([]string, 0, len(watching))
|
|
for key := range watching {
|
|
watchingKeys = append(watchingKeys, key)
|
|
}
|
|
readKeys = append(readKeys, watchingKeys...)
|
|
db.RWLocks(writeKeys, readKeys)
|
|
defer db.RWUnLocks(writeKeys, readKeys)
|
|
|
|
if isWatchingChanged(db, watching) { // watching keys changed, abort
|
|
return reply.MakeEmptyMultiBulkReply()
|
|
}
|
|
// execute
|
|
results := make([]redis.Reply, 0, len(cmdLines))
|
|
aborted := false
|
|
undoCmdLines := make([][]CmdLine, 0, len(cmdLines))
|
|
for _, cmdLine := range cmdLines {
|
|
undoCmdLines = append(undoCmdLines, db.GetUndoLogs(cmdLine))
|
|
result := db.execWithLock(cmdLine)
|
|
if reply.IsErrorReply(result) {
|
|
aborted = true
|
|
// don't rollback failed commands
|
|
undoCmdLines = undoCmdLines[:len(undoCmdLines)-1]
|
|
break
|
|
}
|
|
results = append(results, result)
|
|
}
|
|
if !aborted { //success
|
|
db.addVersion(writeKeys...)
|
|
return reply.MakeMultiRawReply(results)
|
|
}
|
|
// undo if aborted
|
|
size := len(undoCmdLines)
|
|
for i := size - 1; i >= 0; i-- {
|
|
curCmdLines := undoCmdLines[i]
|
|
if len(curCmdLines) == 0 {
|
|
continue
|
|
}
|
|
for _, cmdLine := range curCmdLines {
|
|
db.execWithLock(cmdLine)
|
|
}
|
|
}
|
|
return reply.MakeErrReply("EXECABORT Transaction discarded because of previous errors.")
|
|
}
|
|
|
|
// DiscardMulti drops MULTI pending commands
|
|
func DiscardMulti(conn redis.Connection) redis.Reply {
|
|
if !conn.InMultiState() {
|
|
return reply.MakeErrReply("ERR DISCARD without MULTI")
|
|
}
|
|
conn.ClearQueuedCmds()
|
|
conn.SetMultiState(false)
|
|
return reply.MakeOkReply()
|
|
}
|
|
|
|
// GetUndoLogs return rollback commands
|
|
func (db *DB) GetUndoLogs(cmdLine [][]byte) []CmdLine {
|
|
cmdName := strings.ToLower(string(cmdLine[0]))
|
|
cmd, ok := cmdTable[cmdName]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
undo := cmd.undo
|
|
if undo == nil {
|
|
return nil
|
|
}
|
|
return undo(db, cmdLine[1:])
|
|
}
|
|
|
|
// execWithLock executes normal commands, invoker should provide locks
|
|
func (db *DB) execWithLock(cmdLine [][]byte) redis.Reply {
|
|
cmdName := strings.ToLower(string(cmdLine[0]))
|
|
cmd, ok := cmdTable[cmdName]
|
|
if !ok {
|
|
return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
|
|
}
|
|
if !validateArity(cmd.arity, cmdLine) {
|
|
return reply.MakeArgNumErrReply(cmdName)
|
|
}
|
|
fun := cmd.executor
|
|
return fun(db, cmdLine[1:])
|
|
}
|
|
|
|
// GetRelatedKeys analysis related keys
|
|
func GetRelatedKeys(cmdLine [][]byte) ([]string, []string) {
|
|
cmdName := strings.ToLower(string(cmdLine[0]))
|
|
cmd, ok := cmdTable[cmdName]
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
prepare := cmd.prepare
|
|
if prepare == nil {
|
|
return nil, nil
|
|
}
|
|
return prepare(cmdLine[1:])
|
|
}
|