add: init repo

This commit is contained in:
d-robotics 2025-05-12 20:23:33 +08:00
commit 3f53e1d922
28 changed files with 1634 additions and 0 deletions

21
Makefile Normal file
View File

@ -0,0 +1,21 @@
VERSION := $(shell git describe --tags --abbrev=0 2>/dev/null || echo "dev")
COMMIT := $(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
BRANCH := $(shell git rev-parse --abbrev-ref HEAD 2>/dev/null || echo "unknown")
BUILD_TIME := $(shell date "+%Y-%m-%d %H:%M:%S")
LDFLAGS := -X gosvc.setVersionNumber=$(VERSION) \
-X gosvc.setVersionRelease=$(BRANCH) \
-X gosvc.setVersionBuildTime=$(BUILD_TIME) \
-X gosvc.setVersionDescription="RobotFS Service (commit: $(COMMIT))"
.PHONY: build
build:
go build -ldflags "$(LDFLAGS)" -o robotfs main.go
.PHONY: run
run: build
./robotfs
.PHONY: clean
clean:
rm -f robotfs

0
README.md Normal file
View File

12
api_other.go Normal file
View File

@ -0,0 +1,12 @@
package main
import (
"gosvc/httpserver"
)
func (s *Service) API_Ping(
req *httpserver.Request,
resp *httpserver.Response,
) *httpserver.Response {
return resp.OK("pong")
}

53
config/config.go Normal file
View File

@ -0,0 +1,53 @@
package config
import (
"time"
"gosvc"
"gosvc/httpserver"
"gosvc/logger"
)
func SetupServiceConfig() {
gosvc.Config = gosvc.ServiceConfig{
Name: "robotfs",
Version: gosvc.Config.Version,
Runtime: gosvc.RuntimeConfig{
RootOnly: false,
},
Logger: gosvc.LoggerConfig{
Options: logger.Options{
TargetDirectory: "/var/log/robotfs",
FilePrefix: "robotfs",
CycleFiles: 16,
MaxFileSize: 64 * 1024 * 1024,
},
},
Ticker: gosvc.TickerConfig{
Enable: false,
TickInterval: 1300 * time.Millisecond,
TickMinInterval: 300 * time.Millisecond,
TickOnStop: true,
},
Saver: gosvc.SaverConfig{
Enable: false,
SaveInterval: 60 * time.Second,
SaveMinInterval: 500 * time.Millisecond,
},
HTTPServer: gosvc.HTTPServerConfig{
Enable: true,
Options: httpserver.Options{
Address: "0.0.0.0:6201",
TLSEnabled: false,
CrossOriginEnabled: true,
MaxConcurrentStreams: 10000,
MaxTraces: 30 * 10000,
LogSlowRequest: true,
SlowRequestThreshold: 1000 * time.Millisecond,
},
},
}
gosvc.Config.HTTPServer.MaxRequestsPerSecond = 4000
gosvc.Config.HTTPServer.MaxPendingRequests = 15000
}

29
config/security.go Normal file
View File

@ -0,0 +1,29 @@
package config
type Config struct {
Robotfs struct {
Address string `mapstructure:"ADDRESS"`
LogPath string `mapstructure:"LOGPATH"`
} `mapstructure:"robotfs"`
Redis struct {
Address string `mapstructure:"address"`
Password string `mapstructure:"password"`
DB int `mapstructure:"db"`
} `mapstructure:"redis"`
S3 struct {
Region string `mapstructure:"region"`
Endpoint string `mapstructure:"endpoint"`
AccessKeyID string `mapstructure:"access_key_id"`
SecretAccessKey string `mapstructure:"secret_access_key"`
BucketName string `mapstructure:"bucket_name"`
UsePathStyle bool `mapstructure:"use_path_style"`
} `mapstructure:"s3"`
}
func GetConfig() *Config {
config := &Config{}
return config
}

47
engine/engine.go Normal file
View File

@ -0,0 +1,47 @@
package engine
import (
"fmt"
"robotfs/store/redis_lua"
"robotfs/utils"
)
type Engine struct {
metadataManager *MetaDataManager
storageManager *StorageManager
}
func NewEngine() *Engine {
return &Engine{}
}
func (e *Engine) Start() error {
store := redis_lua.NewRedisLuaStore()
if err := store.Initialize(utils.GetViper(), "redis."); err != nil {
return fmt.Errorf("failed to initialize meta store: %v", err)
}
metadata, err := NewMetaDataManager(store)
if err != nil {
return fmt.Errorf("failed to create metadata manager: %v", err)
}
e.metadataManager = metadata
s3Config, err := Initialize(utils.GetViper(), "s3.")
if err != nil {
return fmt.Errorf("failed to initialize s3 config: %v", err)
}
storageManager, err := NewStorageManager(s3Config)
if err != nil {
return fmt.Errorf("failed to create storage manager: %v", err)
}
e.storageManager = storageManager
return nil
}
func (e *Engine) Stop() {
if e.metadataManager != nil {
e.metadataManager.Shutdown()
}
}

View File

@ -0,0 +1,28 @@
package engine
import (
"fmt"
"robotfs/store"
)
type MetaDataManager struct {
store store.MetaStore
}
func NewMetaDataManager(store store.MetaStore) (*MetaDataManager, error) {
if store == nil {
return nil, fmt.Errorf("meta store is required")
}
return &MetaDataManager{
store: store,
}, nil
}
func (m *MetaDataManager) Shutdown() error {
if m.store != nil {
m.store.Shutdown()
}
return nil
}

204
engine/storage_manager.go Normal file
View File

@ -0,0 +1,204 @@
package engine
import (
"bytes"
"fmt"
"io"
"os"
"robotfs/utils"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
type S3Config struct {
Region string
Endpoint string
AccessKeyID string
SecretAccessKey string
BucketName string
UsePathStyle bool
}
func Initialize(configuration utils.Configuration, prefix string) (*S3Config, error) {
s3Cfg := &S3Config{
Region: configuration.GetString(prefix + "region"),
Endpoint: configuration.GetString(prefix + "endpoint"),
AccessKeyID: configuration.GetString(prefix + "access_key_id"),
SecretAccessKey: configuration.GetString(prefix + "secret_access_key"),
BucketName: configuration.GetString(prefix + "bucket_name"),
UsePathStyle: configuration.GetBool(prefix + "use_path_style"),
}
if s3Cfg.BucketName == "" {
return nil, fmt.Errorf("bucket_name is required")
}
return s3Cfg, nil
}
type StorageManager struct {
s3Client *s3.S3
s3Uploader *s3manager.Uploader
s3Downloader *s3manager.Downloader
bucketName string
}
func NewStorageManager(s3Config *S3Config) (*StorageManager, error) {
s3Config, awsSession, err := createS3Session(s3Config)
if err != nil {
return nil, err
}
s3Client := s3.New(awsSession)
uploader := s3manager.NewUploader(awsSession)
downloader := s3manager.NewDownloader(awsSession)
return &StorageManager{
s3Client: s3Client,
s3Uploader: uploader,
s3Downloader: downloader,
bucketName: s3Config.BucketName,
}, nil
}
func createS3Session(config *S3Config) (*S3Config, *session.Session, error) {
awsConfig := &aws.Config{
Credentials: credentials.NewStaticCredentials(config.AccessKeyID, config.SecretAccessKey, ""),
Region: aws.String(config.Region),
S3ForcePathStyle: aws.Bool(config.UsePathStyle),
}
if config.Endpoint != "" {
awsConfig.Endpoint = aws.String(config.Endpoint)
}
sess, err := session.NewSession(awsConfig)
if err != nil {
return config, nil, err
}
return config, sess, nil
}
func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, error) {
if contentType == "" {
contentType = "application/octet-stream"
}
output, err := sm.s3Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(sm.bucketName),
Key: aws.String(s3Key),
Body: bytes.NewReader(data),
ContentType: aws.String(contentType),
})
if err != nil {
return nil, err
}
return output, nil
}
func (sm *StorageManager) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, error) {
if contentType == "" {
contentType = "application/octet-stream"
}
output, err := sm.s3Uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(sm.bucketName),
Key: aws.String(s3Key),
Body: data,
ContentType: aws.String(contentType),
})
if err != nil {
return nil, err
}
return output, nil
}
func (sm *StorageManager) DownloadFile(S3Key string, localFilePath string) error {
file, err := os.Create(localFilePath)
if err != nil {
return err
}
defer file.Close()
_, err = sm.s3Downloader.Download(file, &s3.GetObjectInput{
Bucket: aws.String(sm.bucketName),
Key: aws.String(S3Key),
})
return err
}
func (sm *StorageManager) ReadObject(S3Key string) ([]byte, error) {
buf := aws.NewWriteAtBuffer([]byte{})
_, err := sm.s3Downloader.Download(buf, &s3.GetObjectInput{
Bucket: aws.String(sm.bucketName),
Key: aws.String(S3Key),
})
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (sm *StorageManager) DeleteObject(s3Key string) error {
_, err := sm.s3Client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(sm.bucketName),
Key: aws.String(s3Key),
})
if err != nil {
return err
}
return sm.s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{
Bucket: aws.String(sm.bucketName),
Key: aws.String(s3Key),
})
}
func (sm *StorageManager) CopyObject(oldS3key, newS3Key string) error {
_, err := sm.s3Client.CopyObject(&s3.CopyObjectInput{
Bucket: aws.String(sm.bucketName),
CopySource: aws.String(fmt.Sprintf("%s/%s", sm.bucketName, oldS3key)),
Key: aws.String(newS3Key),
})
if err != nil {
return err
}
return sm.s3Client.WaitUntilObjectExists(&s3.HeadObjectInput{
Bucket: aws.String(sm.bucketName),
Key: aws.String(newS3Key),
})
}
func (sm *StorageManager) ListObjects(prefix string) ([]string, error) {
var result []string
input := &s3.ListObjectsV2Input{
Bucket: aws.String(sm.bucketName),
Prefix: aws.String(prefix),
}
err := sm.s3Client.ListObjectsV2Pages(input, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
for _, obj := range page.Contents {
result = append(result, *obj.Key)
}
return !lastPage
})
if err != nil {
return nil, err
}
return result, nil
}

View File

@ -0,0 +1,42 @@
package engine
import (
"context"
"time"
)
type WorkSpaceManager struct{}
func NewWorkSpaceManager() *WorkSpaceManager {
return &WorkSpaceManager{}
}
type StorageSpace struct {
ID string
Name string
OwnerID string
Type string
CreateTime time.Time
UpdateTime time.Time
InviteCode string
RootPath string
}
func (wsm *WorkSpaceManager) CreateWorkSpace(ctx context.Context, ownerID, name, visibility string) (string, error) {
return "", nil
}
func (wsm *WorkSpaceManager) GetWorkSpace(ctx context.Context, spaceID string) {
}
func (wsm *WorkSpaceManager) UpdateWorkSpace(ctx context.Context, spaceID string) (*StorageSpace, error) {
return nil, nil
}
func (wsm *WorkSpaceManager) GenerateInviteCode(ctx context.Context, spaceID string) (string, error) {
return "", nil
}
func (wsm *WorkSpaceManager) ImportWorkSpace(ctx context.Context, code string, newOwnerID string) (string, error) {
return "", nil
}

41
go.mod Normal file
View File

@ -0,0 +1,41 @@
module robotfs
go 1.23.8
replace gosvc => ../gosvc
require (
github.com/aws/aws-sdk-go v1.55.6
github.com/redis/go-redis/v9 v9.8.0
github.com/spf13/viper v1.20.1
google.golang.org/protobuf v1.36.6
gosvc v0.0.0-00010101000000-000000000000
)
require (
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect
github.com/shirou/gopsutil v3.21.3+incompatible // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.12.0 // indirect
github.com/spf13/cast v1.7.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

67
main.go Normal file
View File

@ -0,0 +1,67 @@
package main
import (
"flag"
"fmt"
"gosvc"
"os"
"robotfs/config"
"robotfs/utils"
)
const usage = `Usage: robotfs [options]
Options:
--config string Path to config file (required)
--version Show version information
--help Show this help message
Example:
robotfs --config=/path/to/config.yaml`
func printUsage() {
fmt.Println(usage)
}
func main() {
configFile := flag.String("config", "", "path to config file")
version := flag.Bool("version", false, "show version information")
help := flag.Bool("help", false, "show help message")
flag.Parse()
if *help {
printUsage()
return
}
if *version {
gosvc.Config.PrintVersion()
return
}
if *configFile != "" {
utils.ConfigurationFileDirectory.Set(*configFile)
if !utils.LoadConfiguration("config", true) {
os.Exit(1)
}
} else {
fmt.Println("Error: --config flag is required")
printUsage()
os.Exit(1)
}
config.SetupServiceConfig()
svc, err := CreateService()
if err != nil {
fmt.Println("create service failed, " + err.Error())
return
}
err = gosvc.Run(svc)
if err != nil {
fmt.Println("run service failed, " + err.Error())
return
}
}

212
pb/metadata.pb.go Normal file
View File

@ -0,0 +1,212 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: pb/metadata.proto
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type FileEntry struct {
state protoimpl.MessageState `protogen:"open.v1"`
FullPath string `protobuf:"bytes,1,opt,name=full_path,json=fullPath,proto3" json:"full_path,omitempty"`
IsDir bool `protobuf:"varint,2,opt,name=is_dir,json=isDir,proto3" json:"is_dir,omitempty"`
Size uint64 `protobuf:"varint,3,opt,name=size,proto3" json:"size,omitempty"`
CreateTime int64 `protobuf:"varint,4,opt,name=create_time,json=createTime,proto3" json:"create_time,omitempty"`
S3Key string `protobuf:"bytes,5,opt,name=s3_key,json=s3Key,proto3" json:"s3_key,omitempty"`
ContentType string `protobuf:"bytes,6,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"`
Etag string `protobuf:"bytes,7,opt,name=etag,proto3" json:"etag,omitempty"`
VersionId string `protobuf:"bytes,8,opt,name=version_id,json=versionId,proto3" json:"version_id,omitempty"`
LastModificationTime int64 `protobuf:"varint,9,opt,name=last_modification_time,json=lastModificationTime,proto3" json:"last_modification_time,omitempty"`
Extended map[string][]byte `protobuf:"bytes,10,rep,name=extended,proto3" json:"extended,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *FileEntry) Reset() {
*x = FileEntry{}
mi := &file_pb_metadata_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *FileEntry) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FileEntry) ProtoMessage() {}
func (x *FileEntry) ProtoReflect() protoreflect.Message {
mi := &file_pb_metadata_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FileEntry.ProtoReflect.Descriptor instead.
func (*FileEntry) Descriptor() ([]byte, []int) {
return file_pb_metadata_proto_rawDescGZIP(), []int{0}
}
func (x *FileEntry) GetFullPath() string {
if x != nil {
return x.FullPath
}
return ""
}
func (x *FileEntry) GetIsDir() bool {
if x != nil {
return x.IsDir
}
return false
}
func (x *FileEntry) GetSize() uint64 {
if x != nil {
return x.Size
}
return 0
}
func (x *FileEntry) GetCreateTime() int64 {
if x != nil {
return x.CreateTime
}
return 0
}
func (x *FileEntry) GetS3Key() string {
if x != nil {
return x.S3Key
}
return ""
}
func (x *FileEntry) GetContentType() string {
if x != nil {
return x.ContentType
}
return ""
}
func (x *FileEntry) GetEtag() string {
if x != nil {
return x.Etag
}
return ""
}
func (x *FileEntry) GetVersionId() string {
if x != nil {
return x.VersionId
}
return ""
}
func (x *FileEntry) GetLastModificationTime() int64 {
if x != nil {
return x.LastModificationTime
}
return 0
}
func (x *FileEntry) GetExtended() map[string][]byte {
if x != nil {
return x.Extended
}
return nil
}
var File_pb_metadata_proto protoreflect.FileDescriptor
const file_pb_metadata_proto_rawDesc = "" +
"\n" +
"\x11pb/metadata.proto\x12\arobotfs\"\x92\x03\n" +
"\tFileEntry\x12\x1b\n" +
"\tfull_path\x18\x01 \x01(\tR\bfullPath\x12\x15\n" +
"\x06is_dir\x18\x02 \x01(\bR\x05isDir\x12\x12\n" +
"\x04size\x18\x03 \x01(\x04R\x04size\x12\x1f\n" +
"\vcreate_time\x18\x04 \x01(\x03R\n" +
"createTime\x12\x15\n" +
"\x06s3_key\x18\x05 \x01(\tR\x05s3Key\x12!\n" +
"\fcontent_type\x18\x06 \x01(\tR\vcontentType\x12\x12\n" +
"\x04etag\x18\a \x01(\tR\x04etag\x12\x1d\n" +
"\n" +
"version_id\x18\b \x01(\tR\tversionId\x124\n" +
"\x16last_modification_time\x18\t \x01(\x03R\x14lastModificationTime\x12<\n" +
"\bextended\x18\n" +
" \x03(\v2 .robotfs.FileEntry.ExtendedEntryR\bextended\x1a;\n" +
"\rExtendedEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\fR\x05value:\x028\x01B\fZ\n" +
"robotfs/pbb\x06proto3"
var (
file_pb_metadata_proto_rawDescOnce sync.Once
file_pb_metadata_proto_rawDescData []byte
)
func file_pb_metadata_proto_rawDescGZIP() []byte {
file_pb_metadata_proto_rawDescOnce.Do(func() {
file_pb_metadata_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pb_metadata_proto_rawDesc), len(file_pb_metadata_proto_rawDesc)))
})
return file_pb_metadata_proto_rawDescData
}
var file_pb_metadata_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pb_metadata_proto_goTypes = []any{
(*FileEntry)(nil), // 0: robotfs.FileEntry
nil, // 1: robotfs.FileEntry.ExtendedEntry
}
var file_pb_metadata_proto_depIdxs = []int32{
1, // 0: robotfs.FileEntry.extended:type_name -> robotfs.FileEntry.ExtendedEntry
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_pb_metadata_proto_init() }
func file_pb_metadata_proto_init() {
if File_pb_metadata_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_pb_metadata_proto_rawDesc), len(file_pb_metadata_proto_rawDesc)),
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_pb_metadata_proto_goTypes,
DependencyIndexes: file_pb_metadata_proto_depIdxs,
MessageInfos: file_pb_metadata_proto_msgTypes,
}.Build()
File_pb_metadata_proto = out.File
file_pb_metadata_proto_goTypes = nil
file_pb_metadata_proto_depIdxs = nil
}

18
pb/metadata.proto Normal file
View File

@ -0,0 +1,18 @@
syntax = "proto3";
package robotfs;
option go_package = "robotfs/pb";
message FileEntry {
string full_path = 1;
bool is_dir = 2;
uint64 size = 3;
int64 create_time = 4;
string s3_key = 5;
string content_type = 6;
string etag = 7;
string version_id = 8;
int64 last_modification_time = 9;
map<string, bytes> extended = 10;
}

19
router.go Normal file
View File

@ -0,0 +1,19 @@
package main
import (
"net/http"
"gosvc/httpserver"
)
func (s *Service) RegisterRouteRules() {
s.HTTPServer.RegisterRouteRules(
[]*httpserver.RouteRule{
{
Path: "/ping",
Method: http.MethodGet,
Handler: s.API_Ping,
},
},
)
}

35
service.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
"fmt"
"gosvc"
"robotfs/engine"
)
type Service struct {
gosvc.ServiceBase
engine *engine.Engine
}
func CreateService() (*Service, error) {
return &Service{
engine: engine.NewEngine(),
}, nil
}
func (s *Service) Start() error {
if err := s.engine.Start(); err != nil {
return fmt.Errorf("failed to start engine: %v", err)
}
return nil
}
func (s *Service) Stop() {
s.engine.Stop()
}
func (s *Service) Tick() {
}

30
store/meta_store.go Normal file
View File

@ -0,0 +1,30 @@
package store
import (
"context"
"robotfs/pb"
"robotfs/utils"
)
type ListEachEntryFunc func(entry *pb.FileEntry) bool
type MetaStore interface {
GetName() string
Initialize(configuration utils.Configuration, prefix string) error
InsertEntry(context.Context, *pb.FileEntry) error
UpdateEntry(context.Context, *pb.FileEntry) (err error)
FindEntry(context.Context, utils.FullPath) (entry *pb.FileEntry, err error)
DeleteEntry(context.Context, utils.FullPath) (err error)
DeleteFolderChildren(context.Context, utils.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
ListDirectoryPrefixedEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
RollbackTransaction(ctx context.Context) error
Shutdown()
}

View File

@ -0,0 +1,36 @@
package redis_lua
import (
"robotfs/utils"
"github.com/redis/go-redis/v9"
)
type RedisLuaStore struct {
UniversalRedisLuaStore
}
func NewRedisLuaStore() *RedisLuaStore {
return &RedisLuaStore{}
}
func (store *RedisLuaStore) GetName() string {
return "redis_lua"
}
func (store *RedisLuaStore) Initialize(configuration utils.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
)
}
func (store *RedisLuaStore) initialize(hostPort string, password string, database int) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Password: password,
DB: database,
})
return
}

View File

@ -0,0 +1,19 @@
-- KEYS[1]: full path of entry
local fullpath = KEYS[1]
-- KEYS[2]: full path of entry
local fullpath_list_key = KEYS[2]
-- KEYS[3]: dir of the entry
local dir_list_key = KEYS[3]
-- ARGV[1]: isSuperLargeDirectory
local isSuperLargeDirectory = ARGV[1] == "1"
-- ARGV[2]: name of the entry
local name = ARGV[2]
redis.call("DEL", fullpath, fullpath_list_key)
if not isSuperLargeDirectory and name ~= "" then
redis.call("ZREM", dir_list_key, name)
end
return 0

View File

@ -0,0 +1,15 @@
-- KEYS[1]: full path of entry
local fullpath = KEYS[1]
if fullpath ~= "" and string.sub(fullpath, -1) == "/" then
fullpath = string.sub(fullpath, 0, -2)
end
local files = redis.call("ZRANGE", fullpath .. "\0", "0", "-1")
for _, name in ipairs(files) do
local file_path = fullpath .. "/" .. name
redis.call("DEL", file_path, file_path .. "\0")
end
return 0

View File

@ -0,0 +1,24 @@
package stored_procedure
import (
_ "embed"
"github.com/redis/go-redis/v9"
)
func init() {
InsertEntryScript = redis.NewScript(insertEntry)
DeleteEntryScript = redis.NewScript(deleteEntry)
DeleteFolderChildrenScript = redis.NewScript(deleteFolderChildren)
}
//go:embed insert_entry.lua
var insertEntry string
var InsertEntryScript *redis.Script
//go:embed delete_entry.lua
var deleteEntry string
var DeleteEntryScript *redis.Script
//go:embed delete_folder_children.lua
var deleteFolderChildren string
var DeleteFolderChildrenScript *redis.Script

View File

@ -0,0 +1,27 @@
-- KEYS[1]: full path of entry
local full_path = KEYS[1]
-- KEYS[2]: dir of the entry
local dir_list_key = KEYS[2]
-- ARGV[1]: content of the entry
local entry = ARGV[1]
-- ARGV[2]: TTL of the entry
local ttlSec = tonumber(ARGV[2])
-- ARGV[3]: isSuperLargeDirectory
local isSuperLargeDirectory = ARGV[3] == "1"
-- ARGV[4]: zscore of the entry in zset
local zscore = tonumber(ARGV[4])
-- ARGV[5]: name of the entry
local name = ARGV[5]
if ttlSec > 0 then
redis.call("SET", full_path, entry, "EX", ttlSec)
else
redis.call("SET", full_path, entry)
end
if not isSuperLargeDirectory and name ~= "" then
redis.call("ZADD", dir_list_key, "NX", zscore, name)
end
return 0

View File

@ -0,0 +1,189 @@
package redis_lua
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"robotfs/pb"
"robotfs/store"
"robotfs/store/redis_lua/stored_procedure"
"robotfs/utils"
)
const (
DIR_LIST_MARKER = "\x00"
)
type UniversalRedisLuaStore struct {
Client redis.UniversalClient
superLargeDirectoryHash map[string]bool
}
func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
_, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
return
}
func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) {
store.superLargeDirectoryHash = make(map[string]bool)
for _, dir := range superLargeDirectories {
store.superLargeDirectoryHash[dir] = true
}
}
func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *pb.FileEntry) (err error) {
// value, err := entry.EncodeAttributesAndChunks()
// if err != nil {
// return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
// }
// if len(entry.GetChunks()) > CountEntryChunksForGzip {
// value = util.MaybeGzipData(value)
// }
// dir, name := entry.FullPath.DirAndName()
// err = stored_procedure.InsertEntryScript.Run(ctx, store.Client,
// []string{string(entry.FullPath), genDirectoryListKey(dir)},
// value, entry.TtlSec,
// store.isSuperLargeDirectory(dir), 0, name).Err()
// if err != nil {
// return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
// }
return nil
}
func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *pb.FileEntry) (err error) {
return store.InsertEntry(ctx, entry)
}
func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath utils.FullPath) (entry *pb.FileEntry, err error) {
// data, err := store.Client.Get(ctx, string(fullpath)).Result()
// if err == redis.Nil {
// return nil, pb.ErrNotFound
// }
// if err != nil {
// return nil, fmt.Errorf("get %s : %v", fullpath, err)
// }
// entry = &pb.FileEntry{
// FullPath: fullpath.Name(),
// }
// err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
// if err != nil {
// return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
// }
return entry, nil
}
func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath utils.FullPath) (err error) {
dir, name := fullpath.DirAndName()
err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client,
[]string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)},
store.isSuperLargeDirectory(dir), name).Err()
if err != nil {
return fmt.Errorf("DeleteEntry %s : %v", fullpath, err)
}
return nil
}
func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, fullpath utils.FullPath) (err error) {
if store.isSuperLargeDirectory(string(fullpath)) {
return nil
}
err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client,
[]string{string(fullpath)}).Err()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
return nil
}
func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
// return lastFileName, engine.ErrUnsupportedListDirectoryPrefixed
return lastFileName, err
}
func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath utils.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc store.ListEachEntryFunc) (lastFileName string, err error) {
// dirListKey := genDirectoryListKey(string(dirPath))
// min := "-"
// if startFileName != "" {
// if includeStartFile {
// min = "[" + startFileName
// } else {
// min = "(" + startFileName
// }
// }
// members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{
// Min: min,
// Max: "+",
// Offset: 0,
// Count: limit,
// }).Result()
// if err != nil {
// return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
// }
// // fetch entry meta
// for _, fileName := range members {
// path := utils.NewFullPath(string(dirPath), fileName)
// entry, err := store.FindEntry(ctx, path)
// lastFileName = fileName
// if err != nil {
// if err == pb.ErrNotFound {
// continue
// }
// } else {
// if entry.TtlSec > 0 {
// if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
// store.DeleteEntry(ctx, path)
// continue
// }
// }
// if !eachEntryFunc(entry) {
// break
// }
// }
// }
return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
return dir + DIR_LIST_MARKER
}
func (store *UniversalRedisLuaStore) Shutdown() {
store.Client.Close()
}

View File

@ -0,0 +1,210 @@
package redis
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const (
DIR_LIST_MARKER = "\x00"
)
type UniversalRedis2Store struct {
Client redis.UniversalClient
}
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
if err = store.doInsertEntry(ctx, entry); err != nil {
return err
}
dir, name := entry.FullPath.DirAndName()
if store.isSuperLargeDirectory(dir) {
return nil
}
if name != "" {
if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil
}
func (store *UniversalRedis2Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
return nil
}
func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.doInsertEntry(ctx, entry)
}
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
return entry, nil
}
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
_, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
}
_, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
dir, name := fullpath.DirAndName()
if store.isSuperLargeDirectory(dir) {
return nil
}
if name != "" {
_, err = store.Client.ZRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
}
}
return nil
}
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
if store.isSuperLargeDirectory(string(fullpath)) {
return nil
}
members, err := store.Client.ZRangeByLex(ctx, genDirectoryListKey(string(fullpath)), &redis.ZRangeBy{
Min: "-",
Max: "+",
}).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
_, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
// not efficient, but need to remove if it is a directory
store.Client.Del(ctx, genDirectoryListKey(string(path)))
}
return nil
}
func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))
min := "-"
if startFileName != "" {
if includeStartFile {
min = "[" + startFileName
} else {
min = "(" + startFileName
}
}
members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{
Min: min,
Max: "+",
Offset: 0,
Count: limit,
}).Result()
if err != nil {
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// fetch entry meta
for _, fileName := range members {
path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
if err == filer_pb.ErrNotFound {
continue
}
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
store.Client.Del(ctx, string(path)).Result()
store.Client.ZRem(ctx, dirListKey, fileName).Result()
continue
}
}
if !eachEntryFunc(entry) {
break
}
}
}
return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
return dir + DIR_LIST_MARKER
}
func (store *UniversalRedis2Store) Shutdown() {
store.Client.Close()
}

View File

@ -0,0 +1,41 @@
package redis_lua
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
func (store *UniversalRedisLuaStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
_, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
}
return nil
}
func (store *UniversalRedisLuaStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, fmt.Errorf("kv: not found")
}
return []byte(data), err
}
func (store *UniversalRedisLuaStore) KvDelete(ctx context.Context, key []byte) (err error) {
_, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)
}
return nil
}

115
utils/config.go Normal file
View File

@ -0,0 +1,115 @@
package utils
import (
"strings"
"sync"
"gosvc/logger"
"github.com/spf13/viper"
)
var (
ConfigurationFileDirectory DirectoryValueType
loadSecurityConfigOnce sync.Once
)
type DirectoryValueType string
func (s *DirectoryValueType) Set(value string) error {
*s = DirectoryValueType(value)
return nil
}
func (s *DirectoryValueType) String() string {
return string(*s)
}
type Configuration interface {
GetString(key string) string
GetBool(key string) bool
GetInt(key string) int
GetStringSlice(key string) []string
SetDefault(key string, value interface{})
}
func LoadSecurityConfiguration() {
loadSecurityConfigOnce.Do(func() {
LoadConfiguration("security", false)
})
}
func LoadConfiguration(configFileName string, required bool) (loaded bool) {
viper.SetConfigName(configFileName) // name of config file (without extension)
viper.AddConfigPath(".") // look for config in the working directory
if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
if strings.Contains(err.Error(), "Not Found") {
logger.Info("Reading %s: %v", viper.ConfigFileUsed(), err)
} else {
logger.Error("Reading %s: %v", viper.ConfigFileUsed(), err)
}
if required {
logger.Error("Failed to load %s.yaml file from current directory\n"+
"\nPlease use this command to run the program:\n"+
" robotfs --config=%s.yaml\n\n\n",
configFileName, configFileName)
}
return false
}
logger.Info("Reading %s.yaml from %s", configFileName, viper.ConfigFileUsed())
return true
}
type ViperProxy struct {
*viper.Viper
sync.Mutex
}
var (
vp = &ViperProxy{}
)
func (vp *ViperProxy) SetDefault(key string, value interface{}) {
vp.Lock()
defer vp.Unlock()
vp.Viper.SetDefault(key, value)
}
func (vp *ViperProxy) GetString(key string) string {
vp.Lock()
defer vp.Unlock()
return vp.Viper.GetString(key)
}
func (vp *ViperProxy) GetBool(key string) bool {
vp.Lock()
defer vp.Unlock()
return vp.Viper.GetBool(key)
}
func (vp *ViperProxy) GetInt(key string) int {
vp.Lock()
defer vp.Unlock()
return vp.Viper.GetInt(key)
}
func (vp *ViperProxy) GetStringSlice(key string) []string {
vp.Lock()
defer vp.Unlock()
return vp.Viper.GetStringSlice(key)
}
func GetViper() *ViperProxy {
vp.Lock()
defer vp.Unlock()
if vp.Viper == nil {
vp.Viper = viper.GetViper()
vp.AutomaticEnv()
vp.SetEnvPrefix("robotfs")
vp.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
}
return vp
}

14
utils/file.go Normal file
View File

@ -0,0 +1,14 @@
package utils
import (
"strings"
"path/filepath"
)
func NormalizePath(path string) string {
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
return filepath.Clean(path)
}

72
utils/fullPath.go Normal file
View File

@ -0,0 +1,72 @@
package utils
import (
"path/filepath"
"strings"
)
type FullPath string
func NewFullPath(dir, name string) FullPath {
return FullPath(dir).Child(name)
}
func (fp FullPath) DirAndName() (string, string) {
dir, name := filepath.Split(string(fp))
name = strings.ToValidUTF8(name, "?")
if dir == "/" {
return dir, name
}
if len(dir) < 1 {
return "/", ""
}
return dir[:len(dir)-1], name
}
func (fp FullPath) Name() string {
_, name := filepath.Split(string(fp))
name = strings.ToValidUTF8(name, "?")
return name
}
func (fp FullPath) Child(name string) FullPath {
dir := string(fp)
noPrefix := name
if strings.HasPrefix(name, "/") {
noPrefix = name[1:]
}
if strings.HasSuffix(dir, "/") {
return FullPath(dir + noPrefix)
}
return FullPath(dir + "/" + noPrefix)
}
// split, but skipping the root
func (fp FullPath) Split() []string {
if fp == "" || fp == "/" {
return []string{}
}
return strings.Split(string(fp)[1:], "/")
}
func Join(names ...string) string {
return filepath.ToSlash(filepath.Join(names...))
}
func JoinPath(names ...string) FullPath {
return FullPath(Join(names...))
}
func (fp FullPath) IsUnder(other FullPath) bool {
if other == "/" {
return true
}
return strings.HasPrefix(string(fp), string(other)+"/")
}
func StringSplit(separatedValues string, sep string) []string {
if separatedValues == "" {
return nil
}
return strings.Split(separatedValues, sep)
}

14
utils/s3.go Normal file
View File

@ -0,0 +1,14 @@
package utils
import (
"strings"
)
func PathToS3Key(path string) string {
key := strings.TrimPrefix(path, "/")
if key == "" {
key = "root"
}
return key
}