s3_adapter.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. package workflow
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/url"
  9. "path/filepath"
  10. "strings"
  11. "time"
  12. "github.com/aws/aws-sdk-go-v2/aws"
  13. "github.com/aws/aws-sdk-go-v2/config"
  14. "github.com/aws/aws-sdk-go-v2/credentials"
  15. "github.com/aws/aws-sdk-go-v2/service/s3"
  16. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  17. )
  18. // S3FileAdapter implements FileAdapter using AWS S3 as storage
  19. type S3FileAdapter struct {
  20. client *s3.Client
  21. bucket string
  22. prefix string // Optional prefix for all keys (e.g., "workflows/")
  23. }
  // S3Config describes how NewS3FileAdapter connects to a bucket. Only Bucket
// is mandatory; every other field may be left at its zero value.
type S3Config struct {
	// Bucket is the S3 bucket name (required).
	Bucket string
	// Prefix is an optional key prefix prepended to every object key;
	// NewS3FileAdapter appends a trailing "/" when it is missing.
	Prefix string
	// Region is the AWS region; NewS3FileAdapter uses "us-east-1" when empty.
	Region string
	// Endpoint overrides the service endpoint, e.g. for S3-compatible services
	// such as MinIO.
	Endpoint string
	// AccessKeyID and SecretAccessKey are optional static credentials. They are
	// applied only when both are non-empty; when both are empty the default
	// AWS credential chain is used.
	AccessKeyID     string
	SecretAccessKey string
	// UsePathStyle selects path-style bucket addressing (MinIO/LocalStack).
	UsePathStyle bool
}
  34. // NewS3FileAdapter creates a new S3 file adapter
  35. func NewS3FileAdapter(ctx context.Context, cfg S3Config) (*S3FileAdapter, error) {
  36. if cfg.Bucket == "" {
  37. return nil, fmt.Errorf("S3 bucket name is required")
  38. }
  39. region := cfg.Region
  40. if region == "" {
  41. region = "us-east-1"
  42. }
  43. // Build AWS config options
  44. var opts []func(*config.LoadOptions) error
  45. opts = append(opts, config.WithRegion(region))
  46. // Use explicit credentials if provided
  47. if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
  48. opts = append(opts, config.WithCredentialsProvider(
  49. credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
  50. ))
  51. }
  52. // Load AWS config
  53. awsCfg, err := config.LoadDefaultConfig(ctx, opts...)
  54. if err != nil {
  55. return nil, fmt.Errorf("failed to load AWS config: %w", err)
  56. }
  57. // Build S3 client options
  58. var s3Opts []func(*s3.Options)
  59. // Custom endpoint for S3-compatible services
  60. if cfg.Endpoint != "" {
  61. s3Opts = append(s3Opts, func(o *s3.Options) {
  62. o.BaseEndpoint = aws.String(cfg.Endpoint)
  63. })
  64. }
  65. // Path-style addressing for MinIO/LocalStack
  66. if cfg.UsePathStyle {
  67. s3Opts = append(s3Opts, func(o *s3.Options) {
  68. o.UsePathStyle = true
  69. })
  70. }
  71. client := s3.NewFromConfig(awsCfg, s3Opts...)
  72. prefix := cfg.Prefix
  73. if prefix != "" && !strings.HasSuffix(prefix, "/") {
  74. prefix += "/"
  75. }
  76. return &S3FileAdapter{
  77. client: client,
  78. bucket: cfg.Bucket,
  79. prefix: prefix,
  80. }, nil
  81. }
  82. // fullKey returns the full S3 key for a given path
  83. func (a *S3FileAdapter) fullKey(path string) string {
  84. // Remove leading slash if present
  85. path = strings.TrimPrefix(path, "/")
  86. return a.prefix + path
  87. }
  88. // Read implements FileAdapter - reads a file from S3
  89. func (a *S3FileAdapter) Read(ctx context.Context, path string) ([]byte, error) {
  90. key := a.fullKey(path)
  91. output, err := a.client.GetObject(ctx, &s3.GetObjectInput{
  92. Bucket: aws.String(a.bucket),
  93. Key: aws.String(key),
  94. })
  95. if err != nil {
  96. // Check if it's a not found error
  97. var nsk *types.NoSuchKey
  98. if ok := isErrorType(err, &nsk); ok {
  99. return nil, &FileNotFoundError{Path: path}
  100. }
  101. return nil, fmt.Errorf("failed to read from S3: %w", err)
  102. }
  103. defer output.Body.Close()
  104. content, err := io.ReadAll(output.Body)
  105. if err != nil {
  106. return nil, fmt.Errorf("failed to read S3 object body: %w", err)
  107. }
  108. return content, nil
  109. }
  110. // Write implements FileAdapter - writes a file to S3
  111. func (a *S3FileAdapter) Write(ctx context.Context, path string, content []byte, mode WriteMode) error {
  112. key := a.fullKey(path)
  113. switch mode {
  114. case WriteModeFailIfExists:
  115. // Check if object exists first
  116. exists, err := a.Exists(ctx, path)
  117. if err != nil {
  118. return err
  119. }
  120. if exists {
  121. return &FileExistsError{Path: path}
  122. }
  123. // Fall through to write
  124. case WriteModeAppend:
  125. // S3 doesn't support append, so we need to read, append, and write
  126. existing, err := a.Read(ctx, path)
  127. if err != nil {
  128. // If file doesn't exist, that's fine - just write the new content
  129. if _, ok := err.(*FileNotFoundError); !ok {
  130. return err
  131. }
  132. }
  133. content = append(existing, content...)
  134. case WriteModeOverwrite:
  135. // Default behavior - just write
  136. }
  137. _, err := a.client.PutObject(ctx, &s3.PutObjectInput{
  138. Bucket: aws.String(a.bucket),
  139. Key: aws.String(key),
  140. Body: bytes.NewReader(content),
  141. })
  142. if err != nil {
  143. return fmt.Errorf("failed to write to S3: %w", err)
  144. }
  145. return nil
  146. }
  147. // Exists implements FileAdapter - checks if a file exists in S3
  148. func (a *S3FileAdapter) Exists(ctx context.Context, path string) (bool, error) {
  149. key := a.fullKey(path)
  150. _, err := a.client.HeadObject(ctx, &s3.HeadObjectInput{
  151. Bucket: aws.String(a.bucket),
  152. Key: aws.String(key),
  153. })
  154. if err != nil {
  155. // Check for not found
  156. var nf *types.NotFound
  157. if ok := isErrorType(err, &nf); ok {
  158. return false, nil
  159. }
  160. // Also check for NoSuchKey
  161. var nsk *types.NoSuchKey
  162. if ok := isErrorType(err, &nsk); ok {
  163. return false, nil
  164. }
  165. // For HeadObject, a 404 comes as a different error
  166. if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "404") {
  167. return false, nil
  168. }
  169. return false, fmt.Errorf("failed to check S3 object existence: %w", err)
  170. }
  171. return true, nil
  172. }
  173. // List implements FileAdapter - lists files matching a pattern in S3
  174. func (a *S3FileAdapter) List(ctx context.Context, pattern string) ([]string, error) {
  175. // Convert glob pattern to S3 prefix
  176. // For patterns like "Process/Artifacts/*", use "Process/Artifacts/" as prefix
  177. prefix := a.fullKey(extractPrefix(pattern))
  178. var matches []string
  179. paginator := s3.NewListObjectsV2Paginator(a.client, &s3.ListObjectsV2Input{
  180. Bucket: aws.String(a.bucket),
  181. Prefix: aws.String(prefix),
  182. })
  183. for paginator.HasMorePages() {
  184. page, err := paginator.NextPage(ctx)
  185. if err != nil {
  186. return nil, fmt.Errorf("failed to list S3 objects: %w", err)
  187. }
  188. for _, obj := range page.Contents {
  189. // Remove our prefix to get the original path
  190. path := strings.TrimPrefix(*obj.Key, a.prefix)
  191. // Check if path matches the pattern
  192. if matchPattern(path, pattern) {
  193. matches = append(matches, path)
  194. }
  195. }
  196. }
  197. return matches, nil
  198. }
  199. // Delete deletes a file from S3
  200. func (a *S3FileAdapter) Delete(ctx context.Context, path string) error {
  201. key := a.fullKey(path)
  202. _, err := a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
  203. Bucket: aws.String(a.bucket),
  204. Key: aws.String(key),
  205. })
  206. if err != nil {
  207. return fmt.Errorf("failed to delete S3 object: %w", err)
  208. }
  209. return nil
  210. }
  211. // Copy copies a file within S3
  212. func (a *S3FileAdapter) Copy(ctx context.Context, src, dst string) error {
  213. srcKey := a.fullKey(src)
  214. dstKey := a.fullKey(dst)
  215. // CopySource must be URL-encoded for keys with special characters
  216. copySource := url.PathEscape(a.bucket + "/" + srcKey)
  217. _, err := a.client.CopyObject(ctx, &s3.CopyObjectInput{
  218. Bucket: aws.String(a.bucket),
  219. CopySource: aws.String(copySource),
  220. Key: aws.String(dstKey),
  221. })
  222. if err != nil {
  223. return fmt.Errorf("failed to copy S3 object: %w", err)
  224. }
  225. return nil
  226. }
  // extractPrefix returns the wildcard-free leading part of a glob pattern,
// cut back to the last "/" before the first wildcard, for use as an S3
// listing prefix. The characters '*', '?' and '[' count as wildcards.
//
//	"Process/Artifacts/*.json" -> "Process/Artifacts/"
//	"Process/Artifacts/a.json" -> "Process/Artifacts/a.json" (no wildcard)
//	"*.json"                   -> ""
func extractPrefix(pattern string) string {
	wildcard := strings.IndexAny(pattern, "*?[")
	if wildcard < 0 {
		return pattern
	}
	// LastIndexByte yields -1 when there is no slash, making dirEnd 0 and the
	// result the empty string.
	dirEnd := strings.LastIndexByte(pattern[:wildcard], '/') + 1
	return pattern[:dirEnd]
}
  // isErrorType reports whether err, or any error in its wrap chain, can be
// assigned to *target; when it can, *target is set to that error. It is a
// typed wrapper over errors.As that short-circuits to false for a nil err.
func isErrorType[T error](err error, target *T) bool {
	return err != nil && errors.As(err, target)
}
  250. // GetPresignedURL generates a presigned URL for downloading a file
  251. func (a *S3FileAdapter) GetPresignedURL(ctx context.Context, path string, expirySeconds int64) (string, error) {
  252. key := a.fullKey(path)
  253. presignClient := s3.NewPresignClient(a.client)
  254. request, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
  255. Bucket: aws.String(a.bucket),
  256. Key: aws.String(key),
  257. }, func(opts *s3.PresignOptions) {
  258. opts.Expires = time.Duration(expirySeconds) * time.Second
  259. })
  260. if err != nil {
  261. return "", fmt.Errorf("failed to generate presigned URL: %w", err)
  262. }
  263. return request.URL, nil
  264. }
  265. // GetUploadPresignedURL generates a presigned URL for uploading a file
  266. func (a *S3FileAdapter) GetUploadPresignedURL(ctx context.Context, path string, expirySeconds int64) (string, error) {
  267. key := a.fullKey(path)
  268. presignClient := s3.NewPresignClient(a.client)
  269. request, err := presignClient.PresignPutObject(ctx, &s3.PutObjectInput{
  270. Bucket: aws.String(a.bucket),
  271. Key: aws.String(key),
  272. }, func(opts *s3.PresignOptions) {
  273. opts.Expires = time.Duration(expirySeconds) * time.Second
  274. })
  275. if err != nil {
  276. return "", fmt.Errorf("failed to generate upload presigned URL: %w", err)
  277. }
  278. return request.URL, nil
  279. }
  280. // GetObjectMetadata returns metadata for an S3 object
  281. func (a *S3FileAdapter) GetObjectMetadata(ctx context.Context, path string) (map[string]string, error) {
  282. key := a.fullKey(path)
  283. output, err := a.client.HeadObject(ctx, &s3.HeadObjectInput{
  284. Bucket: aws.String(a.bucket),
  285. Key: aws.String(key),
  286. })
  287. if err != nil {
  288. return nil, fmt.Errorf("failed to get object metadata: %w", err)
  289. }
  290. metadata := make(map[string]string)
  291. for k, v := range output.Metadata {
  292. metadata[k] = v
  293. }
  294. // Add standard metadata
  295. if output.ContentType != nil {
  296. metadata["content-type"] = *output.ContentType
  297. }
  298. if output.ContentLength != nil {
  299. metadata["content-length"] = fmt.Sprintf("%d", *output.ContentLength)
  300. }
  301. if output.LastModified != nil {
  302. metadata["last-modified"] = output.LastModified.String()
  303. }
  304. if output.ETag != nil {
  305. metadata["etag"] = *output.ETag
  306. }
  307. return metadata, nil
  308. }
  309. // WriteWithMetadata writes a file to S3 with custom metadata
  310. func (a *S3FileAdapter) WriteWithMetadata(ctx context.Context, path string, content []byte, contentType string, metadata map[string]string) error {
  311. key := a.fullKey(path)
  312. input := &s3.PutObjectInput{
  313. Bucket: aws.String(a.bucket),
  314. Key: aws.String(key),
  315. Body: bytes.NewReader(content),
  316. Metadata: metadata,
  317. }
  318. if contentType != "" {
  319. input.ContentType = aws.String(contentType)
  320. } else {
  321. // Infer content type from extension
  322. input.ContentType = aws.String(inferContentType(path))
  323. }
  324. _, err := a.client.PutObject(ctx, input)
  325. if err != nil {
  326. return fmt.Errorf("failed to write to S3: %w", err)
  327. }
  328. return nil
  329. }
  // inferContentType guesses a MIME type from the (case-insensitive) file
// extension of path. Unknown or missing extensions yield
// "application/octet-stream".
func inferContentType(path string) string {
	switch strings.ToLower(filepath.Ext(path)) {
	// Text and markup.
	case ".txt":
		return "text/plain"
	case ".html":
		return "text/html"
	case ".css":
		return "text/css"
	case ".md":
		return "text/markdown"
	// Structured data.
	case ".json":
		return "application/json"
	case ".xml":
		return "application/xml"
	case ".yaml", ".yml":
		return "application/yaml"
	// Source code.
	case ".js":
		return "application/javascript"
	case ".ts", ".tsx":
		return "application/typescript"
	// Images.
	case ".png":
		return "image/png"
	case ".jpg", ".jpeg":
		return "image/jpeg"
	case ".gif":
		return "image/gif"
	case ".svg":
		return "image/svg+xml"
	// Documents.
	case ".pdf":
		return "application/pdf"
	}
	return "application/octet-stream"
}