robotfs/engine/filesystem_manager.go
2025-06-03 20:17:40 +08:00

319 lines
8.6 KiB
Go

package engine
import (
"context"
"fmt"
"gosvc/logger"
"io"
"time"
"robotfs/pkg/meta"
"robotfs/pkg/storage"
"robotfs/utils"
)
type FileSystemManager struct {
root string
meta meta.MetaStore
store *storage.Storage
locker *utils.LockManager
}
func NewFileSystemManager(meta meta.MetaStore, store *storage.Storage, root string) *FileSystemManager {
return &FileSystemManager{
root: root,
meta: meta,
store: store,
locker: utils.NewLockManager(),
}
}
func (f *FileSystemManager) BeginTransaction(ctx context.Context) (context.Context, error) {
return f.meta.BeginTransaction(ctx)
}
func (f *FileSystemManager) CommitTransaction(ctx context.Context) error {
return f.meta.CommitTransaction(ctx)
}
func (f *FileSystemManager) RollbackTransaction(ctx context.Context) error {
return f.meta.RollbackTransaction(ctx)
}
var (
Root = &utils.Entry{
FullPath: utils.FullPath("/"),
IsDir: true,
CreateTime: time.Now().Unix(),
LastModificationTime: time.Now().Unix(),
}
)
func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) {
if p == "/" {
return Root, nil
}
entry, err = f.meta.FindEntry(ctx, p)
if err != nil {
return nil, err
}
return
}
func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPath) error {
locker := f.locker.AcquireLock(string(path))
defer f.locker.ReleaseLock(locker)
logger.Info("making directory => %s", path)
if string(path) == "/" {
return nil
}
if entry, err := f.FindEntry(ctx, path); err == nil && entry != nil {
return fmt.Errorf("directory %s already exists", path)
}
parentDir, _ := path.DirAndName()
if parentDir != "/" {
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
return fmt.Errorf("parent directory %s does not exist", parentDir)
}
}
if err := utils.Mkdir(f.root, string(path)); err != nil {
return err
}
entry := utils.NewDirEntry(path)
return f.meta.InsertEntry(ctx, entry)
}
func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, lastFileName string, err error) {
locker := f.locker.AcquireRLock(string(p))
defer f.locker.ReleaseLock(locker)
logger.Info("listing directory %s => startFileName:%s inclusive:%t count:%d", p, startFileName, inclusive, limit)
lastFileName, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
entries = append(entries, entry)
return true
})
hasMore = int64(len(entries)) >= limit
if hasMore {
entries = entries[:limit]
}
return entries, hasMore, lastFileName, err
}
func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc)
return
}
func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
lastFileName, err = f.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
select {
case <-ctx.Done():
return false
default:
return eachEntryFunc(entry)
}
})
return
}
func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string, fileSize int64) error {
locker := f.locker.AcquireLock(string(path))
defer f.locker.ReleaseLock(locker)
logger.Info("uploading file => %s", path)
if string(path) == "/" {
return fmt.Errorf("cannot create file %s", path)
}
if entry, _ := f.FindEntry(ctx, path); entry != nil {
return fmt.Errorf("file %s already exists", path)
}
parentDir, _ := path.DirAndName()
if parentDir != "/" {
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
return fmt.Errorf("parent directory %s does not exist", parentDir)
}
}
output, err := f.store.UploadFile(path.ToS3Key(), reader, contentType)
if err != nil {
return fmt.Errorf("upload file failed: %v", err)
}
if output.ETag == nil {
empty := ""
output.ETag = &empty
}
if output.VersionID == nil {
empty := ""
output.VersionID = &empty
}
entry := utils.NewFileEntry(path, path.ToS3Key(), uint64(fileSize), contentType, *output.ETag, *output.VersionID)
if err := f.meta.InsertEntry(ctx, entry); err != nil {
return fmt.Errorf("create file entry failed: %v", err)
}
return nil
}
func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPath) (*utils.S3ReadSeeker, *utils.Entry, error) {
locker := f.locker.AcquireRLock(string(path))
defer f.locker.ReleaseLock(locker)
logger.Info("downloading file => %s", path)
entry, err := f.FindEntry(ctx, path)
if err != nil {
return nil, nil, fmt.Errorf("find entry failed: %v", err)
}
if entry.IsDir {
return nil, nil, fmt.Errorf("cannot download directory")
}
downloader, err := f.store.DownloadFile(entry.S3Key)
if err != nil {
return nil, nil, fmt.Errorf("download file failed: %v", err)
}
return downloader, entry, nil
}
func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath, isDir bool) error {
locker := f.locker.AcquireLock(string(path))
defer f.locker.ReleaseLock(locker)
logger.Info("deleting file => %s", path)
if string(path) == "/" {
return fmt.Errorf("cannot delete root")
}
entry, err := f.FindEntry(ctx, path)
if err != nil {
return fmt.Errorf("find entry failed: %v", err)
}
if !isDir {
if err := f.store.DeleteObject(entry.S3Key); err != nil {
return fmt.Errorf("delete file failed: %v", err)
}
}
if err := f.meta.DeleteEntry(ctx, path); err != nil {
return fmt.Errorf("delete entry failed: %v", err)
}
return nil
}
func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
locker := f.locker.AcquireOrderedLock(string(srcPath), string(dstPath))
defer f.locker.ReleaseLocks(locker)
logger.Info("moving file %s => %s", srcPath, dstPath)
if string(srcPath) == "/" || string(dstPath) == "/" {
return fmt.Errorf("cannot rename root")
}
srcEntry, err := f.FindEntry(ctx, srcPath)
if err != nil {
return fmt.Errorf("find src entry failed: %v", err)
}
if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil {
return fmt.Errorf("dst entry %s already exists", dstPath)
}
parentDir, _ := dstPath.DirAndName()
if parentDir != "/" {
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
return fmt.Errorf("parent directory %s does not exist", parentDir)
}
}
if isDir {
return fmt.Errorf("rename not file")
}
if err := utils.Move(f.root, string(srcPath), string(dstPath)); err != nil {
return err
}
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
if err := f.meta.UpdateEntry(ctx, newEntry); err != nil {
return fmt.Errorf("update entry failed: %v", err)
}
if err := f.meta.DeleteEntry(ctx, srcPath); err != nil {
return fmt.Errorf("delete src entry failed: %v", err)
}
return nil
}
func (f *FileSystemManager) CopyFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
locker := f.locker.AcquireOrderedLock(string(srcPath), string(dstPath))
defer f.locker.ReleaseLocks(locker)
logger.Info("copying file %s => %s", srcPath, dstPath)
if string(srcPath) == "/" || string(dstPath) == "/" {
return fmt.Errorf("cannot copy root")
}
srcEntry, err := f.FindEntry(ctx, srcPath)
if err != nil {
return fmt.Errorf("find src entry failed: %v", err)
}
if dstEntry, err := f.FindEntry(ctx, dstPath); dstEntry != nil && err == nil {
return fmt.Errorf("dst entry %s already exists", dstPath)
}
parentDir, _ := dstPath.DirAndName()
if parentDir != "/" {
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
return fmt.Errorf("parent directory %s does not exist", parentDir)
}
}
if isDir {
return fmt.Errorf("copy not file")
}
if err := utils.Clone(f.root, string(srcPath), string(dstPath)); err != nil {
return err
}
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
if err := f.meta.InsertEntry(ctx, newEntry); err != nil {
return fmt.Errorf("insert new entry failed: %v", err)
}
return nil
}
func (f *FileSystemManager) Shutdown() {
if f.meta != nil {
f.meta.Shutdown()
}
}