// Package engine coordinates a metadata store and an object-storage backend
// behind a path-based file-system API.
package engine

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"robotfs/store"
	"robotfs/utils"
)

// FileSystemManager exposes file-system style operations on top of a
// store.MetaStore (entry metadata) and a StorageManager (object data).
//
// The embedded RWMutex is taken for writing by the mutating operations
// (MakeDirectory, CreateFile, DeleteFile, RenameFile, CopyFile) and for
// reading by ListDirectoryEntries and DownloadFile. FindEntry and the
// streaming list helpers do not take the lock themselves.
type FileSystemManager struct {
	sync.RWMutex
	meta    store.MetaStore
	storage *StorageManager
	root    string
}

// NewFileSystemManager returns a manager backed by the given metadata store
// and storage manager. root is passed to utils.Move and utils.Clone by
// RenameFile and CopyFile.
func NewFileSystemManager(meta store.MetaStore, storage *StorageManager, root string) *FileSystemManager {
	return &FileSystemManager{
		meta:    meta,
		storage: storage,
		root:    root,
	}
}

// BeginTransaction delegates to the metadata store and returns the context
// it produces.
func (f *FileSystemManager) BeginTransaction(ctx context.Context) (context.Context, error) {
	return f.meta.BeginTransaction(ctx)
}

// CommitTransaction delegates to the metadata store.
func (f *FileSystemManager) CommitTransaction(ctx context.Context) error {
	return f.meta.CommitTransaction(ctx)
}

// RollbackTransaction delegates to the metadata store.
func (f *FileSystemManager) RollbackTransaction(ctx context.Context) error {
	return f.meta.RollbackTransaction(ctx)
}

var (
	// Root is the synthetic directory entry FindEntry returns for "/".
	// Its timestamps are captured once, at package initialization.
	Root = &utils.Entry{
		FullPath:             utils.FullPath("/"),
		IsDir:                true,
		CreateTime:           time.Now().Unix(),
		LastModificationTime: time.Now().Unix(),
	}
)

// FindEntry looks up the entry stored at p. The path "/" is answered with
// Root without consulting the metadata store. On a store error the returned
// entry is nil.
func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) {
	if p == "/" {
		return Root, nil
	}
	entry, err = f.meta.FindEntry(ctx, p)
	if err != nil {
		return nil, err
	}
	return entry, nil
}

// checkParentExists returns an error unless the parent directory of path is
// "/" or has an entry in the metadata store.
func (f *FileSystemManager) checkParentExists(ctx context.Context, path utils.FullPath) error {
	parentDir, _ := path.DirAndName()
	if parentDir == "/" {
		return nil
	}
	// A lookup error is deliberately treated the same as a missing entry.
	if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
		return fmt.Errorf("parent directory %s does not exist", parentDir)
	}
	return nil
}

// MakeDirectory inserts a directory entry for path. Creating "/" is a no-op.
// It fails if an entry already exists at path or if the parent directory has
// no entry.
func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPath) error {
	f.Lock()
	defer f.Unlock()

	if string(path) == "/" {
		return nil
	}
	// A lookup error is treated as "not found"; only a non-nil entry blocks creation.
	if existing, _ := f.FindEntry(ctx, path); existing != nil {
		return fmt.Errorf("directory %s already exists", path)
	}
	if err := f.checkParentExists(ctx, path); err != nil {
		return err
	}
	return f.meta.InsertEntry(ctx, utils.NewDirEntry(path))
}

// ListDirectoryEntries collects up to limit entries of directory p, starting
// at startFileName (included when inclusive is true). hasMore reports whether
// at least one further entry was available beyond limit.
//
// A negative limit is rejected with an error; previously it caused a slice
// bounds panic.
func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, err error) {
	if limit < 0 {
		return nil, false, fmt.Errorf("invalid limit %d", limit)
	}

	f.RLock()
	defer f.RUnlock()

	// Ask for one entry more than requested so hasMore can be derived
	// without a second query.
	_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, func(entry *utils.Entry) bool {
		entries = append(entries, entry)
		return true
	})

	hasMore = int64(len(entries)) > limit
	if hasMore {
		entries = entries[:limit]
	}
	return entries, hasMore, err
}

// StreamListDirectoryEntries invokes eachEntryFunc for entries of directory p
// and returns the last file name reported by the metadata store. It does not
// take the manager's lock.
func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
	return f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc)
}

// doListDirectoryEntries forwards the listing to the metadata store, wrapping
// eachEntryFunc so that it returns false (without calling eachEntryFunc) once
// ctx is done.
func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
	return f.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
		select {
		case <-ctx.Done():
			return false
		default:
			return eachEntryFunc(entry)
		}
	})
}

// CreateFile uploads the contents of reader under the key derived from path
// and then records a file entry in the metadata store.
//
// It fails if path is "/", if an entry already exists at path, or if the
// parent directory has no entry. If the metadata insert fails after a
// successful upload, the uploaded object is deleted again (best effort) so
// that no object is left without a metadata entry.
func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string, fileSize int64) error {
	f.Lock()
	defer f.Unlock()

	if string(path) == "/" {
		return fmt.Errorf("cannot create file %s", path)
	}
	// A lookup error is treated as "not found"; only a non-nil entry blocks creation.
	if existing, _ := f.FindEntry(ctx, path); existing != nil {
		return fmt.Errorf("file %s already exists", path)
	}
	if err := f.checkParentExists(ctx, path); err != nil {
		return err
	}

	key := path.ToS3Key()
	output, err := f.storage.UploadFile(key, reader, contentType)
	if err != nil {
		return fmt.Errorf("upload s3 failed: %w", err)
	}

	// A missing ETag or VersionID is recorded as the empty string.
	var etag, versionID string
	if output.ETag != nil {
		etag = *output.ETag
	}
	if output.VersionID != nil {
		versionID = *output.VersionID
	}

	entry := utils.NewFileEntry(path, key, uint64(fileSize), contentType, etag, versionID)
	if err := f.meta.InsertEntry(ctx, entry); err != nil {
		// Do not leave an uploaded object behind without metadata.
		if delErr := f.storage.DeleteObject(key); delErr != nil {
			return fmt.Errorf("create file entry failed: %w (cleanup of uploaded object also failed: %v)", err, delErr)
		}
		return fmt.Errorf("create file entry failed: %w", err)
	}
	return nil
}

// DownloadFile looks up the entry at path and opens its stored object for
// reading. It fails if the lookup fails, if no entry is found, or if the
// entry is a directory.
func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPath) (*utils.S3ReadSeeker, *utils.Entry, error) {
	f.RLock()
	defer f.RUnlock()

	entry, err := f.FindEntry(ctx, path)
	if err != nil {
		return nil, nil, fmt.Errorf("find entry failed: %w", err)
	}
	if entry == nil {
		// Guard against a store that reports "not found" as (nil, nil).
		return nil, nil, fmt.Errorf("find entry failed: %s not found", path)
	}
	if entry.IsDir {
		return nil, nil, fmt.Errorf("cannot download directory")
	}

	downloader, err := f.storage.DownloadFile(entry.S3Key)
	if err != nil {
		return nil, nil, fmt.Errorf("read s3 object failed: %w", err)
	}
	return downloader, entry, nil
}

// DeleteFile removes the metadata entry at path. When isDir is false the
// entry's stored object is deleted first. Deleting "/" is rejected.
//
// NOTE(review): the decision to delete the stored object is based on the
// caller-supplied isDir, not on the entry's own IsDir flag — confirm callers
// always pass a matching value.
func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath, isDir bool) error {
	f.Lock()
	defer f.Unlock()

	if string(path) == "/" {
		return fmt.Errorf("cannot delete root")
	}

	entry, err := f.FindEntry(ctx, path)
	if err != nil {
		return fmt.Errorf("find entry failed: %w", err)
	}

	if !isDir {
		if entry == nil {
			// Guard against a store that reports "not found" as (nil, nil).
			return fmt.Errorf("find entry failed: %s not found", path)
		}
		if err := f.storage.DeleteObject(entry.S3Key); err != nil {
			return fmt.Errorf("delete s3 object failed: %w", err)
		}
	}

	if err := f.meta.DeleteEntry(ctx, path); err != nil {
		return fmt.Errorf("delete entry failed: %w", err)
	}
	return nil
}

// RenameFile moves the file at srcPath to dstPath: it calls utils.Move under
// the manager's root, inserts a new entry for dstPath carrying the source
// entry's size, content type, ETag and version ID, and finally deletes the
// source entry.
//
// It fails if either path is "/", if the source lookup fails, if dstPath
// already has an entry, if the destination's parent has no entry, or if
// isDir is true (directories cannot be renamed).
func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
	f.Lock()
	defer f.Unlock()

	if string(srcPath) == "/" || string(dstPath) == "/" {
		return fmt.Errorf("cannot rename root")
	}

	srcEntry, err := f.FindEntry(ctx, srcPath)
	if err != nil {
		return fmt.Errorf("find src file failed: %w", err)
	}
	// A lookup error is treated as "not found"; only a non-nil entry blocks the rename.
	if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil {
		return fmt.Errorf("dst file %s already exists", dstPath)
	}
	if err := f.checkParentExists(ctx, dstPath); err != nil {
		return err
	}
	if isDir {
		return fmt.Errorf("rename not file")
	}
	if srcEntry == nil {
		// Guard against a store that reports "not found" as (nil, nil);
		// checked before any data is moved.
		return fmt.Errorf("find src file failed: %s not found", srcPath)
	}

	if err := utils.Move(f.root, string(srcPath), string(dstPath)); err != nil {
		return err
	}

	newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
	if err := f.meta.InsertEntry(ctx, newEntry); err != nil {
		return fmt.Errorf("insert new entry failed: %w", err)
	}
	if err := f.meta.DeleteEntry(ctx, srcPath); err != nil {
		return fmt.Errorf("delete source entry failed: %w", err)
	}
	return nil
}

// CopyFile copies the file at srcPath to dstPath: it calls utils.Clone under
// the manager's root and inserts a new entry for dstPath carrying the source
// entry's size, content type, ETag and version ID.
//
// It fails if either path is "/", if the source lookup fails, if dstPath
// already has an entry, if the destination's parent has no entry, or if
// isDir is true (directories cannot be copied).
func (f *FileSystemManager) CopyFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
	f.Lock()
	defer f.Unlock()

	if string(srcPath) == "/" || string(dstPath) == "/" {
		return fmt.Errorf("cannot copy root")
	}

	srcEntry, err := f.FindEntry(ctx, srcPath)
	if err != nil {
		return fmt.Errorf("find src file failed: %w", err)
	}
	// A lookup error is treated as "not found"; only a non-nil entry blocks the copy.
	if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil {
		return fmt.Errorf("dst file %s already exists", dstPath)
	}
	if err := f.checkParentExists(ctx, dstPath); err != nil {
		return err
	}
	if isDir {
		return fmt.Errorf("copy not file")
	}
	if srcEntry == nil {
		// Guard against a store that reports "not found" as (nil, nil);
		// checked before any data is cloned.
		return fmt.Errorf("find src file failed: %s not found", srcPath)
	}

	if err := utils.Clone(f.root, string(srcPath), string(dstPath)); err != nil {
		return err
	}

	newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
	if err := f.meta.InsertEntry(ctx, newEntry); err != nil {
		return fmt.Errorf("insert new entry failed: %w", err)
	}
	return nil
}

// Shutdown shuts down the metadata store if one is set.
func (f *FileSystemManager) Shutdown() {
	if f.meta != nil {
		f.meta.Shutdown()
	}
}