diff --git a/config.yml b/config.yml index b7dc05f..84ea536 100644 --- a/config.yml +++ b/config.yml @@ -1,6 +1,7 @@ robotfs: address: "0.0.0.0:6201" log_path: "/var/log/robotfs" + data_path: "/data/dukai/juiceFSdata" redis: address: "180.76.110.177:6379" @@ -16,6 +17,3 @@ s3: secret_access_key: "12345678" bucket_name: "myjfs" use_path_style: true - -juicefs: - data_path: "/data/juicefs" diff --git a/config/config.go b/config/config.go index 27e062e..25177e0 100644 --- a/config/config.go +++ b/config/config.go @@ -1,7 +1,6 @@ package config import ( - "robotfs/utils" "time" "gosvc" @@ -10,9 +9,13 @@ import ( ) var ( - cfg = utils.GetViper() + cfg = getViper() ) +func GetConfig() Configuration { + return cfg +} + func SetupServiceConfig() { gosvc.Config = gosvc.ServiceConfig{ Name: "robotfs", diff --git a/utils/config.go b/config/global.go similarity index 97% rename from utils/config.go rename to config/global.go index bac534e..626e5e8 100644 --- a/utils/config.go +++ b/config/global.go @@ -1,4 +1,4 @@ -package utils +package config import ( "path/filepath" @@ -90,7 +90,7 @@ func (vp *ViperProxy) GetStringSlice(key string) []string { return vp.Viper.GetStringSlice(key) } -func GetViper() *ViperProxy { +func getViper() *ViperProxy { vp.Lock() defer vp.Unlock() diff --git a/config/robotfs.go b/config/robotfs.go deleted file mode 100644 index 6cc011c..0000000 --- a/config/robotfs.go +++ /dev/null @@ -1,41 +0,0 @@ -package config - -type RobotFSConfig struct { - Address string `mapstructure:"address"` - LogPath string `mapstructure:"log_path"` -} - -type RedisConfig struct { - Address string `mapstructure:"address"` - Password string `mapstructure:"password"` - DataBase int `mapstructure:"database"` - //Todo: use it in the future - SuperLargeDirectories []string `mapstructure:"superLargeDirectories"` -} - -type S3Config struct { - Region string `mapstructure:"region"` - Endpoint string `mapstructure:"endpoint"` - AccessKeyID string `mapstructure:"access_key_id"` - SecretAccessKey string `mapstructure:"secret_access_key"` - BucketName string `mapstructure:"bucket_name"` - UsePathStyle bool `mapstructure:"use_path_style"` -} - -type JuicefsConfig struct { - //Address string `mapstructure:"address"` - DataPath string `mapstructure:"data_path"` -} - -type Config struct { - RobotFS RobotFSConfig `mapstructure:"robotfs"` - Redis RedisConfig `mapstructure:"redis"` - S3 S3Config `mapstructure:"s3"` - Juicefs JuicefsConfig `mapstructure:"juicefs"` -} - -func GetConfig() *Config { - config := &Config{} - - return config -} diff --git a/engine/engine.go b/engine/engine.go index 4247ee7..19e0a48 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -3,8 +3,9 @@ package engine import ( "fmt" - "robotfs/store/redis_lua" - "robotfs/utils" + "robotfs/config" + "robotfs/pkg/meta/redis_lua" + "robotfs/pkg/storage" ) type Engine struct { @@ -16,25 +17,45 @@ func NewEngine() *Engine { } func (e *Engine) Start() error { - s3Config, err := Initialize(utils.GetViper(), "s3.") + cfg := config.GetConfig() + + meta, err := e.initMetaStore(cfg) if err != nil { - return fmt.Errorf("failed to initialize s3 config: %v", err) - } - object, err := NewStorageManager(s3Config) - if err != nil { - return fmt.Errorf("failed to create storage manager: %v", err) + return err } - meta := redis_lua.NewRedisLuaStore() - if err := meta.Initialize(utils.GetViper(), "redis."); err != nil { - return fmt.Errorf("failed to initialize meta store: %v", err) + store, err := e.initStorage(cfg) + if err != nil { + return err } - filesystem := NewFileSystemManager(meta, object, utils.GetViper().GetString("robotfs.data_path")) - e.FileSystemManager = filesystem + + e.FileSystemManager = NewFileSystemManager( + meta, + store, + cfg.GetString("robotfs.data_path"), + ) return nil } +func (e *Engine) initMetaStore(cfg config.Configuration) (*redis_lua.RedisLuaStore, error) { + meta := redis_lua.NewRedisLuaStore() + if err := meta.Initialize(cfg, "redis."); err != nil { + return nil, fmt.Errorf("initialize meta store failed: %v", err) + } + + return meta, nil +} + +func (e *Engine) initStorage(cfg config.Configuration) (*storage.Storage, error) { + store := storage.NewStorage() + if err := store.Initialize(cfg, "s3."); err != nil { + return nil, fmt.Errorf("initialize s3 config failed: %v", err) + } + + return store, nil +} + func (e *Engine) Stop() { // Stop filesystem manager. e.FileSystemManager.Shutdown() diff --git a/engine/filesystem_copy.go b/engine/filesystem_copy.go index 79a6e5c..c028914 100644 --- a/engine/filesystem_copy.go +++ b/engine/filesystem_copy.go @@ -113,7 +113,7 @@ func (f *FileSystemManager) copySelfEntry(ctx context.Context, oldPath utils.Ful IsDir: entry.IsDir, Size: entry.Size, CreateTime: time.Now().Unix(), - S3Key: entry.S3Key, + S3Key: newPath.ToS3Key(), ContentType: entry.ContentType, Etag: entry.Etag, VersionID: entry.VersionID, diff --git a/engine/filesystem_delete.go b/engine/filesystem_delete.go new file mode 100644 index 0000000..4225897 --- /dev/null +++ b/engine/filesystem_delete.go @@ -0,0 +1,108 @@ +package engine + +import ( + "context" + "fmt" + "gosvc/logger" + + "robotfs/utils" +) + +func (f *FileSystemManager) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error { + f.Lock() + defer f.Unlock() + + if string(path) == "/" { + return nil + } + + entry, err := f.FindEntry(ctx, path) + if err != nil { + return err + } + + if entry.IsDir { + err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError) + if err != nil { + return err + } + } + + // Delete the directory entry itself + err = f.doDeleteEntryMetaAndData(ctx, entry) + if err != nil { + return err + } + + if err := utils.Remove(f.root, string(path), isRecursive); err != nil { + return err + } + + return nil +} + +func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error { + logger.Info("deleting folder => %s", entry.FullPath) + + lastFileName := "" + includeLastFile := false + + for { + lastFileName, err := f.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool { + if sub.IsDir { + // Recursively delete subdirectory + err := f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError) + if err == nil { + err = f.doDeleteEntryMetaAndData(ctx, sub) + } + if err != nil && !ignoreRecursiveError { + return false + } + } else { + // Delete file entry + err := f.doDeleteEntryMetaAndData(ctx, sub) + if err != nil && !ignoreRecursiveError { + return false + } + } + return true + }) + if err != nil { + return fmt.Errorf("list folder %s: %v", entry.FullPath, err) + } + + if lastFileName == "" && !isRecursive { + // Check if there are any entries + hasEntries := false + _, err := f.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool { + hasEntries = true + return false + }) + if err != nil { + return fmt.Errorf("check directory empty: %v", err) + } + if hasEntries { + return fmt.Errorf("directory not empty: %s", entry.FullPath) + } + } + + if lastFileName == "" { + break + } + } + + if storeDeletionErr := f.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { + return fmt.Errorf("delete folder children metadata: %v", storeDeletionErr) + } + + return nil +} + +func (f *FileSystemManager) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error { + logger.Info("deleting entry => %s", entry.FullPath) + if storeDeletionErr := f.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { + return fmt.Errorf("delete entry metadata: %v", storeDeletionErr) + } + + return nil +} diff --git a/engine/filesystem_manager.go b/engine/filesystem_manager.go index 93b873a..c1ff914 100644 --- a/engine/filesystem_manager.go +++ b/engine/filesystem_manager.go @@ -7,22 +7,23 @@ import ( "sync" "time" - "robotfs/store" + "robotfs/pkg/meta" + "robotfs/pkg/storage" "robotfs/utils" ) type FileSystemManager struct { sync.RWMutex - meta store.MetaStore - storage *StorageManager - root string + root string + meta meta.MetaStore + store *storage.Storage } -func NewFileSystemManager(meta store.MetaStore, storage *StorageManager, root string) *FileSystemManager { +func NewFileSystemManager(meta meta.MetaStore, store *storage.Storage, root string) *FileSystemManager { return &FileSystemManager{ - meta: meta, - storage: storage, - root: root, + root: root, + meta: meta, + store: store, } } @@ -79,106 +80,14 @@ func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPa } } + if err := utils.Mkdir(f.root, string(path)); err != nil { + return err + } + entry := utils.NewDirEntry(path) return f.meta.InsertEntry(ctx, entry) } -func (f *FileSystemManager) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error { - f.Lock() - defer f.Unlock() - - if string(path) == "/" { - return nil - } - - entry, err := f.FindEntry(ctx, path) - if err != nil { - return err - } - - if entry.IsDir { - err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError) - if err != nil { - return err - } - } - - // Delete the directory entry itself - err = f.doDeleteEntryMetaAndData(ctx, entry) - if err != nil { - return err - } - - if err := utils.Remove(f.root, string(path), isRecursive); err != nil { - return err - } - - return nil -} - -func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error { - lastFileName := "" - includeLastFile := false - - for { - lastFileName, err := f.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool { - if sub.IsDir { - // Recursively delete subdirectory - err := f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError) - if err == nil { - err = f.doDeleteEntryMetaAndData(ctx, sub) - } - if err != nil && !ignoreRecursiveError { - return false - } - } else { - // Delete file entry - err := f.doDeleteEntryMetaAndData(ctx, sub) - if err != nil && !ignoreRecursiveError { - return false - } - } - return true - }) - if err != nil { - return fmt.Errorf("list folder %s: %v", entry.FullPath, err) - } - - if lastFileName == "" && !isRecursive { - // Check if there are any entries - hasEntries := false - _, err := f.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool { - hasEntries = true - return false - }) - if err != nil { - return fmt.Errorf("check directory empty: %v", err) - } - if hasEntries { - return fmt.Errorf("directory not empty: %s", entry.FullPath) - } - } - - if lastFileName == "" { - break - } - } - - if storeDeletionErr := f.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { - return fmt.Errorf("delete folder children metadata: %v", storeDeletionErr) - } - - return nil -} - -func (f *FileSystemManager) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error { - if storeDeletionErr := f.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { - return fmt.Errorf("delete entry metadata: %v", storeDeletionErr) - } - - return nil -} - func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, err error) { f.RLock() defer f.RUnlock() @@ -196,13 +105,13 @@ func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.Fu return entries, hasMore, err } -func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) { +func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) { lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc) return } -func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) { +func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) { lastFileName, err = f.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool { select { case <-ctx.Done(): @@ -233,9 +142,9 @@ func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, } } - output, err := f.storage.UploadFile(path.ToS3Key(), reader, contentType) + output, err := f.store.UploadFile(path.ToS3Key(), reader, contentType) if err != nil { - return fmt.Errorf("upload s3 failed: %v", err) + return fmt.Errorf("upload file failed: %v", err) } if output.ETag == nil { @@ -269,9 +178,9 @@ func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPat return nil, nil, fmt.Errorf("cannot download directory") } - downloader, err := f.storage.DownloadFile(entry.S3Key) + downloader, err := f.store.DownloadFile(entry.S3Key) if err != nil { - return nil, nil, fmt.Errorf("read s3 object failed: %v", err) + return nil, nil, fmt.Errorf("download file failed: %v", err) } return downloader, entry, nil @@ -291,8 +200,8 @@ func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath, } if !isDir { - if err := f.storage.DeleteObject(entry.S3Key); err != nil { - return fmt.Errorf("delete s3 object failed: %v", err) + if err := f.store.DeleteObject(entry.S3Key); err != nil { + return fmt.Errorf("delete file failed: %v", err) } } @@ -313,11 +222,11 @@ func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath uti srcEntry, err := f.FindEntry(ctx, srcPath) if err != nil { - return fmt.Errorf("find src file failed: %v", err) + return fmt.Errorf("find src entry failed: %v", err) } if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil { - return fmt.Errorf("dst file %s already exists", dstPath) + return fmt.Errorf("dst entry %s already exists", dstPath) } parentDir, _ := dstPath.DirAndName() @@ -337,12 +246,12 @@ func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath uti newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID) - if err := f.meta.InsertEntry(ctx, newEntry); err != nil { - return fmt.Errorf("insert new entry failed: %v", err) + if err := f.meta.UpdateEntry(ctx, newEntry); err != nil { + return fmt.Errorf("update entry failed: %v", err) } if err := f.meta.DeleteEntry(ctx, srcPath); err != nil { - return fmt.Errorf("delete source entry failed: %v", err) + return fmt.Errorf("delete src entry failed: %v", err) } return nil @@ -358,11 +267,11 @@ func (f *FileSystemManager) CopyFile(ctx context.Context, srcPath, dstPath utils srcEntry, err := f.FindEntry(ctx, srcPath) if err != nil { - return fmt.Errorf("find src file failed: %v", err) + return fmt.Errorf("find src entry failed: %v", err) } if dstEntry, err := f.FindEntry(ctx, dstPath); dstEntry != nil && err == nil { - return fmt.Errorf("dst file %s already exists", dstPath) + return fmt.Errorf("dst entry %s already exists", dstPath) } parentDir, _ := dstPath.DirAndName() diff --git a/engine/filesystem_rename.go b/engine/filesystem_rename.go index 996a0b1..26dfd08 100644 --- a/engine/filesystem_rename.go +++ b/engine/filesystem_rename.go @@ -111,7 +111,7 @@ func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.Ful IsDir: entry.IsDir, Size: entry.Size, CreateTime: time.Now().Unix(), - S3Key: entry.S3Key, + S3Key: newPath.ToS3Key(), ContentType: entry.ContentType, Etag: entry.Etag, VersionID: entry.VersionID, @@ -119,8 +119,8 @@ func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.Ful Extended: entry.Extended, } - if createErr := f.meta.InsertEntry(ctx, newEntry); createErr != nil { - return createErr + if updateErr := f.meta.UpdateEntry(ctx, newEntry); updateErr != nil { + return updateErr } if moveFolderSubEntries != nil { diff --git a/main.go b/main.go index 08287a9..f881eec 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "os" "robotfs/config" - "robotfs/utils" ) const usage = `Usage: robotfs [options] @@ -44,7 +43,7 @@ func main() { } if *configFile != "" { - err := utils.LoadConfiguration(*configFile) + err := config.LoadConfiguration(*configFile) if err != nil { fmt.Println("load config file failed, " + err.Error()) os.Exit(1) diff --git a/pkg/meta/meta_store.go b/pkg/meta/meta_store.go new file mode 100644 index 0000000..01045f9 --- /dev/null +++ b/pkg/meta/meta_store.go @@ -0,0 +1,38 @@ +package meta + +import ( + "context" + "errors" + + "robotfs/config" + "robotfs/utils" +) + +var ( + ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") + ErrKvNotImplemented = errors.New("kv not implemented yet") + ErrKvNotFound = errors.New("kv: not found") +) + +type ListEachEntryFunc func(entry *utils.Entry) bool + +type MetaStore interface { + GetName() string + + Initialize(configuration config.Configuration, prefix string) error + + InsertEntry(context.Context, *utils.Entry) error + UpdateEntry(context.Context, *utils.Entry) (err error) + FindEntry(context.Context, utils.FullPath) (entry *utils.Entry, err error) + DeleteEntry(context.Context, utils.FullPath) (err error) + DeleteFolderChildren(context.Context, utils.FullPath) (err error) + ListDirectoryEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) + // Todo: implement this in the future + ListDirectoryPrefixedEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) + + BeginTransaction(ctx context.Context) (context.Context, error) + CommitTransaction(ctx context.Context) error + RollbackTransaction(ctx context.Context) error + + Shutdown() +} diff --git a/pkg/meta/redis_lua/redis_store.go b/pkg/meta/redis_lua/redis_store.go new file mode 100644 index 0000000..39b9670 --- /dev/null +++ b/pkg/meta/redis_lua/redis_store.go @@ -0,0 +1,40 @@ +package redis_lua + +import ( + "robotfs/config" + + "github.com/redis/go-redis/v9" +) + +type RedisLuaStore struct { + UniversalRedisLuaStore +} + +func NewRedisLuaStore() *RedisLuaStore { + return &RedisLuaStore{} +} + +func (store *RedisLuaStore) GetName() string { + return "redis_lua" +} + +func (store *RedisLuaStore) Initialize(configuration config.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), + ) +} + +func (store *RedisLuaStore) initialize(hostPort string, username, password string, database int, superLargeDirectories []string) (err error) { + store.Client = redis.NewClient(&redis.Options{ + Addr: hostPort, + Username: username, + Password: password, + DB: database, + }) + store.loadSuperLargeDirectories(superLargeDirectories) + return +} diff --git a/pkg/meta/redis_lua/stored_procedure/delete_entry.lua b/pkg/meta/redis_lua/stored_procedure/delete_entry.lua new file mode 100644 index 0000000..9730937 --- /dev/null +++ b/pkg/meta/redis_lua/stored_procedure/delete_entry.lua @@ -0,0 +1,19 @@ +-- KEYS[1]: full path of entry +local fullpath = KEYS[1] +-- KEYS[2]: full path of entry +local fullpath_list_key = KEYS[2] +-- KEYS[3]: dir of the entry +local dir_list_key = KEYS[3] + +-- ARGV[1]: isSuperLargeDirectory +local isSuperLargeDirectory = ARGV[1] == "1" +-- ARGV[2]: name of the entry +local name = ARGV[2] + +redis.call("DEL", fullpath, fullpath_list_key) + +if not isSuperLargeDirectory and name ~= "" then + redis.call("ZREM", dir_list_key, name) +end + +return 0 diff --git a/pkg/meta/redis_lua/stored_procedure/delete_folder_children.lua b/pkg/meta/redis_lua/stored_procedure/delete_folder_children.lua new file mode 100644 index 0000000..18ea74d --- /dev/null +++ b/pkg/meta/redis_lua/stored_procedure/delete_folder_children.lua @@ -0,0 +1,15 @@ +-- KEYS[1]: full path of entry +local fullpath = KEYS[1] + +if fullpath ~= "" and string.sub(fullpath, -1) == "/" then + fullpath = string.sub(fullpath, 0, -2) +end + +local files = redis.call("ZRANGE", fullpath .. "\0", "0", "-1") + +for _, name in ipairs(files) do + local file_path = fullpath .. "/" .. name + redis.call("DEL", file_path, file_path .. "\0") +end + +return 0 diff --git a/pkg/meta/redis_lua/stored_procedure/init.go b/pkg/meta/redis_lua/stored_procedure/init.go new file mode 100644 index 0000000..9373cc5 --- /dev/null +++ b/pkg/meta/redis_lua/stored_procedure/init.go @@ -0,0 +1,24 @@ +package stored_procedure + +import ( + _ "embed" + "github.com/redis/go-redis/v9" +) + +func init() { + InsertEntryScript = redis.NewScript(insertEntry) + DeleteEntryScript = redis.NewScript(deleteEntry) + DeleteFolderChildrenScript = redis.NewScript(deleteFolderChildren) +} + +//go:embed insert_entry.lua +var insertEntry string +var InsertEntryScript *redis.Script + +//go:embed delete_entry.lua +var deleteEntry string +var DeleteEntryScript *redis.Script + +//go:embed delete_folder_children.lua +var deleteFolderChildren string +var DeleteFolderChildrenScript *redis.Script diff --git a/pkg/meta/redis_lua/stored_procedure/insert_entry.lua b/pkg/meta/redis_lua/stored_procedure/insert_entry.lua new file mode 100644 index 0000000..a990aff --- /dev/null +++ b/pkg/meta/redis_lua/stored_procedure/insert_entry.lua @@ -0,0 +1,27 @@ +-- KEYS[1]: full path of entry +local full_path = KEYS[1] +-- KEYS[2]: dir of the entry +local dir_list_key = KEYS[2] + +-- ARGV[1]: content of the entry +local entry = ARGV[1] +-- ARGV[2]: TTL of the entry +local ttlSec = tonumber(ARGV[2]) +-- ARGV[3]: isSuperLargeDirectory +local isSuperLargeDirectory = ARGV[3] == "1" +-- ARGV[4]: zscore of the entry in zset +local zscore = tonumber(ARGV[4]) +-- ARGV[5]: name of the entry +local name = ARGV[5] + +if ttlSec > 0 then + redis.call("SET", full_path, entry, "EX", ttlSec) +else + redis.call("SET", full_path, entry) +end + +if not isSuperLargeDirectory and name ~= "" then + redis.call("ZADD", dir_list_key, "NX", zscore, name) +end + +return 0 diff --git a/pkg/meta/redis_lua/universal_redis_store.go b/pkg/meta/redis_lua/universal_redis_store.go new file mode 100644 index 0000000..15be017 --- /dev/null +++ b/pkg/meta/redis_lua/universal_redis_store.go @@ -0,0 +1,180 @@ +package redis_lua + +import ( + "context" + "errors" + "fmt" + + "robotfs/pkg/meta" + "robotfs/pkg/meta/redis_lua/stored_procedure" + "robotfs/utils" + + "github.com/redis/go-redis/v9" +) + +const ( + DIR_LIST_MARKER = "\x00" +) + +var ErrNotFound = errors.New("no entry is found") + +type UniversalRedisLuaStore struct { + Client redis.UniversalClient + superLargeDirectoryHash map[string]bool +} + +func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) { + _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) { + store.superLargeDirectoryHash = make(map[string]bool) + for _, dir := range superLargeDirectories { + store.superLargeDirectoryHash[dir] = true + } +} + +func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *utils.Entry) (err error) { + + value, err := entry.Encode() + if err != nil { + return fmt.Errorf("encoding %s: %v", entry.FullPath, err) + } + + dir, name := entry.FullPath.DirAndName() + + err = stored_procedure.InsertEntryScript.Run(ctx, store.Client, + []string{string(entry.FullPath), genDirectoryListKey(dir)}, + value, 0, + store.isSuperLargeDirectory(dir), 0, name).Err() + + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *utils.Entry) (err error) { + + return store.InsertEntry(ctx, entry) +} + +func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath utils.FullPath) (entry *utils.Entry, err error) { + + data, err := store.Client.Get(ctx, string(fullpath)).Result() + if err == redis.Nil { + return nil, ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", fullpath, err) + } + + entry = &utils.Entry{ + FullPath: fullpath, + } + err = entry.Decode([]byte(data)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath utils.FullPath) (err error) { + + dir, name := fullpath.DirAndName() + + err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client, + []string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)}, + store.isSuperLargeDirectory(dir), name).Err() + + if err != nil { + return fmt.Errorf("DeleteEntry %s : %v", fullpath, err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, fullpath utils.FullPath) (err error) { + + if store.isSuperLargeDirectory(string(fullpath)) { + return nil + } + + err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client, + []string{string(fullpath)}).Err() + + if err != nil { + return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) + } + + return nil +} + +// Todo: implement this in the future +func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, meta.ErrUnsupportedListDirectoryPrefixed +} + +func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) { + + dirListKey := genDirectoryListKey(string(dirPath)) + + min := "-" + if startFileName != "" { + if includeStartFile { + min = "[" + startFileName + } else { + min = "(" + startFileName + } + } + + members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{ + Min: min, + Max: "+", + Offset: 0, + Count: limit, + }).Result() + if err != nil { + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) + } + + // fetch entry meta + for _, fileName := range members { + path := utils.NewFullPath(string(dirPath), fileName) + entry, err := store.FindEntry(ctx, path) + lastFileName = fileName + if err != nil { + if err == ErrNotFound { + continue + } + } else { + if !eachEntryFunc(entry) { + break + } + } + } + + return lastFileName, err +} + +func genDirectoryListKey(dir string) (dirList string) { + return dir + DIR_LIST_MARKER +} + +func (store *UniversalRedisLuaStore) Shutdown() { + store.Client.Close() +} diff --git a/pkg/meta/redis_lua/universal_redis_store_kv.go b/pkg/meta/redis_lua/universal_redis_store_kv.go new file mode 100644 index 0000000..9bb59c2 --- /dev/null +++ b/pkg/meta/redis_lua/universal_redis_store_kv.go @@ -0,0 +1,41 @@ +package redis_lua + +import ( + "context" + "fmt" + + "github.com/redis/go-redis/v9" +) + +func (store *UniversalRedisLuaStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + _, err = store.Client.Set(ctx, string(key), value, 0).Result() + + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *UniversalRedisLuaStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + + data, err := store.Client.Get(ctx, string(key)).Result() + + if err == redis.Nil { + return nil, fmt.Errorf("kv: not found") + } + + return []byte(data), err +} + +func (store *UniversalRedisLuaStore) KvDelete(ctx context.Context, key []byte) (err error) { + + _, err = store.Client.Del(ctx, string(key)).Result() + + if err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} diff --git a/engine/storage_manager.go b/pkg/storage/storage.go similarity index 52% rename from engine/storage_manager.go rename to pkg/storage/storage.go index 7ceb5a7..c8f11c6 100644 --- a/engine/storage_manager.go +++ b/pkg/storage/storage.go @@ -1,10 +1,11 @@ -package engine +package storage import ( "bytes" "fmt" "io" + "robotfs/config" "robotfs/utils" "github.com/aws/aws-sdk-go/aws" @@ -14,43 +15,26 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" ) -type S3Config struct { - Region string - Endpoint string - AccessKeyID string - SecretAccessKey string - BucketName string - UsePathStyle bool -} - -func Initialize(configuration utils.Configuration, prefix string) (*S3Config, error) { - s3Cfg := &S3Config{ - Region: configuration.GetString(prefix + "region"), - Endpoint: configuration.GetString(prefix + "endpoint"), - AccessKeyID: configuration.GetString(prefix + "access_key_id"), - SecretAccessKey: configuration.GetString(prefix + "secret_access_key"), - BucketName: configuration.GetString(prefix + "bucket_name"), - UsePathStyle: configuration.GetBool(prefix + "use_path_style"), - } - - if s3Cfg.BucketName == "" { - return nil, fmt.Errorf("bucket_name is required") - } - - return s3Cfg, nil -} - -type StorageManager struct { +type Storage struct { s3Client *s3.S3 s3Uploader *s3manager.Uploader s3Downloader *s3manager.Downloader bucketName string } -func NewStorageManager(s3Config *S3Config) (*StorageManager, error) { +func NewStorage() *Storage { + return &Storage{} +} + +func (s *Storage) Initialize(cfg config.Configuration, prefix string) error { + s3Config, err := initialize(cfg, prefix) + if err != nil { + return err + } + s3Config, awsSession, err := createS3Session(s3Config) if err != nil { - return nil, err + return err } s3Client := s3.New(awsSession) @@ -63,12 +47,38 @@ func NewStorageManager(s3Config *S3Config) (*StorageManager, error) { }) downloader := s3manager.NewDownloader(awsSession) - return &StorageManager{ - s3Client: s3Client, - s3Uploader: uploader, - s3Downloader: downloader, - bucketName: s3Config.BucketName, - }, nil + s.s3Client = s3Client + s.s3Uploader = uploader + s.s3Downloader = downloader + s.bucketName = s3Config.BucketName + + return nil +} + +type S3Config struct { + Region string + Endpoint string + AccessKeyID string + SecretAccessKey string + BucketName string + UsePathStyle bool +} + +func initialize(cfg config.Configuration, prefix string) (*S3Config, error) { + s3Cfg := &S3Config{ + Region: cfg.GetString(prefix + "region"), + Endpoint: cfg.GetString(prefix + "endpoint"), + AccessKeyID: cfg.GetString(prefix + "access_key_id"), + SecretAccessKey: cfg.GetString(prefix + "secret_access_key"), + BucketName: cfg.GetString(prefix + "bucket_name"), + UsePathStyle: cfg.GetBool(prefix + "use_path_style"), + } + + if s3Cfg.BucketName == "" { + return nil, fmt.Errorf("bucket_name is required") + } + + return s3Cfg, nil } func createS3Session(config *S3Config) (*S3Config, *session.Session, error) { @@ -90,13 +100,13 @@ func createS3Session(config *S3Config) (*S3Config, *session.Session, error) { return config, sess, nil } -func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, error) { +func (s *Storage) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, error) { if contentType == "" { contentType = "application/octet-stream" } - output, err := sm.s3Uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(sm.bucketName), + output, err := s.s3Uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.bucketName), Key: aws.String(s3Key), Body: bytes.NewReader(data), ContentType: aws.String(contentType), @@ -108,13 +118,13 @@ func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType stri return output, nil } -func (sm *StorageManager) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, error) { +func (s *Storage) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, error) { if contentType == "" { contentType = "application/octet-stream" } - output, err := sm.s3Uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(sm.bucketName), + output, err := s.s3Uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.bucketName), Key: aws.String(s3Key), Body: data, ContentType: aws.String(contentType), @@ -126,31 +136,31 @@ func (sm *StorageManager) UploadFile(s3Key string, data io.Reader, contentType s return output, nil } -func (sm *StorageManager) DownloadFile(S3Key string) (*utils.S3ReadSeeker, error) { - head, err := sm.s3Client.HeadObject(&s3.HeadObjectInput{ - Bucket: aws.String(sm.bucketName), +func (s *Storage) DownloadFile(S3Key string) (*utils.S3ReadSeeker, error) { + head, err := s.s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(s.bucketName), Key: aws.String(S3Key), }) if err != nil { return nil, err } - result, err := sm.s3Client.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(sm.bucketName), + result, err := s.s3Client.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(s.bucketName), Key: aws.String(S3Key), }) if err != nil { return nil, err } - return utils.NewS3ReadSeeker(sm.s3Client, sm.bucketName, S3Key, result.Body, *head.ContentLength), nil + return utils.NewS3ReadSeeker(s.s3Client, s.bucketName, S3Key, result.Body, *head.ContentLength), nil } -func (sm *StorageManager) ReadObject(S3Key string) ([]byte, error) { +func (s *Storage) ReadObject(S3Key string) ([]byte, error) { buf := aws.NewWriteAtBuffer([]byte{}) - _, err := sm.s3Downloader.Download(buf, &s3.GetObjectInput{ - Bucket: aws.String(sm.bucketName), + _, err := s.s3Downloader.Download(buf, &s3.GetObjectInput{ + Bucket: aws.String(s.bucketName), Key: aws.String(S3Key), }) if err != nil { @@ -160,33 +170,33 @@ func (sm *StorageManager) ReadObject(S3Key string) ([]byte, error) { return buf.Bytes(), nil } -func (sm *StorageManager) DeleteObject(s3Key string) error { - _, err := sm.s3Client.DeleteObject(&s3.DeleteObjectInput{ - Bucket: aws.String(sm.bucketName), +func (s *Storage) DeleteObject(s3Key string) error { + _, err := s.s3Client.DeleteObject(&s3.DeleteObjectInput{ + Bucket: aws.String(s.bucketName), Key: aws.String(s3Key), }) if err != nil { return err } - return sm.s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{ - Bucket: aws.String(sm.bucketName), + return s.s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{ + Bucket: aws.String(s.bucketName), Key: aws.String(s3Key), }) } -func (sm *StorageManager) CopyObject(oldS3key, newS3Key string) error { - _, err := sm.s3Client.CopyObject(&s3.CopyObjectInput{ - Bucket: aws.String(sm.bucketName), - CopySource: aws.String(fmt.Sprintf("%s/%s", sm.bucketName, oldS3key)), +func (s *Storage) CopyObject(oldS3key, newS3Key string) error { + _, err := s.s3Client.CopyObject(&s3.CopyObjectInput{ + Bucket: aws.String(s.bucketName), + CopySource: aws.String(fmt.Sprintf("%s/%s", s.bucketName, oldS3key)), Key: aws.String(newS3Key), }) if err != nil { return err } - return sm.s3Client.WaitUntilObjectExists(&s3.HeadObjectInput{ - Bucket: aws.String(sm.bucketName), + return s.s3Client.WaitUntilObjectExists(&s3.HeadObjectInput{ + Bucket: aws.String(s.bucketName), Key: aws.String(newS3Key), }) } diff --git a/service.go b/service.go index e51e02b..9b04623 100644 --- a/service.go +++ b/service.go @@ -19,9 +19,13 @@ func CreateService() (*Service, error) { }, nil } +func (s *Service) LoadMetadata() error { + return nil +} + func (s *Service) Start() error { if err := s.Engine.Start(); err != nil { - return fmt.Errorf("failed to start engine: %v", err) + return fmt.Errorf("start engine failed: %v", err) } return nil diff --git a/utils/operation.go b/utils/operation.go index 62dd470..6aee3ee 100644 --- a/utils/operation.go +++ b/utils/operation.go @@ -39,3 +39,13 @@ func Remove(root, path string, isDir bool) error { return nil } + +func Mkdir(root, path string) error { + cmd := exec.Command("mkdir", root+path) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("mkdir failed: %v, output: %s", err, string(output)) + } + + return nil +}