package storage import ( "bytes" "fmt" "io" "robotfs/config" "robotfs/utils" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) type Storage struct { s3Client *s3.S3 s3Uploader *s3manager.Uploader s3Downloader *s3manager.Downloader bucketName string } func NewStorage() *Storage { return &Storage{} } func (s *Storage) Initialize(cfg config.Configuration, prefix string) error { s3Config, err := initialize(cfg, prefix) if err != nil { return err } s3Config, awsSession, err := createS3Session(s3Config) if err != nil { return err } s3Client := s3.New(awsSession) uploader := s3manager.NewUploader(awsSession, func(u *s3manager.Uploader) { u.PartSize = 100 * 1024 * 1024 u.Concurrency = 5 u.LeavePartsOnError = false u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(25 * 1024 * 1024) }) downloader := s3manager.NewDownloader(awsSession) s.s3Client = s3Client s.s3Uploader = uploader s.s3Downloader = downloader s.bucketName = s3Config.BucketName return nil } type S3Config struct { Region string Endpoint string AccessKeyID string SecretAccessKey string BucketName string UsePathStyle bool } func initialize(cfg config.Configuration, prefix string) (*S3Config, error) { s3Cfg := &S3Config{ Region: cfg.GetString(prefix + "region"), Endpoint: cfg.GetString(prefix + "endpoint"), AccessKeyID: cfg.GetString(prefix + "access_key_id"), SecretAccessKey: cfg.GetString(prefix + "secret_access_key"), BucketName: cfg.GetString(prefix + "bucket_name"), UsePathStyle: cfg.GetBool(prefix + "use_path_style"), } if s3Cfg.BucketName == "" { return nil, fmt.Errorf("bucket_name is required") } return s3Cfg, nil } func createS3Session(config *S3Config) (*S3Config, *session.Session, error) { awsConfig := &aws.Config{ Credentials: credentials.NewStaticCredentials(config.AccessKeyID, config.SecretAccessKey, ""), Region: aws.String(config.Region), S3ForcePathStyle: aws.Bool(config.UsePathStyle), } if config.Endpoint != "" { awsConfig.Endpoint = aws.String(config.Endpoint) } sess, err := session.NewSession(awsConfig) if err != nil { return config, nil, err } return config, sess, nil } func (s *Storage) UploadData(s3Key string, data []byte, contentType string) (*s3manager.UploadOutput, error) { if contentType == "" { contentType = "application/octet-stream" } output, err := s.s3Uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(s.bucketName), Key: aws.String(s3Key), Body: bytes.NewReader(data), ContentType: aws.String(contentType), }) if err != nil { return nil, err } return output, nil } func (s *Storage) UploadFile(s3Key string, data io.Reader, contentType string) (*s3manager.UploadOutput, error) { if contentType == "" { contentType = "application/octet-stream" } output, err := s.s3Uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(s.bucketName), Key: aws.String(s3Key), Body: data, ContentType: aws.String(contentType), }) if err != nil { return nil, err } return output, nil } func (s *Storage) DownloadFile(S3Key string) (*utils.S3ReadSeeker, error) { head, err := s.s3Client.HeadObject(&s3.HeadObjectInput{ Bucket: aws.String(s.bucketName), Key: aws.String(S3Key), }) if err != nil { return nil, err } result, err := s.s3Client.GetObject(&s3.GetObjectInput{ Bucket: aws.String(s.bucketName), Key: aws.String(S3Key), }) if err != nil { return nil, err } return utils.NewS3ReadSeeker(s.s3Client, s.bucketName, S3Key, result.Body, *head.ContentLength), nil } func (s *Storage) ReadObject(S3Key string) ([]byte, error) { buf := aws.NewWriteAtBuffer([]byte{}) _, err := s.s3Downloader.Download(buf, &s3.GetObjectInput{ Bucket: aws.String(s.bucketName), Key: aws.String(S3Key), }) if err != nil { return nil, err } return buf.Bytes(), nil } func (s *Storage) DeleteObject(s3Key string) error { _, err := s.s3Client.DeleteObject(&s3.DeleteObjectInput{ Bucket: aws.String(s.bucketName), Key: aws.String(s3Key), }) if err != nil { return err } return s.s3Client.WaitUntilObjectNotExists(&s3.HeadObjectInput{ Bucket: aws.String(s.bucketName), Key: aws.String(s3Key), }) } func (s *Storage) CopyObject(oldS3key, newS3Key string) error { _, err := s.s3Client.CopyObject(&s3.CopyObjectInput{ Bucket: aws.String(s.bucketName), CopySource: aws.String(fmt.Sprintf("%s/%s", s.bucketName, oldS3key)), Key: aws.String(newS3Key), }) if err != nil { return err } return s.s3Client.WaitUntilObjectExists(&s3.HeadObjectInput{ Bucket: aws.String(s.bucketName), Key: aws.String(newS3Key), }) }