diff --git a/api_directory.go b/api_directory.go index a7410b1..25253dc 100644 --- a/api_directory.go +++ b/api_directory.go @@ -36,11 +36,12 @@ func (s *Service) HandleMkdir( resp *httpserver.Response, ) *httpserver.Response { params := req.Binded.(*MkdirParams) - fullPath := utils.FullPath(params.Path) + path := params.Path + fullPath := utils.FullPath(path) err := s.FileSystemManager.MakeDirectory(req.Context(), fullPath) if err != nil { - logger.Error("makeDirectory %s: %s", string(fullPath), err.Error()) + logger.Error("makeDirectory %s: %v", path, err) return resp.InternalServerError("mkdir failed, " + err.Error()) } @@ -65,7 +66,7 @@ func (s *Service) HandleListDirectory( limit, ) if err != nil { - logger.Error("listDirectory %s %s %d: %s", path, startFileName, limit, err) + logger.Error("listDirectory %s %s %d: %v", path, startFileName, limit, err) return resp.InternalServerError("listDirectory failed, " + err.Error()) } diff --git a/api_file.go b/api_file.go index 0e76dbb..0680a44 100644 --- a/api_file.go +++ b/api_file.go @@ -2,7 +2,9 @@ package main import ( "gosvc/httpserver" + "gosvc/logger" "gosvc/validator" + "robotfs/utils" ) type DeleteParams struct { @@ -45,6 +47,21 @@ func (s *Service) HandleUploadFile( req *httpserver.Request, resp *httpserver.Response, ) *httpserver.Response { + path := req.QueryString("path") + newPath := utils.NormalizePath(path) + + file, fileHeader, err := req.FormFile("file") + if err != nil { + return resp.InternalServerError("get file failed, " + err.Error()) + } + defer file.Close() + + contentType := fileHeader.Header.Get("Content-Type") + if err := s.Engine.FileSystemManager.CreateFile(req.Context(), utils.FullPath(newPath), file, contentType); err != nil { + logger.Error("create %s: %v", path, err) + return resp.InternalServerError("create file failed, " + err.Error()) + } + return resp.NoContent() } diff --git a/engine/engine.go b/engine/engine.go index de036b0..2f042da 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -36,5 +36,6 @@ func (e *Engine) Start() error { } func (e *Engine) Stop() { + // Stop filesystem manager. e.FileSystemManager.Shutdown() } diff --git a/engine/filesystem_manager.go b/engine/filesystem_manager.go index eb1e5c9..f30288e 100644 --- a/engine/filesystem_manager.go +++ b/engine/filesystem_manager.go @@ -3,6 +3,8 @@ package engine import ( "context" "fmt" + "io" + "sync" "time" "robotfs/store" @@ -10,6 +12,7 @@ import ( ) type FileSystemManager struct { + sync.RWMutex meta store.MetaStore storage *StorageManager } @@ -43,8 +46,11 @@ var ( ) func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (entry *utils.Entry, err error) { + f.RLock() + defer f.RUnlock() + if p == "/" { - return nil, nil + return Root, nil } entry, err = f.meta.FindEntry(ctx, p) @@ -56,6 +62,9 @@ func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (en } func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPath) error { + f.Lock() + defer f.Unlock() + if string(path) == "/" { return nil } @@ -71,11 +80,13 @@ func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPa } } - entry := utils.MakeEntry(path, true) + entry := utils.NewDirEntry(path) return f.meta.InsertEntry(ctx, entry) } func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, err error) { + f.RLock() + defer f.RUnlock() _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, func(entry *utils.Entry) bool { entries = append(entries, entry) @@ -108,10 +119,40 @@ func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils. return } -func (f *FileSystemManager) Shutdown() error { - if f.meta != nil { - f.meta.Shutdown() +func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string) error { + f.Lock() + defer f.Unlock() + + if string(path) == "/" { + return fmt.Errorf("cannot create file %s", path) + } + + if entry, _ := f.FindEntry(ctx, path); entry != nil { + return fmt.Errorf("file %s already exists", path) + } + + parentDir, _ := path.DirAndName() + if parentDir != "/" { + if parentEntry, _ := f.FindEntry(ctx, utils.FullPath(parentDir)); parentEntry == nil { + return fmt.Errorf("parent directory %s does not exist", parentDir) + } + } + + output, size, err := f.storage.UploadFile(path.ToS3Key(), reader, contentType) + if err != nil { + return fmt.Errorf("upload s3 failed: %v", err) + } + + entry := utils.NewFileEntry(path, path.ToS3Key(), uint64(size), contentType, *output.ETag, *output.VersionID) + if err := f.meta.InsertEntry(ctx, entry); err != nil { + return fmt.Errorf("create file entry failed: %v", err) } return nil } + +func (f *FileSystemManager) Shutdown() { + if f.meta != nil { + f.meta.Shutdown() + } +} diff --git a/engine/storage_manager.go b/engine/storage_manager.go index 570bf2f..95df58b 100644 --- a/engine/storage_manager.go +++ b/engine/storage_manager.go @@ -86,7 +86,7 @@ func createS3Session(config *S3Config) (*S3Config, *session.Session, error) { return config, sess, nil } -func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, error) { +func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, int64, error) { if contentType == "" { contentType = "application/octet-stream" } @@ -98,28 +98,48 @@ func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType stri ContentType: aws.String(contentType), }) if err != nil { - return nil, err + return nil, 0, err } - - return output, nil + size := len(data) + return output, int64(size), nil } -func (sm *StorageManager) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, error) { +type countReader struct { + reader io.Reader + size int64 +} + +func newCountReader(r io.Reader) *countReader { + return &countReader{ + reader: r, + size: 0, + } +} + +func (r *countReader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + r.size += int64(n) + return n, err +} + +func (sm *StorageManager) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, int64, error) { if contentType == "" { contentType = "application/octet-stream" } + countReader := newCountReader(data) + output, err := sm.s3Uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(sm.bucketName), Key: aws.String(s3Key), - Body: data, + Body: countReader, ContentType: aws.String(contentType), }) if err != nil { - return nil, err + return nil, 0, err } - return output, nil + return output, countReader.size, nil } func (sm *StorageManager) DownloadFile(S3Key string, localFilePath string) error { diff --git a/utils/entry.go b/utils/entry.go index 5f80372..aa0ad4e 100644 --- a/utils/entry.go +++ b/utils/entry.go @@ -22,15 +22,29 @@ type Entry struct { Extended map[string][]byte } -func MakeEntry(fullPath FullPath, isDirectory bool) *Entry { +func NewDirEntry(fullPath FullPath) *Entry { return &Entry{ FullPath: fullPath, - IsDir: isDirectory, + IsDir: true, CreateTime: time.Now().Unix(), LastModificationTime: time.Now().Unix(), } } +func NewFileEntry(fullPath FullPath, s3Key string, size uint64, contentType string, etag string, versionID string) *Entry { + return &Entry{ + FullPath: fullPath, + IsDir: false, + Size: size, + CreateTime: time.Now().Unix(), + LastModificationTime: time.Now().Unix(), + S3Key: s3Key, + ContentType: contentType, + Etag: etag, + VersionID: versionID, + } +} + func (entry *Entry) Encode() ([]byte, error) { message := entry.ToProto() return proto.Marshal(message) diff --git a/utils/fullPath.go b/utils/fullPath.go index cf56a2b..c532e56 100644 --- a/utils/fullPath.go +++ b/utils/fullPath.go @@ -64,6 +64,10 @@ func (fp FullPath) IsUnder(other FullPath) bool { return strings.HasPrefix(string(fp), string(other)+"/") } +func (fp FullPath) ToS3Key() string { + return strings.TrimPrefix(string(fp), "/") +} + func StringSplit(separatedValues string, sep string) []string { if separatedValues == "" { return nil diff --git a/utils/s3.go b/utils/s3.go deleted file mode 100644 index 5b5077f..0000000 --- a/utils/s3.go +++ /dev/null @@ -1,14 +0,0 @@ -package utils - -import ( - "strings" -) - -func PathToS3Key(path string) string { - key := strings.TrimPrefix(path, "/") - if key == "" { - key = "root" - } - - return key -}