robotfs/engine/storage_manager.go

209 lines
5.1 KiB
Go

package engine
import (
"bytes"
"fmt"
"io"
"robotfs/utils"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// S3Config holds the connection settings used to build an S3 session and to
// select the bucket a StorageManager works against.
type S3Config struct {
	// Region is the region name handed to the AWS SDK configuration.
	Region string
	// Endpoint optionally overrides the service endpoint; it is only applied
	// to the SDK configuration when non-empty.
	Endpoint string
	// AccessKeyID is the access key half of the static credential pair.
	AccessKeyID string
	// SecretAccessKey is the secret half of the static credential pair.
	SecretAccessKey string
	// BucketName is the bucket every StorageManager operation targets.
	BucketName string
	// UsePathStyle is forwarded to the SDK's S3ForcePathStyle option.
	UsePathStyle bool
}
// Initialize reads the S3 settings from configuration and returns them as an
// S3Config. Each setting is looked up under the key formed by concatenating
// prefix with the setting name (for example prefix + "region"); no separator
// is inserted between the two.
//
// It returns an error when the resulting bucket name is empty. No other
// setting is validated here.
func Initialize(configuration utils.Configuration, prefix string) (*S3Config, error) {
	var cfg S3Config

	// Lookups happen in the same order as the struct fields.
	cfg.Region = configuration.GetString(prefix + "region")
	cfg.Endpoint = configuration.GetString(prefix + "endpoint")
	cfg.AccessKeyID = configuration.GetString(prefix + "access_key_id")
	cfg.SecretAccessKey = configuration.GetString(prefix + "secret_access_key")
	cfg.BucketName = configuration.GetString(prefix + "bucket_name")
	cfg.UsePathStyle = configuration.GetBool(prefix + "use_path_style")

	if cfg.BucketName == "" {
		return nil, fmt.Errorf("bucket_name is required")
	}
	return &cfg, nil
}
// StorageManager bundles the S3 clients and the bucket name used by the
// object operations defined on it.
type StorageManager struct {
	// s3Client issues the direct API calls (head, get, delete, copy, list).
	s3Client *s3.S3
	// s3Uploader performs uploads through the SDK's upload manager.
	s3Uploader *s3manager.Uploader
	// s3Downloader performs downloads through the SDK's download manager.
	s3Downloader *s3manager.Downloader
	// bucketName is the bucket every operation is issued against.
	bucketName string
}
// NewStorageManager creates a StorageManager for the bucket described by
// s3Config. It opens one AWS session from the config and derives the S3
// client, the upload manager and the download manager from that session.
//
// It returns an error when s3Config is nil or when the session cannot be
// created.
func NewStorageManager(s3Config *S3Config) (*StorageManager, error) {
	// A nil config would otherwise be dereferenced while building the session
	// and crash the caller; report it as an ordinary error instead.
	if s3Config == nil {
		return nil, fmt.Errorf("s3 config is required")
	}

	// Bind the returned config to its own name rather than overwriting the
	// parameter through a short variable declaration.
	cfg, awsSession, err := createS3Session(s3Config)
	if err != nil {
		return nil, err
	}

	return &StorageManager{
		s3Client:     s3.New(awsSession),
		s3Uploader:   s3manager.NewUploader(awsSession),
		s3Downloader: s3manager.NewDownloader(awsSession),
		bucketName:   cfg.BucketName,
	}, nil
}
// createS3Session builds an AWS session from config. The session always uses
// the static access key / secret pair from config, together with its region
// and path-style flag; the endpoint is overridden only when config.Endpoint
// is non-empty.
//
// The config pointer is returned unchanged as the first result, on success
// and on failure alike. config must not be nil.
func createS3Session(config *S3Config) (*S3Config, *session.Session, error) {
	staticCreds := credentials.NewStaticCredentials(config.AccessKeyID, config.SecretAccessKey, "")

	awsConfig := aws.NewConfig().
		WithCredentials(staticCreds).
		WithRegion(config.Region).
		WithS3ForcePathStyle(config.UsePathStyle)
	if endpoint := config.Endpoint; endpoint != "" {
		awsConfig = awsConfig.WithEndpoint(endpoint)
	}

	awsSession, err := session.NewSession(awsConfig)
	if err != nil {
		return config, nil, err
	}
	return config, awsSession, nil
}
// UploadData uploads the in-memory payload data to the manager's bucket under
// s3Key using the upload manager. An empty contentType is replaced with
// "application/octet-stream".
//
// It returns the upload manager's output, or a nil output together with the
// error when the upload fails.
func (sm *StorageManager) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, error) {
	mimeType := contentType
	if mimeType == "" {
		mimeType = "application/octet-stream"
	}

	input := &s3manager.UploadInput{
		Bucket:      aws.String(sm.bucketName),
		Key:         aws.String(s3Key),
		Body:        bytes.NewReader(data),
		ContentType: aws.String(mimeType),
	}

	uploaded, err := sm.s3Uploader.Upload(input)
	if err != nil {
		return nil, err
	}
	return uploaded, nil
}
// UploadFile streams the contents of data to the manager's bucket under s3Key
// using the upload manager. An empty contentType is replaced with
// "application/octet-stream".
//
// It returns the upload manager's output, or a nil output together with the
// error when the upload fails.
func (sm *StorageManager) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, error) {
	const fallbackContentType = "application/octet-stream"
	if contentType == "" {
		contentType = fallbackContentType
	}

	request := s3manager.UploadInput{
		Bucket:      aws.String(sm.bucketName),
		Key:         aws.String(s3Key),
		Body:        data,
		ContentType: aws.String(contentType),
	}

	uploaded, err := sm.s3Uploader.Upload(&request)
	if err != nil {
		return nil, err
	}
	return uploaded, nil
}
// DownloadFile opens the object stored under S3Key and returns it as an
// io.ReadSeeker built by utils.NewS3ReadSeeker.
//
// The object's size is taken from a HeadObject call, after which the object
// is fetched with GetObject; the response body and the size are handed to
// utils.NewS3ReadSeeker.
// NOTE(review): ownership of the response body passes to the returned
// reader; confirm how and when it is closed.
//
// It returns the SDK error unchanged when either request fails, and an error
// when the head response carries no content length.
func (sm *StorageManager) DownloadFile(S3Key string) (io.ReadSeeker, error) {
	head, err := sm.s3Client.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(sm.bucketName),
		Key:    aws.String(S3Key),
	})
	if err != nil {
		return nil, err
	}

	// ContentLength is a pointer in the SDK response; dereferencing it when it
	// is absent would panic. Check before issuing the GET so that no response
	// body is left open on this error path.
	if head.ContentLength == nil {
		return nil, fmt.Errorf("head object %q: response has no content length", S3Key)
	}
	size := *head.ContentLength

	result, err := sm.s3Client.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(sm.bucketName),
		Key:    aws.String(S3Key),
	})
	if err != nil {
		return nil, err
	}

	return utils.NewS3ReadSeeker(sm.s3Client, sm.bucketName, S3Key, result.Body, size), nil
}
// ReadObject downloads the object stored under S3Key into memory through the
// download manager and returns its bytes. On failure it returns a nil slice
// together with the error.
func (sm *StorageManager) ReadObject(S3Key string) ([]byte, error) {
	sink := aws.NewWriteAtBuffer([]byte{})
	request := &s3.GetObjectInput{
		Bucket: aws.String(sm.bucketName),
		Key:    aws.String(S3Key),
	}

	if _, err := sm.s3Downloader.Download(sink, request); err != nil {
		return nil, err
	}
	return sink.Bytes(), nil
}
// DeleteObject deletes the object stored under s3Key and then blocks on the
// SDK's WaitUntilObjectNotExists waiter for the same key. It returns the
// delete error if the delete request fails, otherwise the waiter's result.
func (sm *StorageManager) DeleteObject(s3Key string) error {
	deleteInput := &s3.DeleteObjectInput{
		Bucket: aws.String(sm.bucketName),
		Key:    aws.String(s3Key),
	}
	if _, err := sm.s3Client.DeleteObject(deleteInput); err != nil {
		return err
	}

	goneCheck := &s3.HeadObjectInput{
		Bucket: aws.String(sm.bucketName),
		Key:    aws.String(s3Key),
	}
	return sm.s3Client.WaitUntilObjectNotExists(goneCheck)
}
// CopyObject copies the object stored under oldS3key to newS3Key inside the
// manager's bucket and then blocks on the SDK's WaitUntilObjectExists waiter
// for the new key. It returns the copy error if the copy request fails,
// otherwise the waiter's result.
//
// NOTE(review): the copy source is built as "<bucket>/<key>" with the key
// passed verbatim. The S3 API reference states that the copy source value
// must be URL-encoded; confirm the behavior for keys containing spaces or
// other special characters.
func (sm *StorageManager) CopyObject(oldS3key, newS3Key string) error {
	source := fmt.Sprintf("%s/%s", sm.bucketName, oldS3key)

	copyInput := &s3.CopyObjectInput{
		Bucket:     aws.String(sm.bucketName),
		CopySource: aws.String(source),
		Key:        aws.String(newS3Key),
	}
	if _, err := sm.s3Client.CopyObject(copyInput); err != nil {
		return err
	}

	existsCheck := &s3.HeadObjectInput{
		Bucket: aws.String(sm.bucketName),
		Key:    aws.String(newS3Key),
	}
	return sm.s3Client.WaitUntilObjectExists(existsCheck)
}
// ListObjects returns the keys of all objects in the manager's bucket whose
// key starts with prefix, walking every page of the ListObjectsV2 listing.
//
// The result is a nil slice when nothing matches. If the listing fails, the
// keys gathered so far are discarded and nil is returned with the error.
func (sm *StorageManager) ListObjects(prefix string) ([]string, error) {
	var keys []string

	// collect appends one page worth of keys and asks for the next page
	// unless this was the last one.
	collect := func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for i := range page.Contents {
			keys = append(keys, *page.Contents[i].Key)
		}
		return !lastPage
	}

	query := &s3.ListObjectsV2Input{
		Bucket: aws.String(sm.bucketName),
		Prefix: aws.String(prefix),
	}
	if err := sm.s3Client.ListObjectsV2Pages(query, collect); err != nil {
		return nil, err
	}
	return keys, nil
}