Update: update lock
This commit is contained in:
parent
d3855aee27
commit
838f194a51
@ -11,8 +11,8 @@ import (
|
||||
)
|
||||
|
||||
func (f *FileSystemManager) CopyDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
locker := f.locker.AcquireOrderedLock(string(oldPath), string(newPath))
|
||||
defer f.locker.ReleaseLocks(locker)
|
||||
|
||||
if string(oldPath) == "/" {
|
||||
return fmt.Errorf("cannot copy root directory")
|
||||
|
||||
@ -9,8 +9,8 @@ import (
|
||||
)
|
||||
|
||||
func (f *FileSystemManager) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
locker := f.locker.AcquireLock(string(path))
|
||||
defer f.locker.ReleaseLock(locker)
|
||||
|
||||
if string(path) == "/" {
|
||||
return nil
|
||||
|
||||
@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"gosvc/logger"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"robotfs/pkg/meta"
|
||||
@ -14,17 +13,18 @@ import (
|
||||
)
|
||||
|
||||
type FileSystemManager struct {
|
||||
sync.RWMutex
|
||||
root string
|
||||
meta meta.MetaStore
|
||||
store *storage.Storage
|
||||
root string
|
||||
meta meta.MetaStore
|
||||
store *storage.Storage
|
||||
locker *utils.LockManager
|
||||
}
|
||||
|
||||
func NewFileSystemManager(meta meta.MetaStore, store *storage.Storage, root string) *FileSystemManager {
|
||||
return &FileSystemManager{
|
||||
root: root,
|
||||
meta: meta,
|
||||
store: store,
|
||||
root: root,
|
||||
meta: meta,
|
||||
store: store,
|
||||
locker: utils.NewLockManager(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,8 +63,8 @@ func (f *FileSystemManager) FindEntry(ctx context.Context, p utils.FullPath) (en
|
||||
}
|
||||
|
||||
func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPath) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
locker := f.locker.AcquireLock(string(path))
|
||||
defer f.locker.ReleaseLock(locker)
|
||||
|
||||
logger.Info("making directory => %s", path)
|
||||
|
||||
@ -92,8 +92,8 @@ func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPa
|
||||
}
|
||||
|
||||
func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, lastFileName string, err error) {
|
||||
f.RLock()
|
||||
defer f.RUnlock()
|
||||
locker := f.locker.AcquireRLock(string(p))
|
||||
defer f.locker.ReleaseLock(locker)
|
||||
|
||||
logger.Info("listing directory %s => startFileName:%s inclusive:%t count:%d", p, startFileName, inclusive, limit)
|
||||
|
||||
@ -129,8 +129,8 @@ func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.
|
||||
}
|
||||
|
||||
func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath, reader io.Reader, contentType string, fileSize int64) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
locker := f.locker.AcquireLock(string(path))
|
||||
defer f.locker.ReleaseLock(locker)
|
||||
|
||||
logger.Info("uploading file => %s", path)
|
||||
|
||||
@ -173,8 +173,8 @@ func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath,
|
||||
}
|
||||
|
||||
func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPath) (*utils.S3ReadSeeker, *utils.Entry, error) {
|
||||
f.RLock()
|
||||
defer f.RUnlock()
|
||||
locker := f.locker.AcquireRLock(string(path))
|
||||
defer f.locker.ReleaseLock(locker)
|
||||
|
||||
logger.Info("downloading file => %s", path)
|
||||
|
||||
@ -196,8 +196,8 @@ func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPat
|
||||
}
|
||||
|
||||
func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath, isDir bool) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
locker := f.locker.AcquireLock(string(path))
|
||||
defer f.locker.ReleaseLock(locker)
|
||||
|
||||
logger.Info("deleting file => %s", path)
|
||||
|
||||
@ -224,8 +224,8 @@ func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath,
|
||||
}
|
||||
|
||||
func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
locker := f.locker.AcquireOrderedLock(string(srcPath), string(dstPath))
|
||||
defer f.locker.ReleaseLocks(locker)
|
||||
|
||||
logger.Info("moving file %s => %s", srcPath, dstPath)
|
||||
|
||||
@ -258,7 +258,6 @@ func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath uti
|
||||
}
|
||||
|
||||
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
|
||||
|
||||
if err := f.meta.UpdateEntry(ctx, newEntry); err != nil {
|
||||
return fmt.Errorf("update entry failed: %v", err)
|
||||
}
|
||||
@ -271,8 +270,8 @@ func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath uti
|
||||
}
|
||||
|
||||
func (f *FileSystemManager) CopyFile(ctx context.Context, srcPath, dstPath utils.FullPath, isDir bool) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
locker := f.locker.AcquireOrderedLock(string(srcPath), string(dstPath))
|
||||
defer f.locker.ReleaseLocks(locker)
|
||||
|
||||
logger.Info("copying file %s => %s", srcPath, dstPath)
|
||||
|
||||
|
||||
@ -11,8 +11,8 @@ import (
|
||||
)
|
||||
|
||||
func (f *FileSystemManager) RenameDirectory(ctx context.Context, oldPath, newPath utils.FullPath, isDir bool) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
locker := f.locker.AcquireOrderedLock(string(oldPath), string(newPath))
|
||||
defer f.locker.ReleaseLocks(locker)
|
||||
|
||||
if err := f.canRename(oldPath, newPath, isDir); err != nil {
|
||||
return err
|
||||
|
||||
@ -324,6 +324,6 @@ main() {
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
}
|
||||
}
|
||||
|
||||
main "$@"
|
||||
main "$@"
|
||||
|
||||
114
utils/lockManager.go
Normal file
114
utils/lockManager.go
Normal file
@ -0,0 +1,114 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const shardCount = 256
|
||||
|
||||
type LockManager struct {
|
||||
shards [shardCount]sync.RWMutex
|
||||
}
|
||||
|
||||
type LockType int
|
||||
|
||||
const (
|
||||
ReadLock LockType = iota
|
||||
WriteLock
|
||||
)
|
||||
|
||||
type Lock struct {
|
||||
Path string
|
||||
shard uint32
|
||||
lockType LockType
|
||||
}
|
||||
|
||||
func NewLockManager() *LockManager {
|
||||
return &LockManager{}
|
||||
}
|
||||
|
||||
func hashPath(path string) uint32 {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(path))
|
||||
return h.Sum32() % shardCount
|
||||
}
|
||||
|
||||
func (lm *LockManager) AcquireRLock(path string) *Lock {
|
||||
shard := hashPath(path)
|
||||
lm.shards[shard].RLock()
|
||||
|
||||
return &Lock{
|
||||
Path: path,
|
||||
shard: shard,
|
||||
lockType: ReadLock,
|
||||
}
|
||||
}
|
||||
|
||||
func (lm *LockManager) AcquireLock(path string) *Lock {
|
||||
shard := hashPath(path)
|
||||
lm.shards[shard].Lock()
|
||||
|
||||
return &Lock{
|
||||
Path: path,
|
||||
shard: shard,
|
||||
lockType: WriteLock,
|
||||
}
|
||||
}
|
||||
|
||||
func (lm *LockManager) ReleaseLock(lock *Lock) error {
|
||||
if lock == nil {
|
||||
return fmt.Errorf("lock is nil")
|
||||
}
|
||||
if lock.shard >= shardCount {
|
||||
return fmt.Errorf("invalid shard index")
|
||||
}
|
||||
|
||||
switch lock.lockType {
|
||||
case ReadLock:
|
||||
lm.shards[lock.shard].RUnlock()
|
||||
case WriteLock:
|
||||
lm.shards[lock.shard].Unlock()
|
||||
default:
|
||||
return fmt.Errorf("unknown lock type")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lm *LockManager) AcquireOrderedLock(paths ...string) []*Lock {
|
||||
sortedPaths := make([]string, len(paths))
|
||||
copy(sortedPaths, paths)
|
||||
sort.Strings(sortedPaths)
|
||||
|
||||
locks := make([]*Lock, 0, len(sortedPaths))
|
||||
for _, path := range sortedPaths {
|
||||
lock := lm.AcquireLock(path)
|
||||
locks = append(locks, lock)
|
||||
}
|
||||
return locks
|
||||
}
|
||||
|
||||
func (lm *LockManager) AcquireOrderedRLock(paths ...string) []*Lock {
|
||||
sortedPaths := make([]string, len(paths))
|
||||
copy(sortedPaths, paths)
|
||||
sort.Strings(sortedPaths)
|
||||
|
||||
locks := make([]*Lock, 0, len(sortedPaths))
|
||||
for _, path := range sortedPaths {
|
||||
lock := lm.AcquireRLock(path)
|
||||
locks = append(locks, lock)
|
||||
}
|
||||
return locks
|
||||
}
|
||||
|
||||
func (lm *LockManager) ReleaseLocks(locks []*Lock) error {
|
||||
for i := len(locks) - 1; i >= 0; i-- {
|
||||
if err := lm.ReleaseLock(locks[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user