396 lines
10 KiB
Go
396 lines
10 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"robotfs/store"
|
|
"robotfs/utils"
|
|
)
|
|
|
|
type FileSystemManager struct {
|
|
sync.RWMutex
|
|
meta store.MetaStore
|
|
storage *StorageManager
|
|
root string
|
|
}
|
|
|
|
func NewFileSystemManager(meta store.MetaStore, storage *StorageManager, root string) *FileSystemManager {
|
|
return &FileSystemManager{
|
|
meta: meta,
|
|
storage: storage,
|
|
root: root,
|
|
}
|
|
}
|
|
|
|
func (f *FileSystemManager) BeginTransaction(ctx context.Context) (context.Context, error) {
|
|
return f.meta.BeginTransaction(ctx)
|
|
}
|
|
|
|
func (f *FileSystemManager) CommitTransaction(ctx context.Context) error {
|
|
return f.meta.CommitTransaction(ctx)
|
|
}
|
|
|
|
func (f *FileSystemManager) RollbackTransaction(ctx context.Context) error {
|
|
return f.meta.RollbackTransaction(ctx)
|
|
}
|
|
|
|
var (
|
|
Root = &utils.Entry{
|
|
FullPath: utils.FullPath("/"),
|
|
IsDir: true,
|
|
CreateTime: time.Now().Unix(),
|
|
LastModificationTime: time.Now().Unix(),
|
|
}
|
|
)
|
|
|
|
func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) {
|
|
if p == "/" {
|
|
return Root, nil
|
|
}
|
|
|
|
entry, err = f.meta.FindEntry(ctx, p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPath) error {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
|
|
if string(path) == "/" {
|
|
return nil
|
|
}
|
|
|
|
if entry, _ := f.FindEntry(ctx, path); entry != nil {
|
|
return fmt.Errorf("directory %s already exists", path)
|
|
}
|
|
|
|
parentDir, _ := path.DirAndName()
|
|
if parentDir != "/" {
|
|
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
|
}
|
|
}
|
|
|
|
entry := utils.NewDirEntry(path)
|
|
return f.meta.InsertEntry(ctx, entry)
|
|
}
|
|
|
|
func (f *FileSystemManager) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
|
|
if string(path) == "/" {
|
|
return nil
|
|
}
|
|
|
|
entry, err := f.FindEntry(ctx, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if entry.IsDir {
|
|
err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Delete the directory entry itself
|
|
err = f.doDeleteEntryMetaAndData(ctx, entry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := utils.Remove(f.root, string(path), isRecursive); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error {
|
|
lastFileName := ""
|
|
includeLastFile := false
|
|
|
|
for {
|
|
lastFileName, err := f.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool {
|
|
if sub.IsDir {
|
|
// Recursively delete subdirectory
|
|
err := f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError)
|
|
if err == nil {
|
|
err = f.doDeleteEntryMetaAndData(ctx, sub)
|
|
}
|
|
if err != nil && !ignoreRecursiveError {
|
|
return false
|
|
}
|
|
} else {
|
|
// Delete file entry
|
|
err := f.doDeleteEntryMetaAndData(ctx, sub)
|
|
if err != nil && !ignoreRecursiveError {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("list folder %s: %v", entry.FullPath, err)
|
|
}
|
|
|
|
if lastFileName == "" && !isRecursive {
|
|
// Check if there are any entries
|
|
hasEntries := false
|
|
_, err := f.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool {
|
|
hasEntries = true
|
|
return false
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("check directory empty: %v", err)
|
|
}
|
|
if hasEntries {
|
|
return fmt.Errorf("directory not empty: %s", entry.FullPath)
|
|
}
|
|
}
|
|
|
|
if lastFileName == "" {
|
|
break
|
|
}
|
|
}
|
|
|
|
if storeDeletionErr := f.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
|
|
return fmt.Errorf("delete folder children metadata: %v", storeDeletionErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *FileSystemManager) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error {
|
|
if storeDeletionErr := f.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
|
|
return fmt.Errorf("delete entry metadata: %v", storeDeletionErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, err error) {
|
|
f.RLock()
|
|
defer f.RUnlock()
|
|
|
|
_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, func(entry *utils.Entry) bool {
|
|
entries = append(entries, entry)
|
|
return true
|
|
})
|
|
|
|
hasMore = int64(len(entries)) >= limit+1
|
|
if hasMore {
|
|
entries = entries[:limit]
|
|
}
|
|
|
|
return entries, hasMore, err
|
|
}
|
|
|
|
func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
|
|
|
|
lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc)
|
|
return
|
|
}
|
|
|
|
func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
|
|
lastFileName, err = f.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
default:
|
|
return eachEntryFunc(entry)
|
|
}
|
|
})
|
|
return
|
|
}
|
|
|
|
func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string, fileSize int64) error {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
|
|
if string(path) == "/" {
|
|
return fmt.Errorf("cannot create file %s", path)
|
|
}
|
|
|
|
if entry, _ := f.FindEntry(ctx, path); entry != nil {
|
|
return fmt.Errorf("file %s already exists", path)
|
|
}
|
|
|
|
parentDir, _ := path.DirAndName()
|
|
if parentDir != "/" {
|
|
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
|
}
|
|
}
|
|
|
|
output, err := f.storage.UploadFile(path.ToS3Key(), reader, contentType)
|
|
if err != nil {
|
|
return fmt.Errorf("upload s3 failed: %v", err)
|
|
}
|
|
|
|
if output.ETag == nil {
|
|
empty := ""
|
|
output.ETag = &empty
|
|
}
|
|
|
|
if output.VersionID == nil {
|
|
empty := ""
|
|
output.VersionID = &empty
|
|
}
|
|
|
|
entry := utils.NewFileEntry(path, path.ToS3Key(), uint64(fileSize), contentType, *output.ETag, *output.VersionID)
|
|
if err := f.meta.InsertEntry(ctx, entry); err != nil {
|
|
return fmt.Errorf("create file entry failed: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPath) (*utils.S3ReadSeeker, *utils.Entry, error) {
|
|
f.RLock()
|
|
defer f.RUnlock()
|
|
|
|
entry, err := f.FindEntry(ctx, path)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("find entry failed: %v", err)
|
|
}
|
|
|
|
if entry.IsDir {
|
|
return nil, nil, fmt.Errorf("cannot download directory")
|
|
}
|
|
|
|
downloader, err := f.storage.DownloadFile(entry.S3Key)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("read s3 object failed: %v", err)
|
|
}
|
|
|
|
return downloader, entry, nil
|
|
}
|
|
|
|
func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath, isDir bool) error {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
|
|
if string(path) == "/" {
|
|
return fmt.Errorf("cannot delete root")
|
|
}
|
|
|
|
entry, err := f.FindEntry(ctx, path)
|
|
if err != nil {
|
|
return fmt.Errorf("find entry failed: %v", err)
|
|
}
|
|
|
|
if !isDir {
|
|
if err := f.storage.DeleteObject(entry.S3Key); err != nil {
|
|
return fmt.Errorf("delete s3 object failed: %v", err)
|
|
}
|
|
}
|
|
|
|
if err := f.meta.DeleteEntry(ctx, path); err != nil {
|
|
return fmt.Errorf("delete entry failed: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
|
|
if string(srcPath) == "/" || string(dstPath) == "/" {
|
|
return fmt.Errorf("cannot rename root")
|
|
}
|
|
|
|
srcEntry, err := f.FindEntry(ctx, srcPath)
|
|
if err != nil {
|
|
return fmt.Errorf("find src file failed: %v", err)
|
|
}
|
|
|
|
if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil {
|
|
return fmt.Errorf("dst file %s already exists", dstPath)
|
|
}
|
|
|
|
parentDir, _ := dstPath.DirAndName()
|
|
if parentDir != "/" {
|
|
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
|
}
|
|
}
|
|
|
|
if isDir {
|
|
return fmt.Errorf("rename not file")
|
|
}
|
|
|
|
if err := utils.Move(f.root, string(srcPath), string(dstPath)); err != nil {
|
|
return err
|
|
}
|
|
|
|
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
|
|
|
|
if err := f.meta.InsertEntry(ctx, newEntry); err != nil {
|
|
return fmt.Errorf("insert new entry failed: %v", err)
|
|
}
|
|
|
|
if err := f.meta.DeleteEntry(ctx, srcPath); err != nil {
|
|
return fmt.Errorf("delete source entry failed: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *FileSystemManager) CopyFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
|
|
f.Lock()
|
|
defer f.Unlock()
|
|
|
|
if string(srcPath) == "/" || string(dstPath) == "/" {
|
|
return fmt.Errorf("cannot copy root")
|
|
}
|
|
|
|
srcEntry, err := f.FindEntry(ctx, srcPath)
|
|
if err != nil {
|
|
return fmt.Errorf("find src file failed: %v", err)
|
|
}
|
|
|
|
if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil {
|
|
return fmt.Errorf("dst file %s already exists", dstPath)
|
|
}
|
|
|
|
parentDir, _ := dstPath.DirAndName()
|
|
if parentDir != "/" {
|
|
if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil {
|
|
return fmt.Errorf("parent directory %s does not exist", parentDir)
|
|
}
|
|
}
|
|
|
|
if isDir {
|
|
return fmt.Errorf("copy not file")
|
|
}
|
|
|
|
if err := utils.Clone(f.root, string(srcPath), string(dstPath)); err != nil {
|
|
return err
|
|
}
|
|
|
|
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
|
|
if err := f.meta.InsertEntry(ctx, newEntry); err != nil {
|
|
return fmt.Errorf("insert new entry failed: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *FileSystemManager) Shutdown() {
|
|
if f.meta != nil {
|
|
f.meta.Shutdown()
|
|
}
|
|
}
|