From 2fd5a0e4b81120f86b578a8e5451802feda79887 Mon Sep 17 00:00:00 2001 From: dukai Date: Fri, 6 Jun 2025 15:08:41 +0800 Subject: [PATCH] Update: update redis configuration --- api_directory.go | 10 +- api_file.go | 12 +- config/config.go | 2 +- engine/engine.go | 7 +- engine/filesystem_manager.go | 318 ------------------------ pkg/fs/filesystem.go | 318 ++++++++++++++++++++++++ {engine => pkg/fs}/filesystem_copy.go | 34 +-- {engine => pkg/fs}/filesystem_delete.go | 34 +-- {engine => pkg/fs}/filesystem_rename.go | 36 +-- pkg/meta/redis_lua/redis_store.go | 10 +- 10 files changed, 392 insertions(+), 389 deletions(-) delete mode 100644 engine/filesystem_manager.go create mode 100644 pkg/fs/filesystem.go rename {engine => pkg/fs}/filesystem_copy.go (60%) rename {engine => pkg/fs}/filesystem_delete.go (51%) rename {engine => pkg/fs}/filesystem_rename.go (59%) diff --git a/api_directory.go b/api_directory.go index 2467593..183847e 100644 --- a/api_directory.go +++ b/api_directory.go @@ -44,7 +44,7 @@ func (s *Service) HandleMkdir( path := params.Path newPath := utils.NormalizePath(path) - err := s.FileSystemManager.MakeDirectory(req.Context(), utils.FullPath(newPath)) + err := s.FileSystem.MakeDirectory(req.Context(), utils.FullPath(newPath)) if err != nil { logger.Error("mkdir %s: %v", path, err) out := entity.NewError[any]( @@ -70,7 +70,7 @@ func (s *Service) HandleListDirectory( limit := req.QueryInt("limit") newPath := utils.NormalizePath(path) - rawEntries, moreAvailable, lastFileName, err := s.FileSystemManager.ListDirectoryEntries( + rawEntries, moreAvailable, lastFileName, err := s.FileSystem.ListDirectoryEntries( context.Background(), utils.FullPath(newPath), startFileName, @@ -134,7 +134,7 @@ func (s *Service) HandleDeleteDirectory( return resp.OK(out).JSON() } - if err := s.FileSystemManager.DeleteDirectory(req.Context(), utils.FullPath(newPath), isDir, false); err != nil { + if err := s.FileSystem.DeleteDirectory(req.Context(), utils.FullPath(newPath), isDir, false); err != nil { logger.Error("del dir %s: %v", path, err) out := entity.NewError[any]( entity.CodeInternalServer, @@ -160,7 +160,7 @@ func (s *Service) HandleRenameDirectory( oldPath := utils.FullPath(srcPath) newPath := utils.FullPath(dstPath) - if err := s.FileSystemManager.RenameDirectory(req.Context(), oldPath, newPath, isDir); err != nil { + if err := s.FileSystem.RenameDirectory(req.Context(), oldPath, newPath, isDir); err != nil { logger.Error("move dir %s => %s: %v", params.SrcPath, params.DstPath, err) out := entity.NewError[any]( entity.CodeInternalServer, @@ -186,7 +186,7 @@ func (s *Service) HandleCopyDirectory( oldPath := utils.FullPath(srcPath) newPath := utils.FullPath(dstPath) - if err := s.FileSystemManager.CopyDirectory(req.Context(), oldPath, newPath, isDir); err != nil { + if err := s.FileSystem.CopyDirectory(req.Context(), oldPath, newPath, isDir); err != nil { logger.Error("copy dir %s => %s: %v", params.SrcPath, params.DstPath, err) out := entity.NewError[any]( entity.CodeInternalServer, diff --git a/api_file.go b/api_file.go index 87f5561..884e24d 100644 --- a/api_file.go +++ b/api_file.go @@ -63,7 +63,7 @@ func (s *Service) HandleUploadFile( contentType := fileHeader.Header.Get("Content-Type") fileSize := fileHeader.Size - if err := s.Engine.FileSystemManager.CreateFile(req.Context(), utils.FullPath(newPath), file, contentType, fileSize); err != nil { + if err := s.FileSystem.CreateFile(req.Context(), utils.FullPath(newPath), file, contentType, fileSize); err != nil { logger.Error("create %s: %v", path, err) out := entity.NewError[any]( entity.CodeInternalServer, @@ -85,7 +85,7 @@ func (s *Service) HandleInfoFile( path := req.QueryString("path") newPath := utils.NormalizePath(path) - entry, err := s.FileSystemManager.FindEntry(req.Context(), utils.FullPath(newPath)) + entry, err := s.FileSystem.FindEntry(req.Context(), utils.FullPath(newPath)) if err != nil { logger.Error("info %s: %v", path, err) out := entity.NewError[any]( @@ -109,7 +109,7 @@ func (s *Service) HandleDownloadFile( preview := req.QueryBool("preview") newPath := utils.NormalizePath(path) - downloader, entry, err := s.FileSystemManager.DownloadFile(req.Context(), utils.FullPath(newPath)) + downloader, entry, err := s.FileSystem.DownloadFile(req.Context(), utils.FullPath(newPath)) if err != nil { logger.Error("download %s: %v", path, err) out := entity.NewError[any]( @@ -139,7 +139,7 @@ func (s *Service) HandleDeleteFile( isDir := params.IsDir newPath := utils.NormalizePath(path) - err := s.FileSystemManager.DeleteFile(req.Context(), utils.FullPath(newPath), isDir) + err := s.FileSystem.DeleteFile(req.Context(), utils.FullPath(newPath), isDir) if err != nil { logger.Error("delete file %s: %v", path, err) out := entity.NewError[any]( @@ -164,7 +164,7 @@ func (s *Service) HandleRenameFile( srcPath := utils.FullPath(utils.NormalizePath(params.SrcPath)) dstPath := utils.FullPath(utils.NormalizePath(params.DstPath)) - err := s.FileSystemManager.RenameFile(req.Context(), srcPath, dstPath, isDir) + err := s.FileSystem.RenameFile(req.Context(), srcPath, dstPath, isDir) if err != nil { logger.Error("move file %s => %s: %v", srcPath, dstPath, err) out := entity.NewError[any]( @@ -189,7 +189,7 @@ func (s *Service) HandleCopyFile( srcPath := utils.FullPath(utils.NormalizePath(params.SrcPath)) dstPath := utils.FullPath(utils.NormalizePath(params.DstPath)) - err := s.FileSystemManager.CopyFile(req.Context(), srcPath, dstPath, isDir) + err := s.FileSystem.CopyFile(req.Context(), srcPath, dstPath, isDir) if err != nil { logger.Error("copy file %s => %s: %v", srcPath, dstPath, err) out := entity.NewError[any]( diff --git a/config/config.go b/config/config.go index 25177e0..7f62a0e 100644 --- a/config/config.go +++ b/config/config.go @@ -51,7 +51,7 @@ func SetupServiceConfig() { MaxConcurrentStreams: 10000, MaxTraces: 30 * 10000, LogSlowRequest: true, - SlowRequestThreshold: 1000 * time.Millisecond, + SlowRequestThreshold: 5000 * time.Millisecond, }, }, } diff --git a/engine/engine.go b/engine/engine.go index 19e0a48..24bf90d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -4,12 +4,13 @@ import ( "fmt" "robotfs/config" + "robotfs/pkg/fs" "robotfs/pkg/meta/redis_lua" "robotfs/pkg/storage" ) type Engine struct { - FileSystemManager *FileSystemManager + FileSystem *fs.FileSystem } func NewEngine() *Engine { @@ -29,7 +30,7 @@ func (e *Engine) Start() error { return err } - e.FileSystemManager = NewFileSystemManager( + e.FileSystem = fs.NewFileSystem( meta, store, cfg.GetString("robotfs.data_path"), @@ -58,5 +59,5 @@ func (e *Engine) initStorage(cfg config.Configuration) (*storage.Storage, error) func (e *Engine) Stop() { // Stop filesystem manager. - e.FileSystemManager.Shutdown() + e.FileSystem.Shutdown() } diff --git a/engine/filesystem_manager.go b/engine/filesystem_manager.go deleted file mode 100644 index 294798b..0000000 --- a/engine/filesystem_manager.go +++ /dev/null @@ -1,318 +0,0 @@ -package engine - -import ( - "context" - "fmt" - "gosvc/logger" - "io" - "time" - - "robotfs/pkg/meta" - "robotfs/pkg/storage" - "robotfs/utils" -) - -type FileSystemManager struct { - root string - meta meta.MetaStore - store *storage.Storage - locker *utils.LockManager -} - -func NewFileSystemManager(meta meta.MetaStore, store *storage.Storage, root string) *FileSystemManager { - return &FileSystemManager{ - root: root, - meta: meta, - store: store, - locker: utils.NewLockManager(), - } -} - -func (f *FileSystemManager) BeginTransaction(ctx context.Context) (context.Context, error) { - return f.meta.BeginTransaction(ctx) -} - -func (f *FileSystemManager) CommitTransaction(ctx context.Context) error { - return f.meta.CommitTransaction(ctx) -} - -func (f *FileSystemManager) RollbackTransaction(ctx context.Context) error { - return f.meta.RollbackTransaction(ctx) -} - -var ( - Root = &utils.Entry{ - FullPath: utils.FullPath("/"), - IsDir: true, - CreateTime: time.Now().Unix(), - LastModificationTime: time.Now().Unix(), - } -) - -func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) { - if p == "/" { - return Root, nil - } - - entry, err = f.meta.FindEntry(ctx, p) - if err != nil { - return nil, err - } - - return -} - -func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPath) error { - locker := f.locker.AcquireLock(string(path)) - defer f.locker.ReleaseLock(locker) - - logger.Info("making directory => %s", path) - - if string(path) == "/" { - return nil - } - - if entry, err := f.FindEntry(ctx, path); err == nil && entry != nil { - return fmt.Errorf("directory %s already exists", path) - } - - parentDir, _ := path.DirAndName() - if parentDir != "/" { - if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { - return fmt.Errorf("parent directory %s does not exist", parentDir) - } - } - - if err := utils.Mkdir(f.root, string(path)); err != nil { - return err - } - - entry := utils.NewDirEntry(path) - return f.meta.InsertEntry(ctx, entry) -} - -func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, lastFileName string, err error) { - locker := f.locker.AcquireRLock(string(p)) - defer f.locker.ReleaseLock(locker) - - logger.Info("listing directory %s => startFileName:%s inclusive:%t count:%d", p, startFileName, inclusive, limit) - - lastFileName, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool { - entries = append(entries, entry) - return true - }) - - hasMore = int64(len(entries)) >= limit - if hasMore { - entries = entries[:limit] - } - - return entries, hasMore, lastFileName, err -} - -func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) { - - lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc) - return -} - -func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) { - lastFileName, err = f.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool { - select { - case <-ctx.Done(): - return false - default: - return eachEntryFunc(entry) - } - }) - return -} - -func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string, fileSize int64) error { - locker := f.locker.AcquireLock(string(path)) - defer f.locker.ReleaseLock(locker) - - logger.Info("uploading file => %s", path) - - if string(path) == "/" { - return fmt.Errorf("cannot create file %s", path) - } - - if entry, _ := f.FindEntry(ctx, path); entry != nil { - return fmt.Errorf("file %s already exists", path) - } - - parentDir, _ := path.DirAndName() - if parentDir != "/" { - if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { - return fmt.Errorf("parent directory %s does not exist", parentDir) - } - } - - output, err := f.store.UploadFile(path.ToS3Key(), reader, contentType) - if err != nil { - return fmt.Errorf("upload file failed: %v", err) - } - - if output.ETag == nil { - empty := "" - output.ETag = &empty - } - - if output.VersionID == nil { - empty := "" - output.VersionID = &empty - } - - entry := utils.NewFileEntry(path, path.ToS3Key(), uint64(fileSize), contentType, *output.ETag, *output.VersionID) - if err := f.meta.InsertEntry(ctx, entry); err != nil { - return fmt.Errorf("create file entry failed: %v", err) - } - - return nil -} - -func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPath) (*utils.S3ReadSeeker, *utils.Entry, error) { - locker := f.locker.AcquireRLock(string(path)) - defer f.locker.ReleaseLock(locker) - - logger.Info("downloading file => %s", path) - - entry, err := f.FindEntry(ctx, path) - if err != nil { - return nil, nil, fmt.Errorf("find entry failed: %v", err) - } - - if entry.IsDir { - return nil, nil, fmt.Errorf("cannot download directory") - } - - downloader, err := f.store.DownloadFile(entry.S3Key) - if err != nil { - return nil, nil, fmt.Errorf("download file failed: %v", err) - } - - return downloader, entry, nil -} - -func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath, isDir bool) error { - locker := f.locker.AcquireLock(string(path)) - defer f.locker.ReleaseLock(locker) - - logger.Info("deleting file => %s", path) - - if string(path) == "/" { - return fmt.Errorf("cannot delete root") - } - - entry, err := f.FindEntry(ctx, path) - if err != nil { - return fmt.Errorf("find entry failed: %v", err) - } - - if !isDir { - if err := f.store.DeleteObject(entry.S3Key); err != nil { - return fmt.Errorf("delete file failed: %v", err) - } - } - - if err := f.meta.DeleteEntry(ctx, path); err != nil { - return fmt.Errorf("delete entry failed: %v", err) - } - - return nil -} - -func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error { - locker := f.locker.AcquireOrderedLock(string(srcPath), string(dstPath)) - defer f.locker.ReleaseLocks(locker) - - logger.Info("moving file %s => %s", srcPath, dstPath) - - if string(srcPath) == "/" || string(dstPath) == "/" { - return fmt.Errorf("cannot rename root") - } - - srcEntry, err := f.FindEntry(ctx, srcPath) - if err != nil { - return fmt.Errorf("find src entry failed: %v", err) - } - - if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil { - return fmt.Errorf("dst entry %s already exists", dstPath) - } - - parentDir, _ := dstPath.DirAndName() - if parentDir != "/" { - if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { - return fmt.Errorf("parent directory %s does not exist", parentDir) - } - } - - if isDir { - return fmt.Errorf("rename not file") - } - - if err := utils.Move(f.root, string(srcPath), string(dstPath)); err != nil { - return err - } - - newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID) - if err := f.meta.UpdateEntry(ctx, newEntry); err != nil { - return fmt.Errorf("update entry failed: %v", err) - } - - if err := f.meta.DeleteEntry(ctx, srcPath); err != nil { - return fmt.Errorf("delete src entry failed: %v", err) - } - - return nil -} - -func (f *FileSystemManager) CopyFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error { - locker := f.locker.AcquireOrderedLock(string(srcPath), string(dstPath)) - defer f.locker.ReleaseLocks(locker) - - logger.Info("copying file %s => %s", srcPath, dstPath) - - if string(srcPath) == "/" || string(dstPath) == "/" { - return fmt.Errorf("cannot copy root") - } - - srcEntry, err := f.FindEntry(ctx, srcPath) - if err != nil { - return fmt.Errorf("find src entry failed: %v", err) - } - - if dstEntry, err := f.FindEntry(ctx, dstPath); dstEntry != nil && err == nil { - return fmt.Errorf("dst entry %s already exists", dstPath) - } - - parentDir, _ := dstPath.DirAndName() - if parentDir != "/" { - if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { - return fmt.Errorf("parent directory %s does not exist", parentDir) - } - } - - if isDir { - return fmt.Errorf("copy not file") - } - - if err := utils.Clone(f.root, string(srcPath), string(dstPath)); err != nil { - return err - } - - newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID) - if err := f.meta.InsertEntry(ctx, newEntry); err != nil { - return fmt.Errorf("insert new entry failed: %v", err) - } - - return nil -} - -func (f *FileSystemManager) Shutdown() { - if f.meta != nil { - f.meta.Shutdown() - } -} diff --git a/pkg/fs/filesystem.go b/pkg/fs/filesystem.go new file mode 100644 index 0000000..4b18b60 --- /dev/null +++ b/pkg/fs/filesystem.go @@ -0,0 +1,318 @@ +package fs + +import ( + "context" + "fmt" + "gosvc/logger" + "io" + "time" + + "robotfs/pkg/meta" + "robotfs/pkg/storage" + "robotfs/utils" +) + +type FileSystem struct { + root string + meta meta.MetaStore + store *storage.Storage + locker *utils.LockManager +} + +func NewFileSystem(meta meta.MetaStore, store *storage.Storage, root string) *FileSystem { + return &FileSystem{ + root: root, + meta: meta, + store: store, + locker: utils.NewLockManager(), + } +} + +func (fs *FileSystem) BeginTransaction(ctx context.Context) (context.Context, error) { + return fs.meta.BeginTransaction(ctx) +} + +func (fs *FileSystem) CommitTransaction(ctx context.Context) error { + return fs.meta.CommitTransaction(ctx) +} + +func (fs *FileSystem) RollbackTransaction(ctx context.Context) error { + return fs.meta.RollbackTransaction(ctx) +} + +var ( + Root = &utils.Entry{ + FullPath: utils.FullPath("/"), + IsDir: true, + CreateTime: time.Now().Unix(), + LastModificationTime: time.Now().Unix(), + } +) + +func (fs *FileSystem) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) { + if p == "/" { + return Root, nil + } + + entry, err = fs.meta.FindEntry(ctx, p) + if err != nil { + return nil, err + } + + return +} + +func (fs *FileSystem) MakeDirectory(ctx context.Context, path utils.FullPath) error { + locker := fs.locker.AcquireLock(string(path)) + defer fs.locker.ReleaseLock(locker) + + logger.Info("making directory => %s", path) + + if string(path) == "/" { + return nil + } + + if entry, err := fs.FindEntry(ctx, path); err == nil && entry != nil { + return fmt.Errorf("directory %s already exists", path) + } + + parentDir, _ := path.DirAndName() + if parentDir != "/" { + if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { + return fmt.Errorf("parent directory %s does not exist", parentDir) + } + } + + if err := utils.Mkdir(fs.root, string(path)); err != nil { + return err + } + + entry := utils.NewDirEntry(path) + return fs.meta.InsertEntry(ctx, entry) +} + +func (fs *FileSystem) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, lastFileName string, err error) { + locker := fs.locker.AcquireRLock(string(p)) + defer fs.locker.ReleaseLock(locker) + + logger.Info("listing directory %s => startFileName:%s inclusive:%t count:%d", p, startFileName, inclusive, limit) + + lastFileName, err = fs.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool { + entries = append(entries, entry) + return true + }) + + hasMore = int64(len(entries)) >= limit + if hasMore { + entries = entries[:limit] + } + + return entries, hasMore, lastFileName, err +} + +func (fs *FileSystem) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) { + + lastFileName, err = fs.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc) + return +} + +func (fs *FileSystem) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) { + lastFileName, err = fs.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool { + select { + case <-ctx.Done(): + return false + default: + return eachEntryFunc(entry) + } + }) + return +} + +func (fs *FileSystem) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string, fileSize int64) error { + locker := fs.locker.AcquireLock(string(path)) + defer fs.locker.ReleaseLock(locker) + + logger.Info("uploading file => %s", path) + + if string(path) == "/" { + return fmt.Errorf("cannot create file %s", path) + } + + if entry, _ := fs.FindEntry(ctx, path); entry != nil { + return fmt.Errorf("file %s already exists", path) + } + + parentDir, _ := path.DirAndName() + if parentDir != "/" { + if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { + return fmt.Errorf("parent directory %s does not exist", parentDir) + } + } + + output, err := fs.store.UploadFile(path.ToS3Key(), reader, contentType) + if err != nil { + return fmt.Errorf("upload file failed: %v", err) + } + + if output.ETag == nil { + empty := "" + output.ETag = &empty + } + + if output.VersionID == nil { + empty := "" + output.VersionID = &empty + } + + entry := utils.NewFileEntry(path, path.ToS3Key(), uint64(fileSize), contentType, *output.ETag, *output.VersionID) + if err := fs.meta.InsertEntry(ctx, entry); err != nil { + return fmt.Errorf("create file entry failed: %v", err) + } + + return nil +} + +func (fs *FileSystem) DownloadFile(ctx context.Context, path utils.FullPath) (*utils.S3ReadSeeker, *utils.Entry, error) { + locker := fs.locker.AcquireRLock(string(path)) + defer fs.locker.ReleaseLock(locker) + + logger.Info("downloading file => %s", path) + + entry, err := fs.FindEntry(ctx, path) + if err != nil { + return nil, nil, fmt.Errorf("find entry failed: %v", err) + } + + if entry.IsDir { + return nil, nil, fmt.Errorf("cannot download directory") + } + + downloader, err := fs.store.DownloadFile(entry.S3Key) + if err != nil { + return nil, nil, fmt.Errorf("download file failed: %v", err) + } + + return downloader, entry, nil +} + +func (fs *FileSystem) DeleteFile(ctx context.Context, path utils.FullPath, isDir bool) error { + locker := fs.locker.AcquireLock(string(path)) + defer fs.locker.ReleaseLock(locker) + + logger.Info("deleting file => %s", path) + + if string(path) == "/" { + return fmt.Errorf("cannot delete root") + } + + entry, err := fs.FindEntry(ctx, path) + if err != nil { + return fmt.Errorf("find entry failed: %v", err) + } + + if !isDir { + if err := fs.store.DeleteObject(entry.S3Key); err != nil { + return fmt.Errorf("delete file failed: %v", err) + } + } + + if err := fs.meta.DeleteEntry(ctx, path); err != nil { + return fmt.Errorf("delete entry failed: %v", err) + } + + return nil +} + +func (fs *FileSystem) RenameFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error { + locker := fs.locker.AcquireOrderedLock(string(srcPath), string(dstPath)) + defer fs.locker.ReleaseLocks(locker) + + logger.Info("moving file %s => %s", srcPath, dstPath) + + if string(srcPath) == "/" || string(dstPath) == "/" { + return fmt.Errorf("cannot rename root") + } + + srcEntry, err := fs.FindEntry(ctx, srcPath) + if err != nil { + return fmt.Errorf("find src entry failed: %v", err) + } + + if dstEntry, _ := fs.FindEntry(ctx, dstPath); dstEntry != nil { + return fmt.Errorf("dst entry %s already exists", dstPath) + } + + parentDir, _ := dstPath.DirAndName() + if parentDir != "/" { + if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { + return fmt.Errorf("parent directory %s does not exist", parentDir) + } + } + + if isDir { + return fmt.Errorf("rename not file") + } + + if err := utils.Move(fs.root, string(srcPath), string(dstPath)); err != nil { + return err + } + + newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID) + if err := fs.meta.UpdateEntry(ctx, newEntry); err != nil { + return fmt.Errorf("update entry failed: %v", err) + } + + if err := fs.meta.DeleteEntry(ctx, srcPath); err != nil { + return fmt.Errorf("delete src entry failed: %v", err) + } + + return nil +} + +func (fs *FileSystem) CopyFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error { + locker := fs.locker.AcquireOrderedLock(string(srcPath), string(dstPath)) + defer fs.locker.ReleaseLocks(locker) + + logger.Info("copying file %s => %s", srcPath, dstPath) + + if string(srcPath) == "/" || string(dstPath) == "/" { + return fmt.Errorf("cannot copy root") + } + + srcEntry, err := fs.FindEntry(ctx, srcPath) + if err != nil { + return fmt.Errorf("find src entry failed: %v", err) + } + + if dstEntry, err := fs.FindEntry(ctx, dstPath); dstEntry != nil && err == nil { + return fmt.Errorf("dst entry %s already exists", dstPath) + } + + parentDir, _ := dstPath.DirAndName() + if parentDir != "/" { + if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { + return fmt.Errorf("parent directory %s does not exist", parentDir) + } + } + + if isDir { + return fmt.Errorf("copy not file") + } + + if err := utils.Clone(fs.root, string(srcPath), string(dstPath)); err != nil { + return err + } + + newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID) + if err := fs.meta.InsertEntry(ctx, newEntry); err != nil { + return fmt.Errorf("insert new entry failed: %v", err) + } + + return nil +} + +func (fs *FileSystem) Shutdown() { + if fs.meta != nil { + fs.meta.Shutdown() + } +} diff --git a/engine/filesystem_copy.go b/pkg/fs/filesystem_copy.go similarity index 60% rename from engine/filesystem_copy.go rename to pkg/fs/filesystem_copy.go index c85be4e..5815979 100644 --- a/engine/filesystem_copy.go +++ b/pkg/fs/filesystem_copy.go @@ -1,4 +1,4 @@ -package engine +package fs import ( "context" @@ -10,9 +10,9 @@ import ( "robotfs/utils" ) -func (f *FileSystemManager) CopyDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error { - locker := f.locker.AcquireOrderedLock(string(oldPath), string(newPath)) - defer f.locker.ReleaseLocks(locker) +func (fs *FileSystem) CopyDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error { + locker := fs.locker.AcquireOrderedLock(string(oldPath), string(newPath)) + defer fs.locker.ReleaseLocks(locker) if string(oldPath) == "/" { return fmt.Errorf("cannot copy root directory") @@ -22,37 +22,37 @@ func (f *FileSystemManager) CopyDirectory(ctx context.Context, oldPath, newPath return fmt.Errorf("cannot copy directory to a subdirectory of itself") } - oldEntry, err := f.FindEntry(ctx, oldPath) + oldEntry, err := fs.FindEntry(ctx, oldPath) if err != nil { return fmt.Errorf("%s not found: %v", oldPath, err) } - if newEntry, err := f.FindEntry(ctx, newPath); newEntry != nil && err == nil { + if newEntry, err := fs.FindEntry(ctx, newPath); newEntry != nil && err == nil { return fmt.Errorf("destination %s already exists", newPath) } parentDir, _ := newPath.DirAndName() if parentDir != "/" { - if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { + if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { return fmt.Errorf("parent directory %s does not exist", parentDir) } } - if err := f.copyEntry(ctx, oldPath, oldEntry, newPath); err != nil { + if err := fs.copyEntry(ctx, oldPath, oldEntry, newPath); err != nil { return fmt.Errorf("copy metadata failed: %v", err) } - if err := utils.Clone(f.root, string(oldPath), string(newPath)); err != nil { + if err := utils.Clone(fs.root, string(oldPath), string(newPath)); err != nil { return fmt.Errorf("copy file data failed: %v", err) } return nil } -func (f *FileSystemManager) copyEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath) error { - if err := f.copySelfEntry(ctx, oldPath, entry, newPath, func() error { +func (fs *FileSystem) copyEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath) error { + if err := fs.copySelfEntry(ctx, oldPath, entry, newPath, func() error { if entry.IsDir { - if err := f.copyFolderSubEntries(ctx, oldPath, newPath); err != nil { + if err := fs.copyFolderSubEntries(ctx, oldPath, newPath); err != nil { return err } } @@ -64,14 +64,14 @@ func (f *FileSystemManager) copyEntry(ctx context.Context, oldPath utils.FullPat return nil } -func (f *FileSystemManager) copyFolderSubEntries(ctx context.Context, oldPath utils.FullPath, newPath utils.FullPath) error { +func (fs *FileSystem) copyFolderSubEntries(ctx context.Context, oldPath utils.FullPath, newPath utils.FullPath) error { logger.Info("copying folder %s => %s", oldPath, newPath) lastFileName := "" includeLastFile := false for { entries := make([]*utils.Entry, 0, 1000) - lastFileName, err := f.doListDirectoryEntries(ctx, oldPath, lastFileName, includeLastFile, 1000, func(entry *utils.Entry) bool { + lastFileName, err := fs.doListDirectoryEntries(ctx, oldPath, lastFileName, includeLastFile, 1000, func(entry *utils.Entry) bool { entries = append(entries, entry) return true }) @@ -86,7 +86,7 @@ func (f *FileSystemManager) copyFolderSubEntries(ctx context.Context, oldPath ut for _, item := range entries { itemOldPath := oldPath.Child(item.FullPath.Name()) itemNewPath := newPath.Child(item.FullPath.Name()) - err := f.copyEntry(ctx, itemOldPath, item, itemNewPath) + err := fs.copyEntry(ctx, itemOldPath, item, itemNewPath) if err != nil { return err } @@ -100,7 +100,7 @@ func (f *FileSystemManager) copyFolderSubEntries(ctx context.Context, oldPath ut return nil } -func (f *FileSystemManager) copySelfEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath, copyFolderSubEntries func() error) error { +func (fs *FileSystem) copySelfEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath, copyFolderSubEntries func() error) error { logger.Info("copying entry %s => %s", oldPath, newPath) if oldPath == newPath { @@ -121,7 +121,7 @@ func (f *FileSystemManager) copySelfEntry(ctx context.Context, oldPath utils.Ful Extended: entry.Extended, } - if createErr := f.meta.InsertEntry(ctx, newEntry); createErr != nil { + if createErr := fs.meta.InsertEntry(ctx, newEntry); createErr != nil { return createErr } diff --git a/engine/filesystem_delete.go b/pkg/fs/filesystem_delete.go similarity index 51% rename from engine/filesystem_delete.go rename to pkg/fs/filesystem_delete.go index e95ab99..8237f2c 100644 --- a/engine/filesystem_delete.go +++ b/pkg/fs/filesystem_delete.go @@ -1,4 +1,4 @@ -package engine +package fs import ( "context" @@ -8,59 +8,59 @@ import ( "robotfs/utils" ) -func (f *FileSystemManager) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error { - locker := f.locker.AcquireLock(string(path)) - defer f.locker.ReleaseLock(locker) +func (fs *FileSystem) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error { + locker := fs.locker.AcquireLock(string(path)) + defer fs.locker.ReleaseLock(locker) if string(path) == "/" { return nil } - entry, err := f.FindEntry(ctx, path) + entry, err := fs.FindEntry(ctx, path) if err != nil { return err } if entry.IsDir { - err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError) + err = fs.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError) if err != nil { return err } } // Delete the directory entry itself - err = f.doDeleteEntryMetaAndData(ctx, entry) + err = fs.doDeleteEntryMetaAndData(ctx, entry) if err != nil { return err } - if err := utils.Remove(f.root, string(path), isRecursive); err != nil { + if err := utils.Remove(fs.root, string(path), isRecursive); err != nil { return err } return nil } -func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error { +func (fs *FileSystem) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error { logger.Info("deleting folder => %s", entry.FullPath) lastFileName := "" includeLastFile := false for { - lastFileName, err := f.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool { + lastFileName, err := fs.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool { if sub.IsDir { // Recursively delete subdirectory - err := f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError) + err := fs.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError) if err == nil { - err = f.doDeleteEntryMetaAndData(ctx, sub) + err = fs.doDeleteEntryMetaAndData(ctx, sub) } if err != nil && !ignoreRecursiveError { return false } } else { // Delete file entry - err := f.doDeleteEntryMetaAndData(ctx, sub) + err := fs.doDeleteEntryMetaAndData(ctx, sub) if err != nil && !ignoreRecursiveError { return false } @@ -74,7 +74,7 @@ func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, if lastFileName == "" && !isRecursive { // Check if there are any entries hasEntries := false - _, err := f.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool { + _, err := fs.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool { hasEntries = true return false }) @@ -91,16 +91,16 @@ func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, } } - if storeDeletionErr := f.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { + if storeDeletionErr := fs.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { return fmt.Errorf("delete folder children metadata: %v", storeDeletionErr) } return nil } -func (f *FileSystemManager) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error { +func (fs *FileSystem) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error { logger.Info("deleting entry => %s", entry.FullPath) - if storeDeletionErr := f.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { + if storeDeletionErr := fs.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil { return fmt.Errorf("delete entry metadata: %v", storeDeletionErr) } diff --git a/engine/filesystem_rename.go b/pkg/fs/filesystem_rename.go similarity index 59% rename from engine/filesystem_rename.go rename to pkg/fs/filesystem_rename.go index aefdda3..44a3ddf 100644 --- a/engine/filesystem_rename.go +++ b/pkg/fs/filesystem_rename.go @@ -1,4 +1,4 @@ -package engine +package fs import ( "context" @@ -10,32 +10,32 @@ import ( "robotfs/utils" ) -func (f *FileSystemManager) RenameDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error { - locker := f.locker.AcquireOrderedLock(string(oldPath), string(newPath)) - defer f.locker.ReleaseLocks(locker) +func (fs *FileSystem) RenameDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error { + locker := fs.locker.AcquireOrderedLock(string(oldPath), string(newPath)) + defer fs.locker.ReleaseLocks(locker) - if err := f.canRename(oldPath, newPath, isDir); err != nil { + if err := fs.canRename(oldPath, newPath, isDir); err != nil { return err } - oldEntry, err := f.FindEntry(ctx, oldPath) + oldEntry, err := fs.FindEntry(ctx, oldPath) if err != nil { return fmt.Errorf("%s not found: %v", oldPath, err) } - moveErr := f.moveEntry(ctx, oldPath, oldEntry, newPath) + moveErr := fs.moveEntry(ctx, oldPath, oldEntry, newPath) if moveErr != nil { return fmt.Errorf("%s move error: %v", oldPath, moveErr) } - if err := utils.Move(f.root, string(oldPath), string(newPath)); err != nil { + if err := utils.Move(fs.root, string(oldPath), string(newPath)); err != nil { return fmt.Errorf("move file data failed: %v", err) } return nil } -func (f *FileSystemManager) canRename(source, target utils.FullPath, isDir bool) error { +func (fs *FileSystem) canRename(source, target utils.FullPath, isDir bool) error { if string(source) == "/" { return fmt.Errorf("mv: cannot move root directory") } @@ -47,10 +47,10 @@ func (f *FileSystemManager) canRename(source, target utils.FullPath, isDir bool) return nil } -func (f *FileSystemManager) moveEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath) error { - if err := f.moveSelfEntry(ctx, oldPath, entry, newPath, func() error { +func (fs *FileSystem) moveEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath) error { + if err := fs.moveSelfEntry(ctx, oldPath, entry, newPath, func() error { if entry.IsDir { - if err := f.moveFolderSubEntries(ctx, oldPath, newPath); err != nil { + if err := fs.moveFolderSubEntries(ctx, oldPath, newPath); err != nil { return err } } @@ -62,14 +62,14 @@ func (f *FileSystemManager) moveEntry(ctx context.Context, oldPath utils.FullPat return nil } -func (f *FileSystemManager) moveFolderSubEntries(ctx context.Context, oldPath utils.FullPath, newPath utils.FullPath) error { +func (fs *FileSystem) moveFolderSubEntries(ctx context.Context, oldPath utils.FullPath, newPath utils.FullPath) error { logger.Info("moving folder %s => %s", oldPath, newPath) lastFileName := "" includeLastFile := false for { entries := make([]*utils.Entry, 0, 1000) - lastFileName, err := f.doListDirectoryEntries(ctx, oldPath, lastFileName, includeLastFile, 1000, func(entry *utils.Entry) bool { + lastFileName, err := fs.doListDirectoryEntries(ctx, oldPath, lastFileName, includeLastFile, 1000, func(entry *utils.Entry) bool { entries = append(entries, entry) return true }) @@ -84,7 +84,7 @@ func (f *FileSystemManager) moveFolderSubEntries(ctx context.Context, oldPath ut for _, item := range entries { itemOldPath := oldPath.Child(item.FullPath.Name()) itemNewPath := newPath.Child(item.FullPath.Name()) - err := f.moveEntry(ctx, itemOldPath, item, itemNewPath) + err := fs.moveEntry(ctx, itemOldPath, item, itemNewPath) if err != nil { return err } @@ -98,7 +98,7 @@ func (f *FileSystemManager) moveFolderSubEntries(ctx context.Context, oldPath ut return nil } -func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath, moveFolderSubEntries func() error) error { +func (fs *FileSystem) moveSelfEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath, moveFolderSubEntries func() error) error { logger.Info("moving entry %s => %s", oldPath, newPath) if oldPath == newPath { @@ -119,7 +119,7 @@ func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.Ful Extended: entry.Extended, } - if updateErr := f.meta.UpdateEntry(ctx, newEntry); updateErr != nil { + if updateErr := fs.meta.UpdateEntry(ctx, newEntry); updateErr != nil { return updateErr } @@ -129,7 +129,7 @@ func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.Ful } } - deleteErr := f.meta.DeleteEntry(ctx, oldPath) + deleteErr := fs.meta.DeleteEntry(ctx, oldPath) if deleteErr != nil { return deleteErr } diff --git a/pkg/meta/redis_lua/redis_store.go b/pkg/meta/redis_lua/redis_store.go index 39b9670..3b58e45 100644 --- a/pkg/meta/redis_lua/redis_store.go +++ b/pkg/meta/redis_lua/redis_store.go @@ -30,10 +30,12 @@ func (store *RedisLuaStore) Initialize(configuration config.Configuration, prefi func (store *RedisLuaStore) initialize(hostPort string, username, password string, database int, superLargeDirectories []string) (err error) { store.Client = redis.NewClient(&redis.Options{ - Addr: hostPort, - Username: username, - Password: password, - DB: database, + Addr: hostPort, + Username: username, + Password: password, + DB: database, + PoolSize: 100, + MinIdleConns: 20, }) store.loadSuperLargeDirectories(superLargeDirectories) return