This commit is contained in:
dukai 2025-05-27 19:04:24 +08:00
parent 5b95256929
commit 2b468c79a9
9 changed files with 7 additions and 383 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
robotfs
robotfs-darwin
robotfs-linux
go.sum
pb/*.pb.go
.vscode/

View File

@ -1,37 +0,0 @@
package store
import (
"context"
"errors"
"robotfs/utils"
)
var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrKvNotImplemented = errors.New("kv not implemented yet")
ErrKvNotFound = errors.New("kv: not found")
)
type ListEachEntryFunc func(entry *utils.Entry) bool
type MetaStore interface {
GetName() string
Initialize(configuration utils.Configuration, prefix string) error
InsertEntry(context.Context, *utils.Entry) error
UpdateEntry(context.Context, *utils.Entry) (err error)
FindEntry(context.Context, utils.FullPath) (entry *utils.Entry, err error)
DeleteEntry(context.Context, utils.FullPath) (err error)
DeleteFolderChildren(context.Context, utils.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
// Todo: implement this in the future
ListDirectoryPrefixedEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
RollbackTransaction(ctx context.Context) error
Shutdown()
}

View File

@ -1,40 +0,0 @@
package redis_lua
import (
"robotfs/utils"
"github.com/redis/go-redis/v9"
)
type RedisLuaStore struct {
UniversalRedisLuaStore
}
func NewRedisLuaStore() *RedisLuaStore {
return &RedisLuaStore{}
}
func (store *RedisLuaStore) GetName() string {
return "redis_lua"
}
func (store *RedisLuaStore) Initialize(configuration utils.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
func (store *RedisLuaStore) initialize(hostPort string, username, password string, database int, superLargeDirectories []string) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Username: username,
Password: password,
DB: database,
})
store.loadSuperLargeDirectories(superLargeDirectories)
return
}

View File

@ -1,19 +0,0 @@
-- KEYS[1]: full path of entry
local fullpath = KEYS[1]
-- KEYS[2]: full path of entry
local fullpath_list_key = KEYS[2]
-- KEYS[3]: dir of the entry
local dir_list_key = KEYS[3]
-- ARGV[1]: isSuperLargeDirectory
local isSuperLargeDirectory = ARGV[1] == "1"
-- ARGV[2]: name of the entry
local name = ARGV[2]
redis.call("DEL", fullpath, fullpath_list_key)
if not isSuperLargeDirectory and name ~= "" then
redis.call("ZREM", dir_list_key, name)
end
return 0

View File

@ -1,15 +0,0 @@
-- KEYS[1]: full path of entry
local fullpath = KEYS[1]
if fullpath ~= "" and string.sub(fullpath, -1) == "/" then
fullpath = string.sub(fullpath, 0, -2)
end
local files = redis.call("ZRANGE", fullpath .. "\0", "0", "-1")
for _, name in ipairs(files) do
local file_path = fullpath .. "/" .. name
redis.call("DEL", file_path, file_path .. "\0")
end
return 0

View File

@ -1,24 +0,0 @@
package stored_procedure
import (
_ "embed"
"github.com/redis/go-redis/v9"
)
func init() {
InsertEntryScript = redis.NewScript(insertEntry)
DeleteEntryScript = redis.NewScript(deleteEntry)
DeleteFolderChildrenScript = redis.NewScript(deleteFolderChildren)
}
//go:embed insert_entry.lua
var insertEntry string
var InsertEntryScript *redis.Script
//go:embed delete_entry.lua
var deleteEntry string
var DeleteEntryScript *redis.Script
//go:embed delete_folder_children.lua
var deleteFolderChildren string
var DeleteFolderChildrenScript *redis.Script

View File

@ -1,27 +0,0 @@
-- KEYS[1]: full path of entry
local full_path = KEYS[1]
-- KEYS[2]: dir of the entry
local dir_list_key = KEYS[2]
-- ARGV[1]: content of the entry
local entry = ARGV[1]
-- ARGV[2]: TTL of the entry
local ttlSec = tonumber(ARGV[2])
-- ARGV[3]: isSuperLargeDirectory
local isSuperLargeDirectory = ARGV[3] == "1"
-- ARGV[4]: zscore of the entry in zset
local zscore = tonumber(ARGV[4])
-- ARGV[5]: name of the entry
local name = ARGV[5]
if ttlSec > 0 then
redis.call("SET", full_path, entry, "EX", ttlSec)
else
redis.call("SET", full_path, entry)
end
if not isSuperLargeDirectory and name ~= "" then
redis.call("ZADD", dir_list_key, "NX", zscore, name)
end
return 0

View File

@ -1,180 +0,0 @@
package redis_lua
import (
"context"
"errors"
"fmt"
"robotfs/store"
"robotfs/store/redis_lua/stored_procedure"
"robotfs/utils"
"github.com/redis/go-redis/v9"
)
const (
DIR_LIST_MARKER = "\x00"
)
var ErrNotFound = errors.New("no entry is found")
type UniversalRedisLuaStore struct {
Client redis.UniversalClient
superLargeDirectoryHash map[string]bool
}
func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
_, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
return
}
func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) {
store.superLargeDirectoryHash = make(map[string]bool)
for _, dir := range superLargeDirectories {
store.superLargeDirectoryHash[dir] = true
}
}
func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *utils.Entry) (err error) {
value, err := entry.Encode()
if err != nil {
return fmt.Errorf("encoding %s: %v", entry.FullPath, err)
}
dir, name := entry.FullPath.DirAndName()
err = stored_procedure.InsertEntryScript.Run(ctx, store.Client,
[]string{string(entry.FullPath), genDirectoryListKey(dir)},
value, 0,
store.isSuperLargeDirectory(dir), 0, name).Err()
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
return nil
}
func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *utils.Entry) (err error) {
return store.InsertEntry(ctx, entry)
}
func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath utils.FullPath) (entry *utils.Entry, err error) {
data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &utils.Entry{
FullPath: fullpath,
}
err = entry.Decode([]byte(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
return entry, nil
}
func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath utils.FullPath) (err error) {
dir, name := fullpath.DirAndName()
err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client,
[]string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)},
store.isSuperLargeDirectory(dir), name).Err()
if err != nil {
return fmt.Errorf("DeleteEntry %s : %v", fullpath, err)
}
return nil
}
func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, fullpath utils.FullPath) (err error) {
if store.isSuperLargeDirectory(string(fullpath)) {
return nil
}
err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client,
[]string{string(fullpath)}).Err()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
return nil
}
// Todo: implement this in the future
func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, nil
}
func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))
min := "-"
if startFileName != "" {
if includeStartFile {
min = "[" + startFileName
} else {
min = "(" + startFileName
}
}
members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{
Min: min,
Max: "+",
Offset: 0,
Count: limit,
}).Result()
if err != nil {
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// fetch entry meta
for _, fileName := range members {
path := utils.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
lastFileName = fileName
if err != nil {
if err == ErrNotFound {
continue
}
} else {
if !eachEntryFunc(entry) {
break
}
}
}
return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
return dir + DIR_LIST_MARKER
}
func (store *UniversalRedisLuaStore) Shutdown() {
store.Client.Close()
}

View File

@ -1,41 +0,0 @@
package redis_lua
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
func (store *UniversalRedisLuaStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
_, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
}
return nil
}
func (store *UniversalRedisLuaStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, fmt.Errorf("kv: not found")
}
return []byte(data), err
}
func (store *UniversalRedisLuaStore) KvDelete(ctx context.Context, key []byte) (err error) {
_, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)
}
return nil
}