| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- package workflow
- import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "io"
- "net/url"
- "path/filepath"
- "strings"
- "time"
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/config"
- "github.com/aws/aws-sdk-go-v2/credentials"
- "github.com/aws/aws-sdk-go-v2/service/s3"
- "github.com/aws/aws-sdk-go-v2/service/s3/types"
- )
- // S3FileAdapter implements FileAdapter using AWS S3 as storage
- type S3FileAdapter struct {
- client *s3.Client
- bucket string
- prefix string // Optional prefix for all keys (e.g., "workflows/")
- }
// S3Config groups the settings consumed when constructing an S3 file adapter.
type S3Config struct {
	// Bucket is the S3 bucket name (required).
	Bucket string
	// Prefix is an optional key prefix.
	Prefix string
	// Region is the AWS region (default: us-east-1).
	Region string
	// Endpoint is a custom endpoint, for S3-compatible services like MinIO.
	Endpoint string
	// AccessKeyID is the optional explicit access key.
	AccessKeyID string
	// SecretAccessKey is the optional explicit secret key.
	SecretAccessKey string
	// UsePathStyle enables path-style addressing (for MinIO/LocalStack).
	UsePathStyle bool
}
- // NewS3FileAdapter creates a new S3 file adapter
- func NewS3FileAdapter(ctx context.Context, cfg S3Config) (*S3FileAdapter, error) {
- if cfg.Bucket == "" {
- return nil, fmt.Errorf("S3 bucket name is required")
- }
- region := cfg.Region
- if region == "" {
- region = "us-east-1"
- }
- // Build AWS config options
- var opts []func(*config.LoadOptions) error
- opts = append(opts, config.WithRegion(region))
- // Use explicit credentials if provided
- if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
- opts = append(opts, config.WithCredentialsProvider(
- credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
- ))
- }
- // Load AWS config
- awsCfg, err := config.LoadDefaultConfig(ctx, opts...)
- if err != nil {
- return nil, fmt.Errorf("failed to load AWS config: %w", err)
- }
- // Build S3 client options
- var s3Opts []func(*s3.Options)
- // Custom endpoint for S3-compatible services
- if cfg.Endpoint != "" {
- s3Opts = append(s3Opts, func(o *s3.Options) {
- o.BaseEndpoint = aws.String(cfg.Endpoint)
- })
- }
- // Path-style addressing for MinIO/LocalStack
- if cfg.UsePathStyle {
- s3Opts = append(s3Opts, func(o *s3.Options) {
- o.UsePathStyle = true
- })
- }
- client := s3.NewFromConfig(awsCfg, s3Opts...)
- prefix := cfg.Prefix
- if prefix != "" && !strings.HasSuffix(prefix, "/") {
- prefix += "/"
- }
- return &S3FileAdapter{
- client: client,
- bucket: cfg.Bucket,
- prefix: prefix,
- }, nil
- }
- // fullKey returns the full S3 key for a given path
- func (a *S3FileAdapter) fullKey(path string) string {
- // Remove leading slash if present
- path = strings.TrimPrefix(path, "/")
- return a.prefix + path
- }
- // Read implements FileAdapter - reads a file from S3
- func (a *S3FileAdapter) Read(ctx context.Context, path string) ([]byte, error) {
- key := a.fullKey(path)
- output, err := a.client.GetObject(ctx, &s3.GetObjectInput{
- Bucket: aws.String(a.bucket),
- Key: aws.String(key),
- })
- if err != nil {
- // Check if it's a not found error
- var nsk *types.NoSuchKey
- if ok := isErrorType(err, &nsk); ok {
- return nil, &FileNotFoundError{Path: path}
- }
- return nil, fmt.Errorf("failed to read from S3: %w", err)
- }
- defer output.Body.Close()
- content, err := io.ReadAll(output.Body)
- if err != nil {
- return nil, fmt.Errorf("failed to read S3 object body: %w", err)
- }
- return content, nil
- }
- // Write implements FileAdapter - writes a file to S3
- func (a *S3FileAdapter) Write(ctx context.Context, path string, content []byte, mode WriteMode) error {
- key := a.fullKey(path)
- switch mode {
- case WriteModeFailIfExists:
- // Check if object exists first
- exists, err := a.Exists(ctx, path)
- if err != nil {
- return err
- }
- if exists {
- return &FileExistsError{Path: path}
- }
- // Fall through to write
- case WriteModeAppend:
- // S3 doesn't support append, so we need to read, append, and write
- existing, err := a.Read(ctx, path)
- if err != nil {
- // If file doesn't exist, that's fine - just write the new content
- if _, ok := err.(*FileNotFoundError); !ok {
- return err
- }
- }
- content = append(existing, content...)
- case WriteModeOverwrite:
- // Default behavior - just write
- }
- _, err := a.client.PutObject(ctx, &s3.PutObjectInput{
- Bucket: aws.String(a.bucket),
- Key: aws.String(key),
- Body: bytes.NewReader(content),
- })
- if err != nil {
- return fmt.Errorf("failed to write to S3: %w", err)
- }
- return nil
- }
- // Exists implements FileAdapter - checks if a file exists in S3
- func (a *S3FileAdapter) Exists(ctx context.Context, path string) (bool, error) {
- key := a.fullKey(path)
- _, err := a.client.HeadObject(ctx, &s3.HeadObjectInput{
- Bucket: aws.String(a.bucket),
- Key: aws.String(key),
- })
- if err != nil {
- // Check for not found
- var nf *types.NotFound
- if ok := isErrorType(err, &nf); ok {
- return false, nil
- }
- // Also check for NoSuchKey
- var nsk *types.NoSuchKey
- if ok := isErrorType(err, &nsk); ok {
- return false, nil
- }
- // For HeadObject, a 404 comes as a different error
- if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "404") {
- return false, nil
- }
- return false, fmt.Errorf("failed to check S3 object existence: %w", err)
- }
- return true, nil
- }
- // List implements FileAdapter - lists files matching a pattern in S3
- func (a *S3FileAdapter) List(ctx context.Context, pattern string) ([]string, error) {
- // Convert glob pattern to S3 prefix
- // For patterns like "Process/Artifacts/*", use "Process/Artifacts/" as prefix
- prefix := a.fullKey(extractPrefix(pattern))
- var matches []string
- paginator := s3.NewListObjectsV2Paginator(a.client, &s3.ListObjectsV2Input{
- Bucket: aws.String(a.bucket),
- Prefix: aws.String(prefix),
- })
- for paginator.HasMorePages() {
- page, err := paginator.NextPage(ctx)
- if err != nil {
- return nil, fmt.Errorf("failed to list S3 objects: %w", err)
- }
- for _, obj := range page.Contents {
- // Remove our prefix to get the original path
- path := strings.TrimPrefix(*obj.Key, a.prefix)
- // Check if path matches the pattern
- if matchPattern(path, pattern) {
- matches = append(matches, path)
- }
- }
- }
- return matches, nil
- }
- // Delete deletes a file from S3
- func (a *S3FileAdapter) Delete(ctx context.Context, path string) error {
- key := a.fullKey(path)
- _, err := a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
- Bucket: aws.String(a.bucket),
- Key: aws.String(key),
- })
- if err != nil {
- return fmt.Errorf("failed to delete S3 object: %w", err)
- }
- return nil
- }
- // Copy copies a file within S3
- func (a *S3FileAdapter) Copy(ctx context.Context, src, dst string) error {
- srcKey := a.fullKey(src)
- dstKey := a.fullKey(dst)
- // CopySource must be URL-encoded for keys with special characters
- copySource := url.PathEscape(a.bucket + "/" + srcKey)
- _, err := a.client.CopyObject(ctx, &s3.CopyObjectInput{
- Bucket: aws.String(a.bucket),
- CopySource: aws.String(copySource),
- Key: aws.String(dstKey),
- })
- if err != nil {
- return fmt.Errorf("failed to copy S3 object: %w", err)
- }
- return nil
- }
// extractPrefix returns the literal directory part of a glob pattern: the
// text up to and including the last "/" that precedes the first wildcard
// character ('*', '?' or '[').
//
//	"Process/Artifacts/*.json" -> "Process/Artifacts/"
//	"*.json"                   -> ""
//
// A pattern without any wildcard is returned unchanged.
func extractPrefix(pattern string) string {
	wildcard := strings.IndexAny(pattern, "*?[")
	if wildcard < 0 {
		return pattern
	}
	// LastIndex yields -1 when no slash precedes the wildcard, which turns
	// the slice bound into 0 and the result into "".
	cut := strings.LastIndex(pattern[:wildcard], "/") + 1
	return pattern[:cut]
}
// isErrorType reports whether err is non-nil and errors.As can assign it (or
// an error it wraps) to *target. On success *target holds the matched error.
func isErrorType[T error](err error, target *T) bool {
	return err != nil && errors.As(err, target)
}
- // GetPresignedURL generates a presigned URL for downloading a file
- func (a *S3FileAdapter) GetPresignedURL(ctx context.Context, path string, expirySeconds int64) (string, error) {
- key := a.fullKey(path)
- presignClient := s3.NewPresignClient(a.client)
- request, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
- Bucket: aws.String(a.bucket),
- Key: aws.String(key),
- }, func(opts *s3.PresignOptions) {
- opts.Expires = time.Duration(expirySeconds) * time.Second
- })
- if err != nil {
- return "", fmt.Errorf("failed to generate presigned URL: %w", err)
- }
- return request.URL, nil
- }
- // GetUploadPresignedURL generates a presigned URL for uploading a file
- func (a *S3FileAdapter) GetUploadPresignedURL(ctx context.Context, path string, expirySeconds int64) (string, error) {
- key := a.fullKey(path)
- presignClient := s3.NewPresignClient(a.client)
- request, err := presignClient.PresignPutObject(ctx, &s3.PutObjectInput{
- Bucket: aws.String(a.bucket),
- Key: aws.String(key),
- }, func(opts *s3.PresignOptions) {
- opts.Expires = time.Duration(expirySeconds) * time.Second
- })
- if err != nil {
- return "", fmt.Errorf("failed to generate upload presigned URL: %w", err)
- }
- return request.URL, nil
- }
- // GetObjectMetadata returns metadata for an S3 object
- func (a *S3FileAdapter) GetObjectMetadata(ctx context.Context, path string) (map[string]string, error) {
- key := a.fullKey(path)
- output, err := a.client.HeadObject(ctx, &s3.HeadObjectInput{
- Bucket: aws.String(a.bucket),
- Key: aws.String(key),
- })
- if err != nil {
- return nil, fmt.Errorf("failed to get object metadata: %w", err)
- }
- metadata := make(map[string]string)
- for k, v := range output.Metadata {
- metadata[k] = v
- }
- // Add standard metadata
- if output.ContentType != nil {
- metadata["content-type"] = *output.ContentType
- }
- if output.ContentLength != nil {
- metadata["content-length"] = fmt.Sprintf("%d", *output.ContentLength)
- }
- if output.LastModified != nil {
- metadata["last-modified"] = output.LastModified.String()
- }
- if output.ETag != nil {
- metadata["etag"] = *output.ETag
- }
- return metadata, nil
- }
- // WriteWithMetadata writes a file to S3 with custom metadata
- func (a *S3FileAdapter) WriteWithMetadata(ctx context.Context, path string, content []byte, contentType string, metadata map[string]string) error {
- key := a.fullKey(path)
- input := &s3.PutObjectInput{
- Bucket: aws.String(a.bucket),
- Key: aws.String(key),
- Body: bytes.NewReader(content),
- Metadata: metadata,
- }
- if contentType != "" {
- input.ContentType = aws.String(contentType)
- } else {
- // Infer content type from extension
- input.ContentType = aws.String(inferContentType(path))
- }
- _, err := a.client.PutObject(ctx, input)
- if err != nil {
- return fmt.Errorf("failed to write to S3: %w", err)
- }
- return nil
- }
// contentTypeByExt maps a lower-cased file extension (including the dot) to
// the content type inferContentType reports for it.
var contentTypeByExt = map[string]string{
	".json": "application/json",
	".txt":  "text/plain",
	".html": "text/html",
	".css":  "text/css",
	".js":   "application/javascript",
	".ts":   "application/typescript",
	".tsx":  "application/typescript",
	".md":   "text/markdown",
	".xml":  "application/xml",
	".yaml": "application/yaml",
	".yml":  "application/yaml",
	".png":  "image/png",
	".jpg":  "image/jpeg",
	".jpeg": "image/jpeg",
	".gif":  "image/gif",
	".svg":  "image/svg+xml",
	".pdf":  "application/pdf",
}

// inferContentType picks a content type from the extension of path, compared
// case-insensitively. Unknown or missing extensions yield
// "application/octet-stream".
func inferContentType(path string) string {
	ext := strings.ToLower(filepath.Ext(path))
	if contentType, known := contentTypeByExt[ext]; known {
		return contentType
	}
	return "application/octet-stream"
}
|