package workflow

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/url"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// S3FileAdapter implements FileAdapter using AWS S3 as storage.
type S3FileAdapter struct {
	client *s3.Client
	bucket string
	prefix string // Optional prefix prepended to every key (e.g., "workflows/").
}

// S3Config holds configuration for the S3 file adapter.
type S3Config struct {
	Bucket          string // S3 bucket name (required).
	Prefix          string // Optional key prefix; a trailing "/" is added if missing.
	Region          string // AWS region (default: us-east-1).
	Endpoint        string // Custom endpoint (for S3-compatible services like MinIO).
	AccessKeyID     string // Optional explicit credentials; used only together with SecretAccessKey.
	SecretAccessKey string // Optional explicit credentials; used only together with AccessKeyID.
	UsePathStyle    bool   // Use path-style addressing (for MinIO/LocalStack).
}

// NewS3FileAdapter creates a new S3 file adapter from cfg.
//
// It returns an error when cfg.Bucket is empty or when the AWS configuration
// cannot be loaded. Static credentials are installed only when both
// cfg.AccessKeyID and cfg.SecretAccessKey are non-empty; otherwise whatever
// config.LoadDefaultConfig resolves is used.
func NewS3FileAdapter(ctx context.Context, cfg S3Config) (*S3FileAdapter, error) {
	if cfg.Bucket == "" {
		return nil, fmt.Errorf("S3 bucket name is required")
	}

	region := cfg.Region
	if region == "" {
		region = "us-east-1"
	}

	// Build AWS config options.
	opts := []func(*config.LoadOptions) error{
		config.WithRegion(region),
	}

	// Use explicit credentials if provided.
	if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
		opts = append(opts, config.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
		))
	}

	// Load AWS config.
	awsCfg, err := config.LoadDefaultConfig(ctx, opts...)
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// Build S3 client options.
	var s3Opts []func(*s3.Options)

	// Custom endpoint for S3-compatible services.
	if cfg.Endpoint != "" {
		s3Opts = append(s3Opts, func(o *s3.Options) {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
		})
	}

	// Path-style addressing for MinIO/LocalStack.
	if cfg.UsePathStyle {
		s3Opts = append(s3Opts, func(o *s3.Options) {
			o.UsePathStyle = true
		})
	}

	client := s3.NewFromConfig(awsCfg, s3Opts...)

	// Normalize the prefix so that it can be concatenated directly with a path.
	prefix := cfg.Prefix
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	return &S3FileAdapter{
		client: client,
		bucket: cfg.Bucket,
		prefix: prefix,
	}, nil
}

// fullKey returns the full S3 key for a given path: a single leading "/" is
// removed from path and the adapter prefix is prepended.
func (a *S3FileAdapter) fullKey(path string) string {
	return a.prefix + strings.TrimPrefix(path, "/")
}

// Read implements FileAdapter. It reads the whole object at path into memory.
//
// A *FileNotFoundError is returned when S3 reports NoSuchKey; any other
// failure is wrapped and returned.
func (a *S3FileAdapter) Read(ctx context.Context, path string) ([]byte, error) {
	key := a.fullKey(path)

	output, err := a.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(a.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		// Translate the SDK's not-found error into the package-level error type.
		var nsk *types.NoSuchKey
		if isErrorType(err, &nsk) {
			return nil, &FileNotFoundError{Path: path}
		}
		return nil, fmt.Errorf("failed to read from S3: %w", err)
	}
	defer output.Body.Close()

	content, err := io.ReadAll(output.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read S3 object body: %w", err)
	}
	return content, nil
}

// Write implements FileAdapter. It writes content to the object at path
// according to mode:
//
//   - WriteModeFailIfExists: returns *FileExistsError if Exists reports the
//     object is present. The check and the write are two separate requests,
//     so they are not atomic.
//   - WriteModeAppend: reads the current object (if any), appends content to
//     it, and writes the combined bytes back.
//   - WriteModeOverwrite and any other value: writes content as-is.
func (a *S3FileAdapter) Write(ctx context.Context, path string, content []byte, mode WriteMode) error {
	key := a.fullKey(path)

	switch mode {
	case WriteModeFailIfExists:
		// Check if the object exists first.
		exists, err := a.Exists(ctx, path)
		if err != nil {
			return err
		}
		if exists {
			return &FileExistsError{Path: path}
		}
		// Otherwise continue to the write below.

	case WriteModeAppend:
		// S3 doesn't support append, so we need to read, append, and write.
		existing, err := a.Read(ctx, path)
		if err != nil {
			// A missing object is fine - just write the new content.
			// errors.As also recognizes a wrapped *FileNotFoundError.
			var notFound *FileNotFoundError
			if !errors.As(err, &notFound) {
				return err
			}
		}
		content = append(existing, content...)

	case WriteModeOverwrite:
		// Default behavior - just write.
	}

	_, err := a.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(a.bucket),
		Key:    aws.String(key),
		Body:   bytes.NewReader(content),
	})
	if err != nil {
		return fmt.Errorf("failed to write to S3: %w", err)
	}
	return nil
}

// Exists implements FileAdapter. It issues a HeadObject request and reports
// whether the object at path exists.
//
// The error is treated as "does not exist" when it is a types.NotFound or
// types.NoSuchKey, or, as a fallback, when its message contains "NotFound"
// or "404". Any other error is wrapped and returned.
func (a *S3FileAdapter) Exists(ctx context.Context, path string) (bool, error) {
	key := a.fullKey(path)

	_, err := a.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(a.bucket),
		Key:    aws.String(key),
	})
	if err == nil {
		return true, nil
	}

	// Check for the typed not-found errors first.
	var nf *types.NotFound
	if isErrorType(err, &nf) {
		return false, nil
	}
	var nsk *types.NoSuchKey
	if isErrorType(err, &nsk) {
		return false, nil
	}

	// For HeadObject, a 404 may surface as a different error, so fall back to
	// inspecting the message text.
	msg := err.Error()
	if strings.Contains(msg, "NotFound") || strings.Contains(msg, "404") {
		return false, nil
	}

	return false, fmt.Errorf("failed to check S3 object existence: %w", err)
}

// List implements FileAdapter. It lists objects whose adapter-relative path
// matches pattern.
//
// The literal part of pattern (see extractPrefix) is used as the S3 listing
// prefix; every returned key then has the adapter prefix stripped and is
// filtered with matchPattern, which is defined elsewhere in the package.
// The result is nil when nothing matches.
func (a *S3FileAdapter) List(ctx context.Context, pattern string) ([]string, error) {
	// Convert glob pattern to S3 prefix.
	// For patterns like "Process/Artifacts/*", use "Process/Artifacts/" as prefix.
	prefix := a.fullKey(extractPrefix(pattern))

	paginator := s3.NewListObjectsV2Paginator(a.client, &s3.ListObjectsV2Input{
		Bucket: aws.String(a.bucket),
		Prefix: aws.String(prefix),
	})

	var matches []string
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to list S3 objects: %w", err)
		}

		for _, obj := range page.Contents {
			// Key is a pointer in the SDK type; skip entries without one
			// instead of dereferencing nil.
			if obj.Key == nil {
				continue
			}

			// Remove our prefix to get the original path.
			path := strings.TrimPrefix(*obj.Key, a.prefix)

			// Keep only paths that match the pattern.
			if matchPattern(path, pattern) {
				matches = append(matches, path)
			}
		}
	}
	return matches, nil
}

// Delete deletes the object at path from S3.
func (a *S3FileAdapter) Delete(ctx context.Context, path string) error {
	key := a.fullKey(path)

	_, err := a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(a.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return fmt.Errorf("failed to delete S3 object: %w", err)
	}
	return nil
}

// Copy copies the object at src to dst within the adapter's bucket.
func (a *S3FileAdapter) Copy(ctx context.Context, src, dst string) error {
	srcKey := a.fullKey(src)
	dstKey := a.fullKey(dst)

	// CopySource must be URL-encoded for keys with special characters.
	// url.PathEscape also percent-encodes the "/" separators.
	// NOTE(review): confirm the target service decodes "%2F" in CopySource.
	copySource := url.PathEscape(a.bucket + "/" + srcKey)

	_, err := a.client.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:     aws.String(a.bucket),
		CopySource: aws.String(copySource),
		Key:        aws.String(dstKey),
	})
	if err != nil {
		return fmt.Errorf("failed to copy S3 object: %w", err)
	}
	return nil
}

// extractPrefix extracts the literal prefix from a glob pattern,
// e.g., "Process/Artifacts/*.json" -> "Process/Artifacts/".
//
// A pattern without any of the wildcard characters "*", "?" or "[" is
// returned unchanged. If there is no "/" before the first wildcard, the
// result is the empty string.
func extractPrefix(pattern string) string {
	// Find the first wildcard character.
	idx := strings.IndexAny(pattern, "*?[")
	if idx == -1 {
		return pattern
	}

	// Return everything up to and including the last slash before the wildcard.
	literal := pattern[:idx]
	if lastSlash := strings.LastIndex(literal, "/"); lastSlash != -1 {
		return literal[:lastSlash+1]
	}
	return ""
}

// isErrorType reports whether err, or any error it wraps, can be assigned to
// *target using errors.As. A nil err always yields false.
func isErrorType[T error](err error, target *T) bool {
	if err == nil {
		return false
	}
	return errors.As(err, target)
}

// GetPresignedURL generates a presigned URL for downloading the object at
// path. expirySeconds is converted to a time.Duration and set as the
// presign expiry.
//
// NOTE(review): expirySeconds is passed through without validation; confirm
// how the SDK treats zero or negative values.
func (a *S3FileAdapter) GetPresignedURL(ctx context.Context, path string, expirySeconds int64) (string, error) {
	key := a.fullKey(path)

	presignClient := s3.NewPresignClient(a.client)

	request, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(a.bucket),
		Key:    aws.String(key),
	}, func(opts *s3.PresignOptions) {
		opts.Expires = time.Duration(expirySeconds) * time.Second
	})
	if err != nil {
		return "", fmt.Errorf("failed to generate presigned URL: %w", err)
	}
	return request.URL, nil
}

// GetUploadPresignedURL generates a presigned URL for uploading an object to
// path. expirySeconds is converted to a time.Duration and set as the
// presign expiry.
//
// NOTE(review): expirySeconds is passed through without validation; confirm
// how the SDK treats zero or negative values.
func (a *S3FileAdapter) GetUploadPresignedURL(ctx context.Context, path string, expirySeconds int64) (string, error) {
	key := a.fullKey(path)

	presignClient := s3.NewPresignClient(a.client)

	request, err := presignClient.PresignPutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(a.bucket),
		Key:    aws.String(key),
	}, func(opts *s3.PresignOptions) {
		opts.Expires = time.Duration(expirySeconds) * time.Second
	})
	if err != nil {
		return "", fmt.Errorf("failed to generate upload presigned URL: %w", err)
	}
	return request.URL, nil
}

// GetObjectMetadata returns metadata for the object at path, obtained with a
// HeadObject request.
//
// The result contains the object's user metadata plus, when present in the
// response, the entries "content-type", "content-length", "last-modified"
// and "etag". These standard entries are written after the user metadata, so
// they replace any user metadata stored under the same key.
func (a *S3FileAdapter) GetObjectMetadata(ctx context.Context, path string) (map[string]string, error) {
	key := a.fullKey(path)

	output, err := a.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(a.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get object metadata: %w", err)
	}

	// Room for the user metadata plus the four standard entries below.
	metadata := make(map[string]string, len(output.Metadata)+4)
	for k, v := range output.Metadata {
		metadata[k] = v
	}

	// Add standard metadata.
	if output.ContentType != nil {
		metadata["content-type"] = *output.ContentType
	}
	if output.ContentLength != nil {
		metadata["content-length"] = fmt.Sprintf("%d", *output.ContentLength)
	}
	if output.LastModified != nil {
		metadata["last-modified"] = output.LastModified.String()
	}
	if output.ETag != nil {
		metadata["etag"] = *output.ETag
	}

	return metadata, nil
}

// WriteWithMetadata writes content to the object at path with custom
// metadata. When contentType is empty, the content type is inferred from the
// extension of path (see inferContentType).
func (a *S3FileAdapter) WriteWithMetadata(ctx context.Context, path string, content []byte, contentType string, metadata map[string]string) error {
	key := a.fullKey(path)

	// Infer the content type from the extension when none is given.
	if contentType == "" {
		contentType = inferContentType(path)
	}

	input := &s3.PutObjectInput{
		Bucket:      aws.String(a.bucket),
		Key:         aws.String(key),
		Body:        bytes.NewReader(content),
		Metadata:    metadata,
		ContentType: aws.String(contentType),
	}

	if _, err := a.client.PutObject(ctx, input); err != nil {
		return fmt.Errorf("failed to write to S3: %w", err)
	}
	return nil
}

// inferContentType infers a content type from the file extension of path.
// The extension is compared case-insensitively; unknown or missing
// extensions yield "application/octet-stream".
func inferContentType(path string) string {
	switch strings.ToLower(filepath.Ext(path)) {
	case ".json":
		return "application/json"
	case ".txt":
		return "text/plain"
	case ".html":
		return "text/html"
	case ".css":
		return "text/css"
	case ".js":
		return "application/javascript"
	case ".ts", ".tsx":
		return "application/typescript"
	case ".md":
		return "text/markdown"
	case ".xml":
		return "application/xml"
	case ".yaml", ".yml":
		return "application/yaml"
	case ".png":
		return "image/png"
	case ".jpg", ".jpeg":
		return "image/jpeg"
	case ".gif":
		return "image/gif"
	case ".svg":
		return "image/svg+xml"
	case ".pdf":
		return "application/pdf"
	default:
		return "application/octet-stream"
	}
}