package workflow import ( "bytes" "context" "encoding/json" "fmt" "io" "maps" "net/http" neturl "net/url" "strings" "sync" "time" ) // ServiceAdapter defines the interface for calling project services type ServiceAdapter interface { // Call invokes a service with the given parameters and returns the result Call(ctx context.Context, serviceName string, params map[string]any) (*ServiceResult, error) } // ComponentAdapter defines the interface for calling system components type ComponentAdapter interface { // Call invokes a component with the given parameters and returns the result Call(ctx context.Context, componentID string, params map[string]any) (map[string]any, error) } // LLMAdapter defines the interface for calling LLM services type LLMAdapter interface { // Call invokes an LLM with the given parameters // For streaming responses, the stream channel will receive chunks Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error) } // FileAdapter defines the interface for file operations type FileAdapter interface { // Read reads a file from the input path Read(ctx context.Context, path string) ([]byte, error) // Write writes content to an artifact path with the specified mode Write(ctx context.Context, path string, content []byte, mode WriteMode) error // Exists checks if a file exists Exists(ctx context.Context, path string) (bool, error) // List lists files matching a pattern List(ctx context.Context, pattern string) ([]string, error) } // APIAdapter defines the interface for calling third-party HTTP APIs type APIAdapter interface { // Call invokes an HTTP API with the given definition and parameters // Returns the parsed response body (typically JSON) Call(ctx context.Context, apiDef *APIDefinition, params map[string]any) (map[string]any, error) } // DocAdapter defines the interface for resolving semantic document references type DocAdapter interface { // Get retrieves the content of a document by its ID Get(ctx context.Context, docID string) (string, error) } // LLMAdapterRegistry holds multiple named LLM adapters and resolves the // correct adapter for a given model spec string (v3.16+). // // Model spec formats: // - "" (empty): use default adapter with its configured model // - "openai" (no /): use the "openai" adapter with its configured default model // - "openai/gpt-4.1" (has /): use the "openai" adapter with model override "gpt-4.1" type LLMAdapterRegistry struct { adapters map[string]LLMAdapter // keyed by provider name defaultAdapter LLMAdapter defaultProvider string } // NewLLMAdapterRegistry creates a new multi-provider adapter registry. func NewLLMAdapterRegistry(defaultAdapter LLMAdapter, defaultProvider string) *LLMAdapterRegistry { return &LLMAdapterRegistry{ adapters: make(map[string]LLMAdapter), defaultAdapter: defaultAdapter, defaultProvider: defaultProvider, } } // Register adds a provider adapter to the registry. func (r *LLMAdapterRegistry) Register(provider string, adapter LLMAdapter) { r.adapters[provider] = adapter } // Resolve returns the adapter and optional model override for the given model spec. // If modelSpec is empty, returns the default adapter with no override. // If modelSpec has no "/", it is treated as a pure provider name (adapter's default model is used). // If modelSpec has "/", it is split into provider + modelId. func (r *LLMAdapterRegistry) Resolve(modelSpec string) (adapter LLMAdapter, modelOverride string, err error) { if modelSpec == "" { return r.defaultAdapter, "", nil } if strings.Contains(modelSpec, "/") { parts := strings.SplitN(modelSpec, "/", 2) provider, modelId := parts[0], parts[1] a, ok := r.adapters[provider] if !ok { return nil, "", fmt.Errorf("unsupported LLM provider: %q", provider) } return a, modelId, nil } // Provider-only: use that adapter's configured default model a, ok := r.adapters[modelSpec] if !ok { return nil, "", fmt.Errorf("unsupported LLM provider: %q", modelSpec) } return a, "", nil } // Call implements LLMAdapter by delegating to the default adapter. // This makes LLMAdapterRegistry usable as a drop-in LLMAdapter for backward compatibility. func (r *LLMAdapterRegistry) Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error) { return r.defaultAdapter.Call(ctx, params, stream) } // DefaultDocAdapter is a simple in-memory doc adapter for testing type DefaultDocAdapter struct { docs map[string]string mu sync.RWMutex } // NewDefaultDocAdapter creates a new default doc adapter func NewDefaultDocAdapter() *DefaultDocAdapter { return &DefaultDocAdapter{ docs: make(map[string]string), } } // SetDoc stores a document by ID func (d *DefaultDocAdapter) SetDoc(id, content string) { d.mu.Lock() defer d.mu.Unlock() d.docs[id] = content } // Get implements DocAdapter func (d *DefaultDocAdapter) Get(ctx context.Context, docID string) (string, error) { d.mu.RLock() defer d.mu.RUnlock() content, ok := d.docs[docID] if !ok { return "", fmt.Errorf("document not found: %s", docID) } return content, nil } // DefaultServiceAdapter is a simple in-memory service adapter for testing type DefaultServiceAdapter struct { handlers map[string]ServiceHandler mu sync.RWMutex } // ServiceHandler is a function that handles service calls type ServiceHandler func(ctx context.Context, params map[string]any) (*ServiceResult, error) // NewDefaultServiceAdapter creates a new default service adapter func NewDefaultServiceAdapter() *DefaultServiceAdapter { return &DefaultServiceAdapter{ handlers: make(map[string]ServiceHandler), } } // RegisterHandler registers a service handler func (a *DefaultServiceAdapter) RegisterHandler(serviceName string, handler ServiceHandler) { a.mu.Lock() defer a.mu.Unlock() a.handlers[serviceName] = handler } // Call implements ServiceAdapter func (a *DefaultServiceAdapter) Call(ctx context.Context, serviceName string, params map[string]any) (*ServiceResult, error) { a.mu.RLock() handler, ok := a.handlers[serviceName] a.mu.RUnlock() if !ok { // Return empty result if no handler return &ServiceResult{ Data: make(map[string]any), }, nil } return handler(ctx, params) } // DefaultComponentAdapter is a simple in-memory component adapter for testing type DefaultComponentAdapter struct { handlers map[string]ComponentHandler mu sync.RWMutex } // ComponentHandler is a function that handles component calls type ComponentHandler func(ctx context.Context, params map[string]any) (map[string]any, error) // NewDefaultComponentAdapter creates a new default component adapter func NewDefaultComponentAdapter() *DefaultComponentAdapter { return &DefaultComponentAdapter{ handlers: make(map[string]ComponentHandler), } } // RegisterHandler registers a component handler func (a *DefaultComponentAdapter) RegisterHandler(componentID string, handler ComponentHandler) { a.mu.Lock() defer a.mu.Unlock() a.handlers[componentID] = handler } // Call implements ComponentAdapter func (a *DefaultComponentAdapter) Call(ctx context.Context, componentID string, params map[string]any) (map[string]any, error) { a.mu.RLock() handler, ok := a.handlers[componentID] a.mu.RUnlock() if !ok { return make(map[string]any), nil } return handler(ctx, params) } // DefaultLLMAdapter is a simple in-memory LLM adapter for testing type DefaultLLMAdapter struct { handler LLMHandler mu sync.RWMutex } // LLMHandler is a function that handles LLM calls type LLMHandler func(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error) // NewDefaultLLMAdapter creates a new default LLM adapter func NewDefaultLLMAdapter() *DefaultLLMAdapter { return &DefaultLLMAdapter{} } // SetHandler sets the LLM handler func (a *DefaultLLMAdapter) SetHandler(handler LLMHandler) { a.mu.Lock() defer a.mu.Unlock() a.handler = handler } // Call implements LLMAdapter func (a *DefaultLLMAdapter) Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error) { a.mu.RLock() handler := a.handler a.mu.RUnlock() if handler == nil { return map[string]any{ "content": "Mock LLM response", }, nil } return handler(ctx, params, stream) } // DefaultFileAdapter is a simple in-memory file adapter for testing type DefaultFileAdapter struct { files map[string][]byte mu sync.RWMutex } // NewDefaultFileAdapter creates a new default file adapter func NewDefaultFileAdapter() *DefaultFileAdapter { return &DefaultFileAdapter{ files: make(map[string][]byte), } } // Read implements FileAdapter func (a *DefaultFileAdapter) Read(ctx context.Context, path string) ([]byte, error) { a.mu.RLock() content, ok := a.files[path] a.mu.RUnlock() if !ok { return nil, &FileNotFoundError{Path: path} } return content, nil } // Write implements FileAdapter func (a *DefaultFileAdapter) Write(ctx context.Context, path string, content []byte, mode WriteMode) error { a.mu.Lock() defer a.mu.Unlock() switch mode { case WriteModeFailIfExists: if _, exists := a.files[path]; exists { return &FileExistsError{Path: path} } a.files[path] = content case WriteModeAppend: existing := a.files[path] a.files[path] = append(existing, content...) case WriteModePrepend: existing := a.files[path] a.files[path] = append(content, existing...) case WriteModeOverwrite: fallthrough default: a.files[path] = content } return nil } // Exists implements FileAdapter func (a *DefaultFileAdapter) Exists(ctx context.Context, path string) (bool, error) { a.mu.RLock() _, exists := a.files[path] a.mu.RUnlock() return exists, nil } // List implements FileAdapter func (a *DefaultFileAdapter) List(ctx context.Context, pattern string) ([]string, error) { a.mu.RLock() defer a.mu.RUnlock() var matches []string for path := range a.files { if matchPattern(path, pattern) { matches = append(matches, path) } } return matches, nil } // SetFile sets a file in the in-memory store (for testing) func (a *DefaultFileAdapter) SetFile(path string, content []byte) { a.mu.Lock() defer a.mu.Unlock() a.files[path] = content } // GetFile gets a file from the in-memory store (for testing) func (a *DefaultFileAdapter) GetFile(path string) []byte { a.mu.RLock() defer a.mu.RUnlock() return a.files[path] } // Errors // FileNotFoundError is returned when a file is not found type FileNotFoundError struct { Path string } func (e *FileNotFoundError) Error() string { return "file not found: " + e.Path } // FileExistsError is returned when a file already exists type FileExistsError struct { Path string } func (e *FileExistsError) Error() string { return "file already exists: " + e.Path } // StreamWriter wraps an io.Writer for streaming output type StreamWriter struct { writer io.Writer } // NewStreamWriter creates a new stream writer func NewStreamWriter(w io.Writer) *StreamWriter { return &StreamWriter{writer: w} } // Write writes a chunk to the stream func (sw *StreamWriter) Write(chunk string) error { _, err := sw.writer.Write([]byte(chunk)) return err } // DefaultAPIAdapter is an HTTP client-based API adapter. // Params accepted by Call (all optional): // - pathParams (object): values substituted into {placeholder} segments of the URL // - query (object): appended as URL query string // - body (any): JSON-encoded request body; sets Content-Type: application/json // - headers (object): per-request headers that override apiDef.Headers // - authToken (string): resolved by the executor from apiDef.Auth; sent as Bearer token // // Result keys returned on success: // - _status (number): HTTP status code // - All top-level keys from the JSON response body, or "body" (string) for non-JSON responses type DefaultAPIAdapter struct { client *http.Client } // NewDefaultAPIAdapter creates an adapter with a 30-second request timeout. func NewDefaultAPIAdapter() *DefaultAPIAdapter { return &DefaultAPIAdapter{ client: &http.Client{ Timeout: 30 * time.Second, }, } } // NewDefaultAPIAdapterWithTimeout creates an adapter with a custom timeout. func NewDefaultAPIAdapterWithTimeout(timeout time.Duration) *DefaultAPIAdapter { return &DefaultAPIAdapter{ client: &http.Client{Timeout: timeout}, } } // Call implements APIAdapter. func (a *DefaultAPIAdapter) Call(ctx context.Context, apiDef *APIDefinition, params map[string]any) (map[string]any, error) { // 1. Substitute {placeholder} path segments — values are URL-path-encoded. rawURL := apiDef.URL if pathParams, ok := params["pathParams"].(map[string]any); ok { for key, val := range pathParams { rawURL = strings.ReplaceAll(rawURL, "{"+key+"}", neturl.PathEscape(fmt.Sprintf("%v", val))) } } // 2. Append query string. if queryParams, ok := params["query"].(map[string]any); ok { q := neturl.Values{} for key, val := range queryParams { q.Set(key, fmt.Sprintf("%v", val)) } if len(q) > 0 { rawURL += "?" + q.Encode() } } // 3. Marshal request body. var bodyReader io.Reader var hasBody bool if body, ok := params["body"]; ok && body != nil { bodyBytes, err := json.Marshal(body) if err != nil { return nil, fmt.Errorf("failed to marshal request body: %w", err) } bodyReader = bytes.NewReader(bodyBytes) hasBody = true } // 4. Build request bound to the workflow context so cancellation propagates. req, err := http.NewRequestWithContext(ctx, strings.ToUpper(apiDef.Method), rawURL, bodyReader) if err != nil { return nil, fmt.Errorf("failed to create HTTP request: %w", err) } // 5. Apply static headers from the API definition first. for key, val := range apiDef.Headers { req.Header.Set(key, val) } // 6. Per-call headers override static ones. if headers, ok := params["headers"].(map[string]any); ok { for key, val := range headers { req.Header.Set(key, fmt.Sprintf("%v", val)) } } // 7. Set Content-Type when a body is present (unless caller already set it). if hasBody && req.Header.Get("Content-Type") == "" { req.Header.Set("Content-Type", "application/json") } // 8. Authorization header — token resolved by the executor from apiDef.Auth. if authToken, ok := params["authToken"].(string); ok && authToken != "" { req.Header.Set("Authorization", "Bearer "+authToken) } // 9. Execute request. resp, err := a.client.Do(req) if err != nil { return nil, fmt.Errorf("HTTP request failed: %w", err) } defer resp.Body.Close() // 10. Read full response body. respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) } // 11. Non-2xx → error with status and body for caller inspection. if resp.StatusCode < 200 || resp.StatusCode >= 300 { return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(respBody)) } // 12. Build result: start with _status so callers can read it from _result._status. result := map[string]any{ "_status": resp.StatusCode, } if len(respBody) == 0 { return result, nil } // 13. Merge JSON fields into result; fall back to raw "body" string for non-JSON. var parsed map[string]any if err := json.Unmarshal(respBody, &parsed); err != nil { result["body"] = string(respBody) } else { maps.Copy(result, parsed) } return result, nil }