robotfs/engine/filesystem_manager.go
2025-05-19 01:21:29 +08:00

159 lines
4.0 KiB
Go

package engine
import (
"context"
"fmt"
"io"
"sync"
"time"
"robotfs/store"
"robotfs/utils"
)
// FileSystemManager combines a metadata store with a storage manager.
// Its methods acquire the embedded RWMutex around metadata lookups and
// mutations.
type FileSystemManager struct {
	sync.RWMutex

	// meta is the metadata store that entry lookups, inserts, listings and
	// transaction calls are forwarded to.
	meta store.MetaStore
	// storage is the manager used to upload file content.
	storage *StorageManager
}
// NewFileSystemManager returns a FileSystemManager that uses meta as its
// metadata store and storage as its storage manager. Neither argument is
// validated here.
func NewFileSystemManager(meta store.MetaStore, storage *StorageManager) *FileSystemManager {
	fsm := new(FileSystemManager)
	fsm.meta = meta
	fsm.storage = storage
	return fsm
}
// BeginTransaction forwards to the metadata store's BeginTransaction and
// returns the context and error it produces, unchanged.
func (f *FileSystemManager) BeginTransaction(ctx context.Context) (context.Context, error) {
	txCtx, err := f.meta.BeginTransaction(ctx)
	return txCtx, err
}
// CommitTransaction forwards to the metadata store's CommitTransaction and
// returns its error unchanged.
func (f *FileSystemManager) CommitTransaction(ctx context.Context) error {
	err := f.meta.CommitTransaction(ctx)
	return err
}
// RollbackTransaction forwards to the metadata store's RollbackTransaction
// and returns its error unchanged.
func (f *FileSystemManager) RollbackTransaction(ctx context.Context) error {
	err := f.meta.RollbackTransaction(ctx)
	return err
}
// Root is the directory entry for "/". FindEntry returns this shared value
// for the root path instead of consulting the metadata store. Both
// timestamps are taken when the package is initialized.
var Root = &utils.Entry{
	FullPath:             utils.FullPath("/"),
	IsDir:                true,
	CreateTime:           time.Now().Unix(),
	LastModificationTime: time.Now().Unix(),
}
// FindEntry looks up the entry stored at p while holding the read lock.
//
// The root path "/" is answered with the package-level Root entry without
// touching the metadata store. For any other path the lookup is delegated to
// the metadata store; if it reports an error, a nil entry is returned along
// with that error.
func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) {
	f.RLock()
	defer f.RUnlock()

	if p == "/" {
		return Root, nil
	}

	found, lookupErr := f.meta.FindEntry(ctx, p)
	if lookupErr != nil {
		return nil, lookupErr
	}
	return found, nil
}
// MakeDirectory creates a directory entry at path.
//
// Creating "/" is a no-op that returns nil. Otherwise an error is returned
// when an entry already exists at path, when the parent directory (other
// than "/") cannot be found, or when inserting the new entry into the
// metadata store fails.
//
// The write lock is held for the whole check-then-insert sequence.
func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPath) error {
	f.Lock()
	defer f.Unlock()

	if string(path) == "/" {
		return nil
	}

	// Query the metadata store directly rather than through f.FindEntry:
	// FindEntry takes the read lock, and sync.RWMutex is not reentrant, so
	// calling it while the write lock is held here would block forever.
	// A lookup error is treated like "no entry", as FindEntry's callers did.
	if existing, err := f.meta.FindEntry(ctx, path); err == nil && existing != nil {
		return fmt.Errorf("directory %s already exists", path)
	}

	parentDir, _ := path.DirAndName()
	// "/" always exists (it is never stored), so only other parents are checked.
	if parentDir != "/" {
		// NOTE(review): only existence is checked; this does not verify that
		// the parent entry is itself a directory.
		if parent, err := f.meta.FindEntry(ctx, utils.FullPath(parentDir)); err != nil || parent == nil {
			return fmt.Errorf("parent directory %s does not exist", parentDir)
		}
	}

	return f.meta.InsertEntry(ctx, utils.NewDirEntry(path))
}
// ListDirectoryEntries collects up to limit entries of directory p into a
// slice while holding the read lock.
//
// It asks StreamListDirectoryEntries for limit+1 entries; when that many
// arrive, the result is truncated to limit and hasMore is true. Any error
// from the listing is returned together with whatever entries were
// collected. startFileName and inclusive are passed through unchanged.
func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, err error) {
	f.RLock()
	defer f.RUnlock()

	// Fetch one extra entry so a further page can be detected.
	fetchLimit := limit + 1
	collect := func(e *utils.Entry) bool {
		entries = append(entries, e)
		return true
	}

	_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, fetchLimit, collect)

	if int64(len(entries)) >= fetchLimit {
		return entries[:limit], true, err
	}
	return entries, false, err
}
// StreamListDirectoryEntries passes all of its arguments to
// doListDirectoryEntries and returns that call's results. It does not
// acquire the manager's lock itself.
func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
	return f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc)
}
// doListDirectoryEntries asks the metadata store to list directory p,
// wrapping eachEntryFunc so the callback observes context cancellation.
//
// Before each entry is handed to eachEntryFunc the context is polled without
// blocking; once ctx is done the wrapper returns false instead of invoking
// eachEntryFunc. The store's lastFileName and err are returned unchanged;
// the context's own error is not substituted for them.
func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
	cancelAware := func(e *utils.Entry) bool {
		select {
		case <-ctx.Done():
			return false
		default:
		}
		return eachEntryFunc(e)
	}
	return f.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, cancelAware)
}
// CreateFile uploads the content read from reader and records a file entry
// for it at path.
//
// An error is returned when path is "/", when an entry already exists at
// path, when the parent directory (other than "/") cannot be found, when the
// upload fails, or when inserting the entry into the metadata store fails.
// Upload and insert errors are wrapped with %w so callers can inspect the
// underlying cause with errors.Is / errors.As.
//
// The write lock is held for the whole operation, including the upload.
// NOTE(review): if the metadata insert fails after a successful upload, the
// uploaded object is not removed here.
func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string) error {
	f.Lock()
	defer f.Unlock()

	if string(path) == "/" {
		return fmt.Errorf("cannot create file %s", path)
	}

	// Query the metadata store directly rather than through f.FindEntry:
	// FindEntry takes the read lock, and sync.RWMutex is not reentrant, so
	// calling it while the write lock is held here would block forever.
	// A lookup error is treated like "no entry", as FindEntry's callers did.
	if existing, err := f.meta.FindEntry(ctx, path); err == nil && existing != nil {
		return fmt.Errorf("file %s already exists", path)
	}

	parentDir, _ := path.DirAndName()
	// "/" always exists (it is never stored), so only other parents are checked.
	if parentDir != "/" {
		if parent, err := f.meta.FindEntry(ctx, utils.FullPath(parentDir)); err != nil || parent == nil {
			return fmt.Errorf("parent directory %s does not exist", parentDir)
		}
	}

	key := path.ToS3Key()
	output, size, err := f.storage.UploadFile(key, reader, contentType)
	if err != nil {
		return fmt.Errorf("upload s3 failed: %w", err)
	}

	// ETag and VersionID are pointers and may be nil (presumably VersionID is
	// unset when the bucket is not versioned — confirm against the storage
	// backend); fall back to "" instead of panicking on a nil dereference.
	var etag, versionID string
	if output.ETag != nil {
		etag = *output.ETag
	}
	if output.VersionID != nil {
		versionID = *output.VersionID
	}

	entry := utils.NewFileEntry(path, key, uint64(size), contentType, etag, versionID)
	if err := f.meta.InsertEntry(ctx, entry); err != nil {
		return fmt.Errorf("create file entry failed: %w", err)
	}
	return nil
}
// Shutdown calls Shutdown on the metadata store when one is set and does
// nothing otherwise. The storage manager is not touched.
func (f *FileSystemManager) Shutdown() {
	if f.meta == nil {
		return
	}
	f.meta.Shutdown()
}