Update: adjust the project structure

This commit is contained in:
dukai 2025-05-27 18:58:32 +08:00
parent d07dc19f31
commit 5b95256929
21 changed files with 654 additions and 249 deletions

View File

@ -1,6 +1,7 @@
robotfs:
address: "0.0.0.0:6201"
log_path: "/var/log/robotfs"
data_path: "/data/dukai/juiceFSdata"
redis:
address: "180.76.110.177:6379"
@ -16,6 +17,3 @@ s3:
secret_access_key: "12345678"
bucket_name: "myjfs"
use_path_style: true
juicefs:
data_path: "/data/juicefs"

View File

@ -1,7 +1,6 @@
package config
import (
"robotfs/utils"
"time"
"gosvc"
@ -10,9 +9,13 @@ import (
)
var (
cfg = utils.GetViper()
cfg = getViper()
)
func GetConfig() Configuration {
return cfg
}
func SetupServiceConfig() {
gosvc.Config = gosvc.ServiceConfig{
Name: "robotfs",

View File

@ -1,4 +1,4 @@
package utils
package config
import (
"path/filepath"
@ -90,7 +90,7 @@ func (vp *ViperProxy) GetStringSlice(key string) []string {
return vp.Viper.GetStringSlice(key)
}
func GetViper() *ViperProxy {
func getViper() *ViperProxy {
vp.Lock()
defer vp.Unlock()

View File

@ -1,41 +0,0 @@
package config
type RobotFSConfig struct {
Address string `mapstructure:"address"`
LogPath string `mapstructure:"log_path"`
}
type RedisConfig struct {
Address string `mapstructure:"address"`
Password string `mapstructure:"password"`
DataBase int `mapstructure:"database"`
//Todo: use it in the future
SuperLargeDirectories []string `mapstructure:"superLargeDirectories"`
}
type S3Config struct {
Region string `mapstructure:"region"`
Endpoint string `mapstructure:"endpoint"`
AccessKeyID string `mapstructure:"access_key_id"`
SecretAccessKey string `mapstructure:"secret_access_key"`
BucketName string `mapstructure:"bucket_name"`
UsePathStyle bool `mapstructure:"use_path_style"`
}
type JuicefsConfig struct {
//Address string `mapstructure:"address"`
DataPath string `mapstructure:"data_path"`
}
type Config struct {
RobotFS RobotFSConfig `mapstructure:"robotfs"`
Redis RedisConfig `mapstructure:"redis"`
S3 S3Config `mapstructure:"s3"`
Juicefs JuicefsConfig `mapstructure:"juicefs"`
}
func GetConfig() *Config {
config := &Config{}
return config
}

View File

@ -3,8 +3,9 @@ package engine
import (
"fmt"
"robotfs/store/redis_lua"
"robotfs/utils"
"robotfs/config"
"robotfs/pkg/meta/redis_lua"
"robotfs/pkg/storage"
)
type Engine struct {
@ -16,25 +17,45 @@ func NewEngine() *Engine {
}
func (e *Engine) Start() error {
s3Config, err := Initialize(utils.GetViper(), "s3.")
cfg := config.GetConfig()
meta, err := e.initMetaStore(cfg)
if err != nil {
return fmt.Errorf("failed to initialize s3 config: %v", err)
}
object, err := NewStorageManager(s3Config)
if err != nil {
return fmt.Errorf("failed to create storage manager: %v", err)
return err
}
meta := redis_lua.NewRedisLuaStore()
if err := meta.Initialize(utils.GetViper(), "redis."); err != nil {
return fmt.Errorf("failed to initialize meta store: %v", err)
store, err := e.initStorage(cfg)
if err != nil {
return err
}
filesystem := NewFileSystemManager(meta, object, utils.GetViper().GetString("robotfs.data_path"))
e.FileSystemManager = filesystem
e.FileSystemManager = NewFileSystemManager(
meta,
store,
cfg.GetString("robotfs.data_path"),
)
return nil
}
func (e *Engine) initMetaStore(cfg config.Configuration) (*redis_lua.RedisLuaStore, error) {
meta := redis_lua.NewRedisLuaStore()
if err := meta.Initialize(cfg, "redis."); err != nil {
return nil, fmt.Errorf("initialize meta store failed: %v", err)
}
return meta, nil
}
func (e *Engine) initStorage(cfg config.Configuration) (*storage.Storage, error) {
store := storage.NewStorage()
if err := store.Initialize(cfg, "s3."); err != nil {
return nil, fmt.Errorf("initialize s3 config failed: %v", err)
}
return store, nil
}
func (e *Engine) Stop() {
// Stop filesystem manager.
e.FileSystemManager.Shutdown()

View File

@ -113,7 +113,7 @@ func (f *FileSystemManager) copySelfEntry(ctx context.Context, oldPath utils.Ful
IsDir: entry.IsDir,
Size: entry.Size,
CreateTime: time.Now().Unix(),
S3Key: entry.S3Key,
S3Key: newPath.ToS3Key(),
ContentType: entry.ContentType,
Etag: entry.Etag,
VersionID: entry.VersionID,

108
engine/filesystem_delete.go Normal file
View File

@ -0,0 +1,108 @@
package engine
import (
"context"
"fmt"
"gosvc/logger"
"robotfs/utils"
)
func (f *FileSystemManager) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error {
f.Lock()
defer f.Unlock()
if string(path) == "/" {
return nil
}
entry, err := f.FindEntry(ctx, path)
if err != nil {
return err
}
if entry.IsDir {
err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError)
if err != nil {
return err
}
}
// Delete the directory entry itself
err = f.doDeleteEntryMetaAndData(ctx, entry)
if err != nil {
return err
}
if err := utils.Remove(f.root, string(path), isRecursive); err != nil {
return err
}
return nil
}
func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error {
logger.Info("deleting folder => %s", entry.FullPath)
lastFileName := ""
includeLastFile := false
for {
lastFileName, err := f.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool {
if sub.IsDir {
// Recursively delete subdirectory
err := f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError)
if err == nil {
err = f.doDeleteEntryMetaAndData(ctx, sub)
}
if err != nil && !ignoreRecursiveError {
return false
}
} else {
// Delete file entry
err := f.doDeleteEntryMetaAndData(ctx, sub)
if err != nil && !ignoreRecursiveError {
return false
}
}
return true
})
if err != nil {
return fmt.Errorf("list folder %s: %v", entry.FullPath, err)
}
if lastFileName == "" && !isRecursive {
// Check if there are any entries
hasEntries := false
_, err := f.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool {
hasEntries = true
return false
})
if err != nil {
return fmt.Errorf("check directory empty: %v", err)
}
if hasEntries {
return fmt.Errorf("directory not empty: %s", entry.FullPath)
}
}
if lastFileName == "" {
break
}
}
if storeDeletionErr := f.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
return fmt.Errorf("delete folder children metadata: %v", storeDeletionErr)
}
return nil
}
func (f *FileSystemManager) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error {
logger.Info("deleting entry => %s", entry.FullPath)
if storeDeletionErr := f.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
return fmt.Errorf("delete entry metadata: %v", storeDeletionErr)
}
return nil
}

View File

@ -7,22 +7,23 @@ import (
"sync"
"time"
"robotfs/store"
"robotfs/pkg/meta"
"robotfs/pkg/storage"
"robotfs/utils"
)
type FileSystemManager struct {
sync.RWMutex
meta store.MetaStore
storage *StorageManager
root string
root string
meta meta.MetaStore
store *storage.Storage
}
func NewFileSystemManager(meta store.MetaStore, storage *StorageManager, root string) *FileSystemManager {
func NewFileSystemManager(meta meta.MetaStore, store *storage.Storage, root string) *FileSystemManager {
return &FileSystemManager{
meta: meta,
storage: storage,
root: root,
root: root,
meta: meta,
store: store,
}
}
@ -79,106 +80,14 @@ func (f *FileSystemManager) MakeDirectory(ctx context.Context, path utils.FullPa
}
}
if err := utils.Mkdir(f.root, string(path)); err != nil {
return err
}
entry := utils.NewDirEntry(path)
return f.meta.InsertEntry(ctx, entry)
}
func (f *FileSystemManager) DeleteDirectory(ctx context.Context, path utils.FullPath, isRecursive, ignoreRecursiveError bool) error {
f.Lock()
defer f.Unlock()
if string(path) == "/" {
return nil
}
entry, err := f.FindEntry(ctx, path)
if err != nil {
return err
}
if entry.IsDir {
err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError)
if err != nil {
return err
}
}
// Delete the directory entry itself
err = f.doDeleteEntryMetaAndData(ctx, entry)
if err != nil {
return err
}
if err := utils.Remove(f.root, string(path), isRecursive); err != nil {
return err
}
return nil
}
func (f *FileSystemManager) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *utils.Entry, isRecursive, ignoreRecursiveError bool) error {
lastFileName := ""
includeLastFile := false
for {
lastFileName, err := f.doListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, 1000, func(sub *utils.Entry) bool {
if sub.IsDir {
// Recursively delete subdirectory
err := f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError)
if err == nil {
err = f.doDeleteEntryMetaAndData(ctx, sub)
}
if err != nil && !ignoreRecursiveError {
return false
}
} else {
// Delete file entry
err := f.doDeleteEntryMetaAndData(ctx, sub)
if err != nil && !ignoreRecursiveError {
return false
}
}
return true
})
if err != nil {
return fmt.Errorf("list folder %s: %v", entry.FullPath, err)
}
if lastFileName == "" && !isRecursive {
// Check if there are any entries
hasEntries := false
_, err := f.doListDirectoryEntries(ctx, entry.FullPath, "", false, 1, func(_ *utils.Entry) bool {
hasEntries = true
return false
})
if err != nil {
return fmt.Errorf("check directory empty: %v", err)
}
if hasEntries {
return fmt.Errorf("directory not empty: %s", entry.FullPath)
}
}
if lastFileName == "" {
break
}
}
if storeDeletionErr := f.meta.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
return fmt.Errorf("delete folder children metadata: %v", storeDeletionErr)
}
return nil
}
func (f *FileSystemManager) doDeleteEntryMetaAndData(ctx context.Context, entry *utils.Entry) error {
if storeDeletionErr := f.meta.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
return fmt.Errorf("delete entry metadata: %v", storeDeletionErr)
}
return nil
}
func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64) (entries []*utils.Entry, hasMore bool, err error) {
f.RLock()
defer f.RUnlock()
@ -196,13 +105,13 @@ func (f *FileSystemManager) ListDirectoryEntries(ctx context.Context, p utils.Fu
return entries, hasMore, err
}
func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
func (f *FileSystemManager) StreamListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, eachEntryFunc)
return
}
func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
func (f *FileSystemManager) doListDirectoryEntries(ctx context.Context, p utils.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
lastFileName, err = f.meta.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit, func(entry *utils.Entry) bool {
select {
case <-ctx.Done():
@ -233,9 +142,9 @@ func (f *FileSystemManager) CreateFile(ctx context.Context, path utils.FullPath,
}
}
output, err := f.storage.UploadFile(path.ToS3Key(), reader, contentType)
output, err := f.store.UploadFile(path.ToS3Key(), reader, contentType)
if err != nil {
return fmt.Errorf("upload s3 failed: %v", err)
return fmt.Errorf("upload file failed: %v", err)
}
if output.ETag == nil {
@ -269,9 +178,9 @@ func (f *FileSystemManager) DownloadFile(ctx context.Context, path utils.FullPat
return nil, nil, fmt.Errorf("cannot download directory")
}
downloader, err := f.storage.DownloadFile(entry.S3Key)
downloader, err := f.store.DownloadFile(entry.S3Key)
if err != nil {
return nil, nil, fmt.Errorf("read s3 object failed: %v", err)
return nil, nil, fmt.Errorf("download file failed: %v", err)
}
return downloader, entry, nil
@ -291,8 +200,8 @@ func (f *FileSystemManager) DeleteFile(ctx context.Context, path utils.FullPath,
}
if !isDir {
if err := f.storage.DeleteObject(entry.S3Key); err != nil {
return fmt.Errorf("delete s3 object failed: %v", err)
if err := f.store.DeleteObject(entry.S3Key); err != nil {
return fmt.Errorf("delete file failed: %v", err)
}
}
@ -313,11 +222,11 @@ func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath uti
srcEntry, err := f.FindEntry(ctx, srcPath)
if err != nil {
return fmt.Errorf("find src file failed: %v", err)
return fmt.Errorf("find src entry failed: %v", err)
}
if dstEntry, _ := f.FindEntry(ctx, dstPath); dstEntry != nil {
return fmt.Errorf("dst file %s already exists", dstPath)
return fmt.Errorf("dst entry %s already exists", dstPath)
}
parentDir, _ := dstPath.DirAndName()
@ -337,12 +246,12 @@ func (f *FileSystemManager) RenameFile(ctx context.Context, srcPath, dstPath uti
newEntry := utils.NewFileEntry(dstPath, dstPath.ToS3Key(), srcEntry.Size, srcEntry.ContentType, srcEntry.Etag, srcEntry.VersionID)
if err := f.meta.InsertEntry(ctx, newEntry); err != nil {
return fmt.Errorf("insert new entry failed: %v", err)
if err := f.meta.UpdateEntry(ctx, newEntry); err != nil {
return fmt.Errorf("update entry failed: %v", err)
}
if err := f.meta.DeleteEntry(ctx, srcPath); err != nil {
return fmt.Errorf("delete source entry failed: %v", err)
return fmt.Errorf("delete src entry failed: %v", err)
}
return nil
@ -358,11 +267,11 @@ func (f *FileSystemManager) CopyFile(ctx context.Context, srcPath, dstPath utils
srcEntry, err := f.FindEntry(ctx, srcPath)
if err != nil {
return fmt.Errorf("find src file failed: %v", err)
return fmt.Errorf("find src entry failed: %v", err)
}
if dstEntry, err := f.FindEntry(ctx, dstPath); dstEntry != nil && err == nil {
return fmt.Errorf("dst file %s already exists", dstPath)
return fmt.Errorf("dst entry %s already exists", dstPath)
}
parentDir, _ := dstPath.DirAndName()

View File

@ -111,7 +111,7 @@ func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.Ful
IsDir: entry.IsDir,
Size: entry.Size,
CreateTime: time.Now().Unix(),
S3Key: entry.S3Key,
S3Key: newPath.ToS3Key(),
ContentType: entry.ContentType,
Etag: entry.Etag,
VersionID: entry.VersionID,
@ -119,8 +119,8 @@ func (f *FileSystemManager) moveSelfEntry(ctx context.Context, oldPath utils.Ful
Extended: entry.Extended,
}
if createErr := f.meta.InsertEntry(ctx, newEntry); createErr != nil {
return createErr
if updateErr := f.meta.UpdateEntry(ctx, newEntry); updateErr != nil {
return updateErr
}
if moveFolderSubEntries != nil {

View File

@ -7,7 +7,6 @@ import (
"os"
"robotfs/config"
"robotfs/utils"
)
const usage = `Usage: robotfs [options]
@ -44,7 +43,7 @@ func main() {
}
if *configFile != "" {
err := utils.LoadConfiguration(*configFile)
err := config.LoadConfiguration(*configFile)
if err != nil {
fmt.Println("load config file failed, " + err.Error())
os.Exit(1)

38
pkg/meta/meta_store.go Normal file
View File

@ -0,0 +1,38 @@
package meta
import (
"context"
"errors"
"robotfs/config"
"robotfs/utils"
)
var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrKvNotImplemented = errors.New("kv not implemented yet")
ErrKvNotFound = errors.New("kv: not found")
)
type ListEachEntryFunc func(entry *utils.Entry) bool
type MetaStore interface {
GetName() string
Initialize(configuration config.Configuration, prefix string) error
InsertEntry(context.Context, *utils.Entry) error
UpdateEntry(context.Context, *utils.Entry) (err error)
FindEntry(context.Context, utils.FullPath) (entry *utils.Entry, err error)
DeleteEntry(context.Context, utils.FullPath) (err error)
DeleteFolderChildren(context.Context, utils.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
// Todo: implement this in the future
ListDirectoryPrefixedEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
RollbackTransaction(ctx context.Context) error
Shutdown()
}

View File

@ -0,0 +1,40 @@
package redis_lua
import (
"robotfs/config"
"github.com/redis/go-redis/v9"
)
type RedisLuaStore struct {
UniversalRedisLuaStore
}
func NewRedisLuaStore() *RedisLuaStore {
return &RedisLuaStore{}
}
func (store *RedisLuaStore) GetName() string {
return "redis_lua"
}
func (store *RedisLuaStore) Initialize(configuration config.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
func (store *RedisLuaStore) initialize(hostPort string, username, password string, database int, superLargeDirectories []string) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Username: username,
Password: password,
DB: database,
})
store.loadSuperLargeDirectories(superLargeDirectories)
return
}

View File

@ -0,0 +1,19 @@
-- KEYS[1]: full path of entry
local fullpath = KEYS[1]
-- KEYS[2]: full path of entry
local fullpath_list_key = KEYS[2]
-- KEYS[3]: dir of the entry
local dir_list_key = KEYS[3]
-- ARGV[1]: isSuperLargeDirectory
local isSuperLargeDirectory = ARGV[1] == "1"
-- ARGV[2]: name of the entry
local name = ARGV[2]
redis.call("DEL", fullpath, fullpath_list_key)
if not isSuperLargeDirectory and name ~= "" then
redis.call("ZREM", dir_list_key, name)
end
return 0

View File

@ -0,0 +1,15 @@
-- KEYS[1]: full path of entry
local fullpath = KEYS[1]
if fullpath ~= "" and string.sub(fullpath, -1) == "/" then
fullpath = string.sub(fullpath, 0, -2)
end
local files = redis.call("ZRANGE", fullpath .. "\0", "0", "-1")
for _, name in ipairs(files) do
local file_path = fullpath .. "/" .. name
redis.call("DEL", file_path, file_path .. "\0")
end
return 0

View File

@ -0,0 +1,24 @@
package stored_procedure
import (
_ "embed"
"github.com/redis/go-redis/v9"
)
func init() {
InsertEntryScript = redis.NewScript(insertEntry)
DeleteEntryScript = redis.NewScript(deleteEntry)
DeleteFolderChildrenScript = redis.NewScript(deleteFolderChildren)
}
//go:embed insert_entry.lua
var insertEntry string
var InsertEntryScript *redis.Script
//go:embed delete_entry.lua
var deleteEntry string
var DeleteEntryScript *redis.Script
//go:embed delete_folder_children.lua
var deleteFolderChildren string
var DeleteFolderChildrenScript *redis.Script

View File

@ -0,0 +1,27 @@
-- KEYS[1]: full path of entry
local full_path = KEYS[1]
-- KEYS[2]: dir of the entry
local dir_list_key = KEYS[2]
-- ARGV[1]: content of the entry
local entry = ARGV[1]
-- ARGV[2]: TTL of the entry
local ttlSec = tonumber(ARGV[2])
-- ARGV[3]: isSuperLargeDirectory
local isSuperLargeDirectory = ARGV[3] == "1"
-- ARGV[4]: zscore of the entry in zset
local zscore = tonumber(ARGV[4])
-- ARGV[5]: name of the entry
local name = ARGV[5]
if ttlSec > 0 then
redis.call("SET", full_path, entry, "EX", ttlSec)
else
redis.call("SET", full_path, entry)
end
if not isSuperLargeDirectory and name ~= "" then
redis.call("ZADD", dir_list_key, "NX", zscore, name)
end
return 0

View File

@ -0,0 +1,180 @@
package redis_lua
import (
"context"
"errors"
"fmt"
"robotfs/pkg/meta"
"robotfs/pkg/meta/redis_lua/stored_procedure"
"robotfs/utils"
"github.com/redis/go-redis/v9"
)
const (
DIR_LIST_MARKER = "\x00"
)
var ErrNotFound = errors.New("no entry is found")
type UniversalRedisLuaStore struct {
Client redis.UniversalClient
superLargeDirectoryHash map[string]bool
}
func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
_, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
return
}
func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) {
store.superLargeDirectoryHash = make(map[string]bool)
for _, dir := range superLargeDirectories {
store.superLargeDirectoryHash[dir] = true
}
}
func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *utils.Entry) (err error) {
value, err := entry.Encode()
if err != nil {
return fmt.Errorf("encoding %s: %v", entry.FullPath, err)
}
dir, name := entry.FullPath.DirAndName()
err = stored_procedure.InsertEntryScript.Run(ctx, store.Client,
[]string{string(entry.FullPath), genDirectoryListKey(dir)},
value, 0,
store.isSuperLargeDirectory(dir), 0, name).Err()
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
return nil
}
func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *utils.Entry) (err error) {
return store.InsertEntry(ctx, entry)
}
func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath utils.FullPath) (entry *utils.Entry, err error) {
data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &utils.Entry{
FullPath: fullpath,
}
err = entry.Decode([]byte(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
return entry, nil
}
func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath utils.FullPath) (err error) {
dir, name := fullpath.DirAndName()
err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client,
[]string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)},
store.isSuperLargeDirectory(dir), name).Err()
if err != nil {
return fmt.Errorf("DeleteEntry %s : %v", fullpath, err)
}
return nil
}
func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, fullpath utils.FullPath) (err error) {
if store.isSuperLargeDirectory(string(fullpath)) {
return nil
}
err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client,
[]string{string(fullpath)}).Err()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
return nil
}
// Todo: implement this in the future
func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, meta.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc meta.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))
min := "-"
if startFileName != "" {
if includeStartFile {
min = "[" + startFileName
} else {
min = "(" + startFileName
}
}
members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{
Min: min,
Max: "+",
Offset: 0,
Count: limit,
}).Result()
if err != nil {
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// fetch entry meta
for _, fileName := range members {
path := utils.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
lastFileName = fileName
if err != nil {
if err == ErrNotFound {
continue
}
} else {
if !eachEntryFunc(entry) {
break
}
}
}
return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
return dir + DIR_LIST_MARKER
}
func (store *UniversalRedisLuaStore) Shutdown() {
store.Client.Close()
}

View File

@ -0,0 +1,41 @@
package redis_lua
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
func (store *UniversalRedisLuaStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
_, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
}
return nil
}
func (store *UniversalRedisLuaStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, fmt.Errorf("kv: not found")
}
return []byte(data), err
}
func (store *UniversalRedisLuaStore) KvDelete(ctx context.Context, key []byte) (err error) {
_, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)
}
return nil
}

View File

@ -1,10 +1,11 @@
package engine
package storage
import (
"bytes"
"fmt"
"io"
"robotfs/config"
"robotfs/utils"
"github.com/aws/aws-sdk-go/aws"
@ -14,43 +15,26 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
type S3Config struct {
Region string
Endpoint string
AccessKeyID string
SecretAccessKey string
BucketName string
UsePathStyle bool
}
func Initialize(configuration utils.Configuration, prefix string) (*S3Config, error) {
s3Cfg := &S3Config{
Region: configuration.GetString(prefix + "region"),
Endpoint: configuration.GetString(prefix + "endpoint"),
AccessKeyID: configuration.GetString(prefix + "access_key_id"),
SecretAccessKey: configuration.GetString(prefix + "secret_access_key"),
BucketName: configuration.GetString(prefix + "bucket_name"),
UsePathStyle: configuration.GetBool(prefix + "use_path_style"),
}
if s3Cfg.BucketName == "" {
return nil, fmt.Errorf("bucket_name is required")
}
return s3Cfg, nil
}
type StorageManager struct {
type Storage struct {
s3Client *s3.S3
s3Uploader *s3manager.Uploader
s3Downloader *s3manager.Downloader
bucketName string
}
func NewStorageManager(s3Config *S3Config) (*StorageManager, error) {
func NewStorage() *Storage {
return &Storage{}
}
func (s *Storage) Initialize(cfg config.Configuration, prefix string) error {
s3Config, err := initialize(cfg, prefix)
if err != nil {
return err
}
s3Config, awsSession, err := createS3Session(s3Config)
if err != nil {
return nil, err
return err
}
s3Client := s3.New(awsSession)
@ -63,12 +47,38 @@ func NewStorageManager(s3Config *S3Config) (*StorageManager, error) {
})
downloader := s3manager.NewDownloader(awsSession)
return &StorageManager{
s3Client: s3Client,
s3Uploader: uploader,
s3Downloader: downloader,
bucketName: s3Config.BucketName,
}, nil
s.s3Client = s3Client
s.s3Uploader = uploader
s.s3Downloader = downloader
s.bucketName = s3Config.BucketName
return nil
}
type S3Config struct {
Region string
Endpoint string
AccessKeyID string
SecretAccessKey string
BucketName string
UsePathStyle bool
}
func initialize(cfg config.Configuration, prefix string) (*S3Config, error) {
s3Cfg := &S3Config{
Region: cfg.GetString(prefix + "region"),
Endpoint: cfg.GetString(prefix + "endpoint"),
AccessKeyID: cfg.GetString(prefix + "access_key_id"),
SecretAccessKey: cfg.GetString(prefix + "secret_access_key"),
BucketName: cfg.GetString(prefix + "bucket_name"),
UsePathStyle: cfg.GetBool(prefix + "use_path_style"),
}
if s3Cfg.BucketName == "" {
return nil, fmt.Errorf("bucket_name is required")
}
return s3Cfg, nil
}
func createS3Session(config *S3Config) (*S3Config, *session.Session, error) {
@ -90,13 +100,13 @@ func createS3Session(config *S3Config) (*S3Config, *session.Session, error) {
return config, sess, nil
}
func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, error) {
func (s *Storage) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, error) {
if contentType == "" {
contentType = "application/octet-stream"
}
output, err := sm.s3Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(sm.bucketName),
output, err := s.s3Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(s3Key),
Body: bytes.NewReader(data),
ContentType: aws.String(contentType),
@ -108,13 +118,13 @@ func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType stri
return output, nil
}
func (sm *StorageManager) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, error) {
func (s *Storage) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, error) {
if contentType == "" {
contentType = "application/octet-stream"
}
output, err := sm.s3Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(sm.bucketName),
output, err := s.s3Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(s3Key),
Body: data,
ContentType: aws.String(contentType),
@ -126,31 +136,31 @@ func (sm *StorageManager) UploadFile(s3Key string, data io.Reader, contentType s
return output, nil
}
func (sm *StorageManager) DownloadFile(S3Key string) (*utils.S3ReadSeeker, error) {
head, err := sm.s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(sm.bucketName),
func (s *Storage) DownloadFile(S3Key string) (*utils.S3ReadSeeker, error) {
head, err := s.s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(S3Key),
})
if err != nil {
return nil, err
}
result, err := sm.s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(sm.bucketName),
result, err := s.s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(S3Key),
})
if err != nil {
return nil, err
}
return utils.NewS3ReadSeeker(sm.s3Client, sm.bucketName, S3Key, result.Body, *head.ContentLength), nil
return utils.NewS3ReadSeeker(s.s3Client, s.bucketName, S3Key, result.Body, *head.ContentLength), nil
}
func (sm *StorageManager) ReadObject(S3Key string) ([]byte, error) {
func (s *Storage) ReadObject(S3Key string) ([]byte, error) {
buf := aws.NewWriteAtBuffer([]byte{})
_, err := sm.s3Downloader.Download(buf, &s3.GetObjectInput{
Bucket: aws.String(sm.bucketName),
_, err := s.s3Downloader.Download(buf, &s3.GetObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(S3Key),
})
if err != nil {
@ -160,33 +170,33 @@ func (sm *StorageManager) ReadObject(S3Key string) ([]byte, error) {
return buf.Bytes(), nil
}
func (sm *StorageManager) DeleteObject(s3Key string) error {
_, err := sm.s3Client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(sm.bucketName),
func (s *Storage) DeleteObject(s3Key string) error {
_, err := s.s3Client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(s3Key),
})
if err != nil {
return err
}
return sm.s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{
Bucket: aws.String(sm.bucketName),
return s.s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(s3Key),
})
}
func (sm *StorageManager) CopyObject(oldS3key, newS3Key string) error {
_, err := sm.s3Client.CopyObject(&s3.CopyObjectInput{
Bucket: aws.String(sm.bucketName),
CopySource: aws.String(fmt.Sprintf("%s/%s", sm.bucketName, oldS3key)),
func (s *Storage) CopyObject(oldS3key, newS3Key string) error {
_, err := s.s3Client.CopyObject(&s3.CopyObjectInput{
Bucket: aws.String(s.bucketName),
CopySource: aws.String(fmt.Sprintf("%s/%s", s.bucketName, oldS3key)),
Key: aws.String(newS3Key),
})
if err != nil {
return err
}
return sm.s3Client.WaitUntilObjectExists(&s3.HeadObjectInput{
Bucket: aws.String(sm.bucketName),
return s.s3Client.WaitUntilObjectExists(&s3.HeadObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(newS3Key),
})
}

View File

@ -19,9 +19,13 @@ func CreateService() (*Service, error) {
}, nil
}
func (s *Service) LoadMetadata() error {
return nil
}
func (s *Service) Start() error {
if err := s.Engine.Start(); err != nil {
return fmt.Errorf("failed to start engine: %v", err)
return fmt.Errorf("start engine failed: %v", err)
}
return nil

View File

@ -39,3 +39,13 @@ func Remove(root, path string, isDir bool) error {
return nil
}
func Mkdir(root, path string) error {
cmd := exec.Command("mkdir", root+path)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("mkdir failed: %v, output: %s", err, string(output))
}
return nil
}