Update: update redis configuration
This commit is contained in:
parent
3455429b38
commit
2fd5a0e4b8
@ -44,7 +44,7 @@ func (s *Service) HandleMkdir(
|
|||||||
path := params.Path
|
path := params.Path
|
||||||
newPath := utils.NormalizePath(path)
|
newPath := utils.NormalizePath(path)
|
||||||
|
|
||||||
err := s.FileSystemManager.MakeDirectory(req.Context(), utils.FullPath(newPath))
|
err := s.FileSystem.MakeDirectory(req.Context(), utils.FullPath(newPath))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("mkdir %s: %v", path, err)
|
logger.Error("mkdir %s: %v", path, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
@ -70,7 +70,7 @@ func (s *Service) HandleListDirectory(
|
|||||||
limit := req.QueryInt("limit")
|
limit := req.QueryInt("limit")
|
||||||
newPath := utils.NormalizePath(path)
|
newPath := utils.NormalizePath(path)
|
||||||
|
|
||||||
rawEntries, moreAvailable, lastFileName, err := s.FileSystemManager.ListDirectoryEntries(
|
rawEntries, moreAvailable, lastFileName, err := s.FileSystem.ListDirectoryEntries(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
utils.FullPath(newPath),
|
utils.FullPath(newPath),
|
||||||
startFileName,
|
startFileName,
|
||||||
@ -134,7 +134,7 @@ func (s *Service) HandleDeleteDirectory(
|
|||||||
return resp.OK(out).JSON()
|
return resp.OK(out).JSON()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.FileSystemManager.DeleteDirectory(req.Context(), utils.FullPath(newPath), isDir, false); err != nil {
|
if err := s.FileSystem.DeleteDirectory(req.Context(), utils.FullPath(newPath), isDir, false); err != nil {
|
||||||
logger.Error("del dir %s: %v", path, err)
|
logger.Error("del dir %s: %v", path, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
entity.CodeInternalServer,
|
entity.CodeInternalServer,
|
||||||
@ -160,7 +160,7 @@ func (s *Service) HandleRenameDirectory(
|
|||||||
|
|
||||||
oldPath := utils.FullPath(srcPath)
|
oldPath := utils.FullPath(srcPath)
|
||||||
newPath := utils.FullPath(dstPath)
|
newPath := utils.FullPath(dstPath)
|
||||||
if err := s.FileSystemManager.RenameDirectory(req.Context(), oldPath, newPath, isDir); err != nil {
|
if err := s.FileSystem.RenameDirectory(req.Context(), oldPath, newPath, isDir); err != nil {
|
||||||
logger.Error("move dir %s => %s: %v", params.SrcPath, params.DstPath, err)
|
logger.Error("move dir %s => %s: %v", params.SrcPath, params.DstPath, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
entity.CodeInternalServer,
|
entity.CodeInternalServer,
|
||||||
@ -186,7 +186,7 @@ func (s *Service) HandleCopyDirectory(
|
|||||||
|
|
||||||
oldPath := utils.FullPath(srcPath)
|
oldPath := utils.FullPath(srcPath)
|
||||||
newPath := utils.FullPath(dstPath)
|
newPath := utils.FullPath(dstPath)
|
||||||
if err := s.FileSystemManager.CopyDirectory(req.Context(), oldPath, newPath, isDir); err != nil {
|
if err := s.FileSystem.CopyDirectory(req.Context(), oldPath, newPath, isDir); err != nil {
|
||||||
logger.Error("copy dir %s => %s: %v", params.SrcPath, params.DstPath, err)
|
logger.Error("copy dir %s => %s: %v", params.SrcPath, params.DstPath, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
entity.CodeInternalServer,
|
entity.CodeInternalServer,
|
||||||
|
|||||||
12
api_file.go
12
api_file.go
@ -63,7 +63,7 @@ func (s *Service) HandleUploadFile(
|
|||||||
contentType := fileHeader.Header.Get("Content-Type")
|
contentType := fileHeader.Header.Get("Content-Type")
|
||||||
fileSize := fileHeader.Size
|
fileSize := fileHeader.Size
|
||||||
|
|
||||||
if err := s.Engine.FileSystemManager.CreateFile(req.Context(), utils.FullPath(newPath), file, contentType, fileSize); err != nil {
|
if err := s.FileSystem.CreateFile(req.Context(), utils.FullPath(newPath), file, contentType, fileSize); err != nil {
|
||||||
logger.Error("create %s: %v", path, err)
|
logger.Error("create %s: %v", path, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
entity.CodeInternalServer,
|
entity.CodeInternalServer,
|
||||||
@ -85,7 +85,7 @@ func (s *Service) HandleInfoFile(
|
|||||||
path := req.QueryString("path")
|
path := req.QueryString("path")
|
||||||
newPath := utils.NormalizePath(path)
|
newPath := utils.NormalizePath(path)
|
||||||
|
|
||||||
entry, err := s.FileSystemManager.FindEntry(req.Context(), utils.FullPath(newPath))
|
entry, err := s.FileSystem.FindEntry(req.Context(), utils.FullPath(newPath))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("info %s: %v", path, err)
|
logger.Error("info %s: %v", path, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
@ -109,7 +109,7 @@ func (s *Service) HandleDownloadFile(
|
|||||||
preview := req.QueryBool("preview")
|
preview := req.QueryBool("preview")
|
||||||
newPath := utils.NormalizePath(path)
|
newPath := utils.NormalizePath(path)
|
||||||
|
|
||||||
downloader, entry, err := s.FileSystemManager.DownloadFile(req.Context(), utils.FullPath(newPath))
|
downloader, entry, err := s.FileSystem.DownloadFile(req.Context(), utils.FullPath(newPath))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("download %s: %v", path, err)
|
logger.Error("download %s: %v", path, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
@ -139,7 +139,7 @@ func (s *Service) HandleDeleteFile(
|
|||||||
isDir := params.IsDir
|
isDir := params.IsDir
|
||||||
newPath := utils.NormalizePath(path)
|
newPath := utils.NormalizePath(path)
|
||||||
|
|
||||||
err := s.FileSystemManager.DeleteFile(req.Context(), utils.FullPath(newPath), isDir)
|
err := s.FileSystem.DeleteFile(req.Context(), utils.FullPath(newPath), isDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("delete file %s: %v", path, err)
|
logger.Error("delete file %s: %v", path, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
@ -164,7 +164,7 @@ func (s *Service) HandleRenameFile(
|
|||||||
srcPath := utils.FullPath(utils.NormalizePath(params.SrcPath))
|
srcPath := utils.FullPath(utils.NormalizePath(params.SrcPath))
|
||||||
dstPath := utils.FullPath(utils.NormalizePath(params.DstPath))
|
dstPath := utils.FullPath(utils.NormalizePath(params.DstPath))
|
||||||
|
|
||||||
err := s.FileSystemManager.RenameFile(req.Context(), srcPath, dstPath, isDir)
|
err := s.FileSystem.RenameFile(req.Context(), srcPath, dstPath, isDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("move file %s => %s: %v", srcPath, dstPath, err)
|
logger.Error("move file %s => %s: %v", srcPath, dstPath, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
@ -189,7 +189,7 @@ func (s *Service) HandleCopyFile(
|
|||||||
srcPath := utils.FullPath(utils.NormalizePath(params.SrcPath))
|
srcPath := utils.FullPath(utils.NormalizePath(params.SrcPath))
|
||||||
dstPath := utils.FullPath(utils.NormalizePath(params.DstPath))
|
dstPath := utils.FullPath(utils.NormalizePath(params.DstPath))
|
||||||
|
|
||||||
err := s.FileSystemManager.CopyFile(req.Context(), srcPath, dstPath, isDir)
|
err := s.FileSystem.CopyFile(req.Context(), srcPath, dstPath, isDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("copy file %s => %s: %v", srcPath, dstPath, err)
|
logger.Error("copy file %s => %s: %v", srcPath, dstPath, err)
|
||||||
out := entity.NewError[any](
|
out := entity.NewError[any](
|
||||||
|
|||||||
@ -51,7 +51,7 @@ func SetupServiceConfig() {
|
|||||||
MaxConcurrentStreams: 10000,
|
MaxConcurrentStreams: 10000,
|
||||||
MaxTraces: 30 * 10000,
|
MaxTraces: 30 * 10000,
|
||||||
LogSlowRequest: true,
|
LogSlowRequest: true,
|
||||||
SlowRequestThreshold: 1000 * time.Millisecond,
|
SlowRequestThreshold: 5000 * time.Millisecond,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,12 +4,13 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"robotfs/config"
|
"robotfs/config"
|
||||||
|
"robotfs/pkg/fs"
|
||||||
"robotfs/pkg/meta/redis_lua"
|
"robotfs/pkg/meta/redis_lua"
|
||||||
"robotfs/pkg/storage"
|
"robotfs/pkg/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
FileSystemManager *FileSystemManager
|
FileSystem *fs.FileSystem
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEngine() *Engine {
|
func NewEngine() *Engine {
|
||||||
@ -29,7 +30,7 @@ func (e *Engine) Start() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
e.FileSystemManager = NewFileSystemManager(
|
e.FileSystem = fs.NewFileSystem(
|
||||||
meta,
|
meta,
|
||||||
store,
|
store,
|
||||||
cfg.GetString("robotfs.data_path"),
|
cfg.GetString("robotfs.data_path"),
|
||||||
@ -58,5 +59,5 @@ func (e *Engine) initStorage(cfg config.Configuration) (*storage.Storage, error)
|
|||||||
|
|
||||||
func (e *Engine) Stop() {
|
func (e *Engine) Stop() {
|
||||||
// Stop filesystem manager.
|
// Stop filesystem manager.
|
||||||
e.FileSystemManager.Shutdown()
|
e.FileSystem.Shutdown()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,318 +0,0 @@
|
|||||||
package engine
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"gosvc/logger"
|
|
||||||
"io"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"robotfs/pkg/meta"
|
|
||||||
"robotfs/pkg/storage"
|
|
||||||
"robotfs/utils"
|
|
||||||
)
|
|
||||||
|
|
||||||
type FileSystemManager struct {
|
|
||||||
root string
|
|
||||||
meta meta.MetaStore
|
|
||||||
store *storage.Storage
|
|
||||||
locker *utils.LockManager
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewFileSystemManager(meta meta.MetaStore, store *storage.Storage, root string) *FileSystemManager {
|
|
||||||
return &FileSystemManager{
|
|
||||||
root: root,
|
|
||||||
meta: meta,
|
|
||||||
store: store,
|
|
||||||
locker: utils.NewLockManager(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) BeginTransaction(ctx context.Context) (context.Context, error) {
|
|
||||||
return f.meta.BeginTransaction(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) CommitTransaction(ctx context.Context) error {
|
|
||||||
return f.meta.CommitTransaction(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) RollbackTransaction(ctx context.Context) error {
|
|
||||||
return f.meta.RollbackTransaction(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
Root = &utils.Entry{
|
|
||||||
FullPath: utils.FullPath("/"),
|
|
||||||
IsDir: true,
|
|
||||||
CreateTime: time.Now().Unix(),
|
|
||||||
LastModificationTime: time.Now().Unix(),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) {
|
|
||||||
if p == "/" {
|
|
||||||
return Root, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
entry, err = f.meta.FindEntry(ctx, p)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPath) error {
|
|
||||||
locker := f.locker.AcquireLock(string(path))
|
|
||||||
defer f.locker.ReleaseLock(locker)
|
|
||||||
|
|
||||||
logger.Info("making directory => %s", path)
|
|
||||||
|
|
||||||
if string(path) == "/" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if entry, err := f.FindEntry(ctx, path); err == nil && entry != nil {
|
|
||||||
return fmt.Errorf("directory %s already exists", path)
|
|
||||||
}
|
|
||||||
|
|
||||||
parentDir, _ := path.DirAndName()
|
|
||||||
if parentDir != "/" {
|
|
||||||
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
|
||||||
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := utils.Mkdir(f.root, string(path)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
entry := utils.NewDirEntry(path)
|
|
||||||
return f.meta.InsertEntry(ctx, entry)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, lastFileName string, err error) {
|
|
||||||
locker := f.locker.AcquireRLock(string(p))
|
|
||||||
defer f.locker.ReleaseLock(locker)
|
|
||||||
|
|
||||||
logger.Info("listing directory %s => startFileName:%s inclusive:%t count:%d", p, startFileName, inclusive, limit)
|
|
||||||
|
|
||||||
lastFileName, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
|
|
||||||
entries = append(entries, entry)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
hasMore = int64(len(entries)) >= limit
|
|
||||||
if hasMore {
|
|
||||||
entries = entries[:limit]
|
|
||||||
}
|
|
||||||
|
|
||||||
return entries, hasMore, lastFileName, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
|
|
||||||
|
|
||||||
lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
|
|
||||||
lastFileName, err = f.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return false
|
|
||||||
default:
|
|
||||||
return eachEntryFunc(entry)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string, fileSize int64) error {
|
|
||||||
locker := f.locker.AcquireLock(string(path))
|
|
||||||
defer f.locker.ReleaseLock(locker)
|
|
||||||
|
|
||||||
logger.Info("uploading file => %s", path)
|
|
||||||
|
|
||||||
if string(path) == "/" {
|
|
||||||
return fmt.Errorf("cannot create file %s", path)
|
|
||||||
}
|
|
||||||
|
|
||||||
if entry, _ := f.FindEntry(ctx, path); entry != nil {
|
|
||||||
return fmt.Errorf("file %s already exists", path)
|
|
||||||
}
|
|
||||||
|
|
||||||
parentDir, _ := path.DirAndName()
|
|
||||||
if parentDir != "/" {
|
|
||||||
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
|
||||||
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
output, err := f.store.UploadFile(path.ToS3Key(), reader, contentType)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("upload file failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if output.ETag == nil {
|
|
||||||
empty := ""
|
|
||||||
output.ETag = &empty
|
|
||||||
}
|
|
||||||
|
|
||||||
if output.VersionID == nil {
|
|
||||||
empty := ""
|
|
||||||
output.VersionID = &empty
|
|
||||||
}
|
|
||||||
|
|
||||||
entry := utils.NewFileEntry(path, path.ToS3Key(), uint64(fileSize), contentType, *output.ETag, *output.VersionID)
|
|
||||||
if err := f.meta.InsertEntry(ctx, entry); err != nil {
|
|
||||||
return fmt.Errorf("create file entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPath) (*utils.S3ReadSeeker, *utils.Entry, error) {
|
|
||||||
locker := f.locker.AcquireRLock(string(path))
|
|
||||||
defer f.locker.ReleaseLock(locker)
|
|
||||||
|
|
||||||
logger.Info("downloading file => %s", path)
|
|
||||||
|
|
||||||
entry, err := f.FindEntry(ctx, path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("find entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if entry.IsDir {
|
|
||||||
return nil, nil, fmt.Errorf("cannot download directory")
|
|
||||||
}
|
|
||||||
|
|
||||||
downloader, err := f.store.DownloadFile(entry.S3Key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("download file failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return downloader, entry, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath, isDir bool) error {
|
|
||||||
locker := f.locker.AcquireLock(string(path))
|
|
||||||
defer f.locker.ReleaseLock(locker)
|
|
||||||
|
|
||||||
logger.Info("deleting file => %s", path)
|
|
||||||
|
|
||||||
if string(path) == "/" {
|
|
||||||
return fmt.Errorf("cannot delete root")
|
|
||||||
}
|
|
||||||
|
|
||||||
entry, err := f.FindEntry(ctx, path)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("find entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !isDir {
|
|
||||||
if err := f.store.DeleteObject(entry.S3Key); err != nil {
|
|
||||||
return fmt.Errorf("delete file failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := f.meta.DeleteEntry(ctx, path); err != nil {
|
|
||||||
return fmt.Errorf("delete entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
|
|
||||||
locker := f.locker.AcquireOrderedLock(string(srcPath), string(dstPath))
|
|
||||||
defer f.locker.ReleaseLocks(locker)
|
|
||||||
|
|
||||||
logger.Info("moving file %s => %s", srcPath, dstPath)
|
|
||||||
|
|
||||||
if string(srcPath) == "/" || string(dstPath) == "/" {
|
|
||||||
return fmt.Errorf("cannot rename root")
|
|
||||||
}
|
|
||||||
|
|
||||||
srcEntry, err := f.FindEntry(ctx, srcPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("find src entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil {
|
|
||||||
return fmt.Errorf("dst entry %s already exists", dstPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
parentDir, _ := dstPath.DirAndName()
|
|
||||||
if parentDir != "/" {
|
|
||||||
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
|
||||||
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if isDir {
|
|
||||||
return fmt.Errorf("rename not file")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := utils.Move(f.root, string(srcPath), string(dstPath)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
|
|
||||||
if err := f.meta.UpdateEntry(ctx, newEntry); err != nil {
|
|
||||||
return fmt.Errorf("update entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := f.meta.DeleteEntry(ctx, srcPath); err != nil {
|
|
||||||
return fmt.Errorf("delete src entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) CopyFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
|
|
||||||
locker := f.locker.AcquireOrderedLock(string(srcPath), string(dstPath))
|
|
||||||
defer f.locker.ReleaseLocks(locker)
|
|
||||||
|
|
||||||
logger.Info("copying file %s => %s", srcPath, dstPath)
|
|
||||||
|
|
||||||
if string(srcPath) == "/" || string(dstPath) == "/" {
|
|
||||||
return fmt.Errorf("cannot copy root")
|
|
||||||
}
|
|
||||||
|
|
||||||
srcEntry, err := f.FindEntry(ctx, srcPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("find src entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if dstEntry, err := f.FindEntry(ctx, dstPath); dstEntry != nil && err == nil {
|
|
||||||
return fmt.Errorf("dst entry %s already exists", dstPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
parentDir, _ := dstPath.DirAndName()
|
|
||||||
if parentDir != "/" {
|
|
||||||
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
|
||||||
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if isDir {
|
|
||||||
return fmt.Errorf("copy not file")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := utils.Clone(f.root, string(srcPath), string(dstPath)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
|
|
||||||
if err := f.meta.InsertEntry(ctx, newEntry); err != nil {
|
|
||||||
return fmt.Errorf("insert new entry failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileSystemManager) Shutdown() {
|
|
||||||
if f.meta != nil {
|
|
||||||
f.meta.Shutdown()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
318
pkg/fs/filesystem.go
Normal file
318
pkg/fs/filesystem.go
Normal file
@ -0,0 +1,318 @@
|
|||||||
|
package fs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"gosvc/logger"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"robotfs/pkg/meta"
|
||||||
|
"robotfs/pkg/storage"
|
||||||
|
"robotfs/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FileSystem struct {
|
||||||
|
root string
|
||||||
|
meta meta.MetaStore
|
||||||
|
store *storage.Storage
|
||||||
|
locker *utils.LockManager
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFileSystem(meta meta.MetaStore, store *storage.Storage, root string) *FileSystem {
|
||||||
|
return &FileSystem{
|
||||||
|
root: root,
|
||||||
|
meta: meta,
|
||||||
|
store: store,
|
||||||
|
locker: utils.NewLockManager(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) BeginTransaction(ctx context.Context) (context.Context, error) {
|
||||||
|
return fs.meta.BeginTransaction(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) CommitTransaction(ctx context.Context) error {
|
||||||
|
return fs.meta.CommitTransaction(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) RollbackTransaction(ctx context.Context) error {
|
||||||
|
return fs.meta.RollbackTransaction(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
Root = &utils.Entry{
|
||||||
|
FullPath: utils.FullPath("/"),
|
||||||
|
IsDir: true,
|
||||||
|
CreateTime: time.Now().Unix(),
|
||||||
|
LastModificationTime: time.Now().Unix(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (fs *FileSystem) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) {
|
||||||
|
if p == "/" {
|
||||||
|
return Root, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
entry, err = fs.meta.FindEntry(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) MakeDirectory(ctx context.Context, path utils.FullPath) error {
|
||||||
|
locker := fs.locker.AcquireLock(string(path))
|
||||||
|
defer fs.locker.ReleaseLock(locker)
|
||||||
|
|
||||||
|
logger.Info("making directory => %s", path)
|
||||||
|
|
||||||
|
if string(path) == "/" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry, err := fs.FindEntry(ctx, path); err == nil && entry != nil {
|
||||||
|
return fmt.Errorf("directory %s already exists", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
parentDir, _ := path.DirAndName()
|
||||||
|
if parentDir != "/" {
|
||||||
|
if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
||||||
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := utils.Mkdir(fs.root, string(path)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := utils.NewDirEntry(path)
|
||||||
|
return fs.meta.InsertEntry(ctx, entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, lastFileName string, err error) {
|
||||||
|
locker := fs.locker.AcquireRLock(string(p))
|
||||||
|
defer fs.locker.ReleaseLock(locker)
|
||||||
|
|
||||||
|
logger.Info("listing directory %s => startFileName:%s inclusive:%t count:%d", p, startFileName, inclusive, limit)
|
||||||
|
|
||||||
|
lastFileName, err = fs.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
|
||||||
|
entries = append(entries, entry)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
hasMore = int64(len(entries)) >= limit
|
||||||
|
if hasMore {
|
||||||
|
entries = entries[:limit]
|
||||||
|
}
|
||||||
|
|
||||||
|
return entries, hasMore, lastFileName, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
|
||||||
|
|
||||||
|
lastFileName, err = fs.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
|
||||||
|
lastFileName, err = fs.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
return eachEntryFunc(entry)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string, fileSize int64) error {
|
||||||
|
locker := fs.locker.AcquireLock(string(path))
|
||||||
|
defer fs.locker.ReleaseLock(locker)
|
||||||
|
|
||||||
|
logger.Info("uploading file => %s", path)
|
||||||
|
|
||||||
|
if string(path) == "/" {
|
||||||
|
return fmt.Errorf("cannot create file %s", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry, _ := fs.FindEntry(ctx, path); entry != nil {
|
||||||
|
return fmt.Errorf("file %s already exists", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
parentDir, _ := path.DirAndName()
|
||||||
|
if parentDir != "/" {
|
||||||
|
if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
||||||
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
output, err := fs.store.UploadFile(path.ToS3Key(), reader, contentType)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("upload file failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if output.ETag == nil {
|
||||||
|
empty := ""
|
||||||
|
output.ETag = &empty
|
||||||
|
}
|
||||||
|
|
||||||
|
if output.VersionID == nil {
|
||||||
|
empty := ""
|
||||||
|
output.VersionID = &empty
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := utils.NewFileEntry(path, path.ToS3Key(), uint64(fileSize), contentType, *output.ETag, *output.VersionID)
|
||||||
|
if err := fs.meta.InsertEntry(ctx, entry); err != nil {
|
||||||
|
return fmt.Errorf("create file entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) DownloadFile(ctx context.Context, path utils.FullPath) (*utils.S3ReadSeeker, *utils.Entry, error) {
|
||||||
|
locker := fs.locker.AcquireRLock(string(path))
|
||||||
|
defer fs.locker.ReleaseLock(locker)
|
||||||
|
|
||||||
|
logger.Info("downloading file => %s", path)
|
||||||
|
|
||||||
|
entry, err := fs.FindEntry(ctx, path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("find entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.IsDir {
|
||||||
|
return nil, nil, fmt.Errorf("cannot download directory")
|
||||||
|
}
|
||||||
|
|
||||||
|
downloader, err := fs.store.DownloadFile(entry.S3Key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("download file failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return downloader, entry, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) DeleteFile(ctx context.Context, path utils.FullPath, isDir bool) error {
|
||||||
|
locker := fs.locker.AcquireLock(string(path))
|
||||||
|
defer fs.locker.ReleaseLock(locker)
|
||||||
|
|
||||||
|
logger.Info("deleting file => %s", path)
|
||||||
|
|
||||||
|
if string(path) == "/" {
|
||||||
|
return fmt.Errorf("cannot delete root")
|
||||||
|
}
|
||||||
|
|
||||||
|
entry, err := fs.FindEntry(ctx, path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("find entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isDir {
|
||||||
|
if err := fs.store.DeleteObject(entry.S3Key); err != nil {
|
||||||
|
return fmt.Errorf("delete file failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fs.meta.DeleteEntry(ctx, path); err != nil {
|
||||||
|
return fmt.Errorf("delete entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) RenameFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
|
||||||
|
locker := fs.locker.AcquireOrderedLock(string(srcPath), string(dstPath))
|
||||||
|
defer fs.locker.ReleaseLocks(locker)
|
||||||
|
|
||||||
|
logger.Info("moving file %s => %s", srcPath, dstPath)
|
||||||
|
|
||||||
|
if string(srcPath) == "/" || string(dstPath) == "/" {
|
||||||
|
return fmt.Errorf("cannot rename root")
|
||||||
|
}
|
||||||
|
|
||||||
|
srcEntry, err := fs.FindEntry(ctx, srcPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("find src entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dstEntry, _ := fs.FindEntry(ctx, dstPath); dstEntry != nil {
|
||||||
|
return fmt.Errorf("dst entry %s already exists", dstPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
parentDir, _ := dstPath.DirAndName()
|
||||||
|
if parentDir != "/" {
|
||||||
|
if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
||||||
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if isDir {
|
||||||
|
return fmt.Errorf("rename not file")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := utils.Move(fs.root, string(srcPath), string(dstPath)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
|
||||||
|
if err := fs.meta.UpdateEntry(ctx, newEntry); err != nil {
|
||||||
|
return fmt.Errorf("update entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fs.meta.DeleteEntry(ctx, srcPath); err != nil {
|
||||||
|
return fmt.Errorf("delete src entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) CopyFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
|
||||||
|
locker := fs.locker.AcquireOrderedLock(string(srcPath), string(dstPath))
|
||||||
|
defer fs.locker.ReleaseLocks(locker)
|
||||||
|
|
||||||
|
logger.Info("copying file %s => %s", srcPath, dstPath)
|
||||||
|
|
||||||
|
if string(srcPath) == "/" || string(dstPath) == "/" {
|
||||||
|
return fmt.Errorf("cannot copy root")
|
||||||
|
}
|
||||||
|
|
||||||
|
srcEntry, err := fs.FindEntry(ctx, srcPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("find src entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dstEntry, err := fs.FindEntry(ctx, dstPath); dstEntry != nil && err == nil {
|
||||||
|
return fmt.Errorf("dst entry %s already exists", dstPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
parentDir, _ := dstPath.DirAndName()
|
||||||
|
if parentDir != "/" {
|
||||||
|
if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
||||||
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if isDir {
|
||||||
|
return fmt.Errorf("copy not file")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := utils.Clone(fs.root, string(srcPath), string(dstPath)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
|
||||||
|
if err := fs.meta.InsertEntry(ctx, newEntry); err != nil {
|
||||||
|
return fmt.Errorf("insert new entry failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fs *FileSystem) Shutdown() {
|
||||||
|
if fs.meta != nil {
|
||||||
|
fs.meta.Shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package engine
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -10,9 +10,9 @@ import (
|
|||||||
"robotfs/utils"
|
"robotfs/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *FileSystemManager) CopyDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error {
|
func (fs *FileSystem) CopyDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error {
|
||||||
locker := f.locker.AcquireOrderedLock(string(oldPath), string(newPath))
|
locker := fs.locker.AcquireOrderedLock(string(oldPath), string(newPath))
|
||||||
defer f.locker.ReleaseLocks(locker)
|
defer fs.locker.ReleaseLocks(locker)
|
||||||
|
|
||||||
if string(oldPath) == "/" {
|
if string(oldPath) == "/" {
|
||||||
return fmt.Errorf("cannot copy root directory")
|
return fmt.Errorf("cannot copy root directory")
|
||||||
@ -22,37 +22,37 @@ func (f *FileSystemManager) CopyDirectory(ctx context.Context, oldPath, newPath
|
|||||||
return fmt.Errorf("cannot copy directory to a subdirectory of itself")
|
return fmt.Errorf("cannot copy directory to a subdirectory of itself")
|
||||||
}
|
}
|
||||||
|
|
||||||
oldEntry, err := f.FindEntry(ctx, oldPath)
|
oldEntry, err := fs.FindEntry(ctx, oldPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s not found: %v", oldPath, err)
|
return fmt.Errorf("%s not found: %v", oldPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if newEntry, err := f.FindEntry(ctx, newPath); newEntry != nil && err == nil {
|
if newEntry, err := fs.FindEntry(ctx, newPath); newEntry != nil && err == nil {
|
||||||
return fmt.Errorf("destination %s already exists", newPath)
|
return fmt.Errorf("destination %s already exists", newPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
parentDir, _ := newPath.DirAndName()
|
parentDir, _ := newPath.DirAndName()
|
||||||
if parentDir != "/" {
|
if parentDir != "/" {
|
||||||
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
if parentEntry, _ := fs.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
||||||
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f.copyEntry(ctx, oldPath, oldEntry, newPath); err != nil {
|
if err := fs.copyEntry(ctx, oldPath, oldEntry, newPath); err != nil {
|
||||||
return fmt.Errorf("copy metadata failed: %v", err)
|
return fmt.Errorf("copy metadata failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := utils.Clone(f.root, string(oldPath), string(newPath)); err != nil {
|
if err := utils.Clone(fs.root, string(oldPath), string(newPath)); err != nil {
|
||||||
return fmt.Errorf("copy file data failed: %v", err)
|
return fmt.Errorf("copy file data failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) copyEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath) error {
|
func (fs *FileSystem) copyEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath) error {
|
||||||
if err := f.copySelfEntry(ctx, oldPath, entry, newPath, func() error {
|
if err := fs.copySelfEntry(ctx, oldPath, entry, newPath, func() error {
|
||||||
if entry.IsDir {
|
if entry.IsDir {
|
||||||
if err := f.copyFolderSubEntries(ctx, oldPath, newPath); err != nil {
|
if err := fs.copyFolderSubEntries(ctx, oldPath, newPath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -64,14 +64,14 @@ func (f *FileSystemManager) copyEntry(ctx context.Context, oldPath utils.FullPat
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) copyFolderSubEntries(ctx context.Context, oldPath utils.FullPath, newPath utils.FullPath) error {
|
func (fs *FileSystem) copyFolderSubEntries(ctx context.Context, oldPath utils.FullPath, newPath utils.FullPath) error {
|
||||||
logger.Info("copying folder %s => %s", oldPath, newPath)
|
logger.Info("copying folder %s => %s", oldPath, newPath)
|
||||||
|
|
||||||
lastFileName := ""
|
lastFileName := ""
|
||||||
includeLastFile := false
|
includeLastFile := false
|
||||||
for {
|
for {
|
||||||
entries := make([]*utils.Entry, 0, 1000)
|
entries := make([]*utils.Entry, 0, 1000)
|
||||||
lastFileName, err := f.doListDirectoryEntries(ctx, oldPath, lastFileName, includeLastFile, 1000, func(entry *utils.Entry) bool {
|
lastFileName, err := fs.doListDirectoryEntries(ctx, oldPath, lastFileName, includeLastFile, 1000, func(entry *utils.Entry) bool {
|
||||||
entries = append(entries, entry)
|
entries = append(entries, entry)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
@ -86,7 +86,7 @@ func (f *FileSystemManager) copyFolderSubEntries(ctx context.Context, oldPath ut
|
|||||||
for _, item := range entries {
|
for _, item := range entries {
|
||||||
itemOldPath := oldPath.Child(item.FullPath.Name())
|
itemOldPath := oldPath.Child(item.FullPath.Name())
|
||||||
itemNewPath := newPath.Child(item.FullPath.Name())
|
itemNewPath := newPath.Child(item.FullPath.Name())
|
||||||
err := f.copyEntry(ctx, itemOldPath, item, itemNewPath)
|
err := fs.copyEntry(ctx, itemOldPath, item, itemNewPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -100,7 +100,7 @@ func (f *FileSystemManager) copyFolderSubEntries(ctx context.Context, oldPath ut
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) copySelfEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath, copyFolderSubEntries func() error) error {
|
func (fs *FileSystem) copySelfEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath, copyFolderSubEntries func() error) error {
|
||||||
logger.Info("copying entry %s => %s", oldPath, newPath)
|
logger.Info("copying entry %s => %s", oldPath, newPath)
|
||||||
|
|
||||||
if oldPath == newPath {
|
if oldPath == newPath {
|
||||||
@ -121,7 +121,7 @@ func (f *FileSystemManager) copySelfEntry(ctx context.Context, oldPath utils.Ful
|
|||||||
Extended: entry.Extended,
|
Extended: entry.Extended,
|
||||||
}
|
}
|
||||||
|
|
||||||
if createErr := f.meta.InsertEntry(ctx, newEntry); createErr != nil {
|
if createErr := fs.meta.InsertEntry(ctx, newEntry); createErr != nil {
|
||||||
return createErr
|
return createErr
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package engine
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -8,59 +8,59 @@ import (
|
|||||||
"robotfs/utils"
|
"robotfs/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *FileSystemManager) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error {
|
func (fs *FileSystem) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error {
|
||||||
locker := f.locker.AcquireLock(string(path))
|
locker := fs.locker.AcquireLock(string(path))
|
||||||
defer f.locker.ReleaseLock(locker)
|
defer fs.locker.ReleaseLock(locker)
|
||||||
|
|
||||||
if string(path) == "/" {
|
if string(path) == "/" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
entry, err := f.FindEntry(ctx, path)
|
entry, err := fs.FindEntry(ctx, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if entry.IsDir {
|
if entry.IsDir {
|
||||||
err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError)
|
err = fs.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete the directory entry itself
|
// Delete the directory entry itself
|
||||||
err = f.doDeleteEntryMetaAndData(ctx, entry)
|
err = fs.doDeleteEntryMetaAndData(ctx, entry)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := utils.Remove(f.root, string(path), isRecursive); err != nil {
|
if err := utils.Remove(fs.root, string(path), isRecursive); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error {
|
func (fs *FileSystem) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error {
|
||||||
logger.Info("deleting folder => %s", entry.FullPath)
|
logger.Info("deleting folder => %s", entry.FullPath)
|
||||||
|
|
||||||
lastFileName := ""
|
lastFileName := ""
|
||||||
includeLastFile := false
|
includeLastFile := false
|
||||||
|
|
||||||
for {
|
for {
|
||||||
lastFileName, err := f.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool {
|
lastFileName, err := fs.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool {
|
||||||
if sub.IsDir {
|
if sub.IsDir {
|
||||||
// Recursively delete subdirectory
|
// Recursively delete subdirectory
|
||||||
err := f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError)
|
err := fs.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = f.doDeleteEntryMetaAndData(ctx, sub)
|
err = fs.doDeleteEntryMetaAndData(ctx, sub)
|
||||||
}
|
}
|
||||||
if err != nil && !ignoreRecursiveError {
|
if err != nil && !ignoreRecursiveError {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Delete file entry
|
// Delete file entry
|
||||||
err := f.doDeleteEntryMetaAndData(ctx, sub)
|
err := fs.doDeleteEntryMetaAndData(ctx, sub)
|
||||||
if err != nil && !ignoreRecursiveError {
|
if err != nil && !ignoreRecursiveError {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -74,7 +74,7 @@ func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context,
|
|||||||
if lastFileName == "" && !isRecursive {
|
if lastFileName == "" && !isRecursive {
|
||||||
// Check if there are any entries
|
// Check if there are any entries
|
||||||
hasEntries := false
|
hasEntries := false
|
||||||
_, err := f.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool {
|
_, err := fs.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool {
|
||||||
hasEntries = true
|
hasEntries = true
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
@ -91,16 +91,16 @@ func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if storeDeletionErr := f.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
|
if storeDeletionErr := fs.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
|
||||||
return fmt.Errorf("delete folder children metadata: %v", storeDeletionErr)
|
return fmt.Errorf("delete folder children metadata: %v", storeDeletionErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error {
|
func (fs *FileSystem) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error {
|
||||||
logger.Info("deleting entry => %s", entry.FullPath)
|
logger.Info("deleting entry => %s", entry.FullPath)
|
||||||
if storeDeletionErr := f.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
|
if storeDeletionErr := fs.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
|
||||||
return fmt.Errorf("delete entry metadata: %v", storeDeletionErr)
|
return fmt.Errorf("delete entry metadata: %v", storeDeletionErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package engine
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -10,32 +10,32 @@ import (
|
|||||||
"robotfs/utils"
|
"robotfs/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (f *FileSystemManager) RenameDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error {
|
func (fs *FileSystem) RenameDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error {
|
||||||
locker := f.locker.AcquireOrderedLock(string(oldPath), string(newPath))
|
locker := fs.locker.AcquireOrderedLock(string(oldPath), string(newPath))
|
||||||
defer f.locker.ReleaseLocks(locker)
|
defer fs.locker.ReleaseLocks(locker)
|
||||||
|
|
||||||
if err := f.canRename(oldPath, newPath, isDir); err != nil {
|
if err := fs.canRename(oldPath, newPath, isDir); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
oldEntry, err := f.FindEntry(ctx, oldPath)
|
oldEntry, err := fs.FindEntry(ctx, oldPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s not found: %v", oldPath, err)
|
return fmt.Errorf("%s not found: %v", oldPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
moveErr := f.moveEntry(ctx, oldPath, oldEntry, newPath)
|
moveErr := fs.moveEntry(ctx, oldPath, oldEntry, newPath)
|
||||||
if moveErr != nil {
|
if moveErr != nil {
|
||||||
return fmt.Errorf("%s move error: %v", oldPath, moveErr)
|
return fmt.Errorf("%s move error: %v", oldPath, moveErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := utils.Move(f.root, string(oldPath), string(newPath)); err != nil {
|
if err := utils.Move(fs.root, string(oldPath), string(newPath)); err != nil {
|
||||||
return fmt.Errorf("move file data failed: %v", err)
|
return fmt.Errorf("move file data failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) canRename(source, target utils.FullPath, isDir bool) error {
|
func (fs *FileSystem) canRename(source, target utils.FullPath, isDir bool) error {
|
||||||
if string(source) == "/" {
|
if string(source) == "/" {
|
||||||
return fmt.Errorf("mv: cannot move root directory")
|
return fmt.Errorf("mv: cannot move root directory")
|
||||||
}
|
}
|
||||||
@ -47,10 +47,10 @@ func (f *FileSystemManager) canRename(source, target utils.FullPath, isDir bool)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) moveEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath) error {
|
func (fs *FileSystem) moveEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath) error {
|
||||||
if err := f.moveSelfEntry(ctx, oldPath, entry, newPath, func() error {
|
if err := fs.moveSelfEntry(ctx, oldPath, entry, newPath, func() error {
|
||||||
if entry.IsDir {
|
if entry.IsDir {
|
||||||
if err := f.moveFolderSubEntries(ctx, oldPath, newPath); err != nil {
|
if err := fs.moveFolderSubEntries(ctx, oldPath, newPath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -62,14 +62,14 @@ func (f *FileSystemManager) moveEntry(ctx context.Context, oldPath utils.FullPat
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) moveFolderSubEntries(ctx context.Context, oldPath utils.FullPath, newPath utils.FullPath) error {
|
func (fs *FileSystem) moveFolderSubEntries(ctx context.Context, oldPath utils.FullPath, newPath utils.FullPath) error {
|
||||||
logger.Info("moving folder %s => %s", oldPath, newPath)
|
logger.Info("moving folder %s => %s", oldPath, newPath)
|
||||||
|
|
||||||
lastFileName := ""
|
lastFileName := ""
|
||||||
includeLastFile := false
|
includeLastFile := false
|
||||||
for {
|
for {
|
||||||
entries := make([]*utils.Entry, 0, 1000)
|
entries := make([]*utils.Entry, 0, 1000)
|
||||||
lastFileName, err := f.doListDirectoryEntries(ctx, oldPath, lastFileName, includeLastFile, 1000, func(entry *utils.Entry) bool {
|
lastFileName, err := fs.doListDirectoryEntries(ctx, oldPath, lastFileName, includeLastFile, 1000, func(entry *utils.Entry) bool {
|
||||||
entries = append(entries, entry)
|
entries = append(entries, entry)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
@ -84,7 +84,7 @@ func (f *FileSystemManager) moveFolderSubEntries(ctx context.Context, oldPath ut
|
|||||||
for _, item := range entries {
|
for _, item := range entries {
|
||||||
itemOldPath := oldPath.Child(item.FullPath.Name())
|
itemOldPath := oldPath.Child(item.FullPath.Name())
|
||||||
itemNewPath := newPath.Child(item.FullPath.Name())
|
itemNewPath := newPath.Child(item.FullPath.Name())
|
||||||
err := f.moveEntry(ctx, itemOldPath, item, itemNewPath)
|
err := fs.moveEntry(ctx, itemOldPath, item, itemNewPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -98,7 +98,7 @@ func (f *FileSystemManager) moveFolderSubEntries(ctx context.Context, oldPath ut
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath, moveFolderSubEntries func() error) error {
|
func (fs *FileSystem) moveSelfEntry(ctx context.Context, oldPath utils.FullPath, entry *utils.Entry, newPath utils.FullPath, moveFolderSubEntries func() error) error {
|
||||||
logger.Info("moving entry %s => %s", oldPath, newPath)
|
logger.Info("moving entry %s => %s", oldPath, newPath)
|
||||||
|
|
||||||
if oldPath == newPath {
|
if oldPath == newPath {
|
||||||
@ -119,7 +119,7 @@ func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.Ful
|
|||||||
Extended: entry.Extended,
|
Extended: entry.Extended,
|
||||||
}
|
}
|
||||||
|
|
||||||
if updateErr := f.meta.UpdateEntry(ctx, newEntry); updateErr != nil {
|
if updateErr := fs.meta.UpdateEntry(ctx, newEntry); updateErr != nil {
|
||||||
return updateErr
|
return updateErr
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,7 +129,7 @@ func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.Ful
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deleteErr := f.meta.DeleteEntry(ctx, oldPath)
|
deleteErr := fs.meta.DeleteEntry(ctx, oldPath)
|
||||||
if deleteErr != nil {
|
if deleteErr != nil {
|
||||||
return deleteErr
|
return deleteErr
|
||||||
}
|
}
|
||||||
@ -34,6 +34,8 @@ func (store *RedisLuaStore) initialize(hostPort string, username, password strin
|
|||||||
Username: username,
|
Username: username,
|
||||||
Password: password,
|
Password: password,
|
||||||
DB: database,
|
DB: database,
|
||||||
|
PoolSize: 100,
|
||||||
|
MinIdleConns: 20,
|
||||||
})
|
})
|
||||||
store.loadSuperLargeDirectories(superLargeDirectories)
|
store.loadSuperLargeDirectories(superLargeDirectories)
|
||||||
return
|
return
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user