| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- package workflow
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "maps"
- "net/http"
- neturl "net/url"
- "strings"
- "sync"
- "time"
- )
- // ServiceAdapter defines the interface for calling project services
- type ServiceAdapter interface {
- // Call invokes a service with the given parameters and returns the result
- Call(ctx context.Context, serviceName string, params map[string]any) (*ServiceResult, error)
- }
// ComponentAdapter is the abstraction through which system components are called.
type ComponentAdapter interface {
	// Call invokes the component identified by componentID with params and
	// returns the component's output map, or an error.
	Call(ctx context.Context, componentID string, params map[string]any) (map[string]any, error)
}
// LLMAdapter is the abstraction through which LLM services are called.
type LLMAdapter interface {
	// Call invokes an LLM with params and returns its result map, or an
	// error. For streaming responses, chunks are delivered on stream.
	Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error)
}
- // FileAdapter defines the interface for file operations
- type FileAdapter interface {
- // Read reads a file from the input path
- Read(ctx context.Context, path string) ([]byte, error)
- // Write writes content to an artifact path with the specified mode
- Write(ctx context.Context, path string, content []byte, mode WriteMode) error
- // Exists checks if a file exists
- Exists(ctx context.Context, path string) (bool, error)
- // List lists files matching a pattern
- List(ctx context.Context, pattern string) ([]string, error)
- }
- // APIAdapter defines the interface for calling third-party HTTP APIs
- type APIAdapter interface {
- // Call invokes an HTTP API with the given definition and parameters
- // Returns the parsed response body (typically JSON)
- Call(ctx context.Context, apiDef *APIDefinition, params map[string]any) (map[string]any, error)
- }
// DocAdapter is the abstraction used to resolve semantic document references.
type DocAdapter interface {
	// Get returns the content of the document identified by docID.
	Get(ctx context.Context, docID string) (string, error)
}
- // LLMAdapterRegistry holds multiple named LLM adapters and resolves the
- // correct adapter for a given model spec string (v3.16+).
- //
- // Model spec formats:
- // - "" (empty): use default adapter with its configured model
- // - "openai" (no /): use the "openai" adapter with its configured default model
- // - "openai/gpt-4.1" (has /): use the "openai" adapter with model override "gpt-4.1"
- type LLMAdapterRegistry struct {
- adapters map[string]LLMAdapter // keyed by provider name
- defaultAdapter LLMAdapter
- defaultProvider string
- }
- // NewLLMAdapterRegistry creates a new multi-provider adapter registry.
- func NewLLMAdapterRegistry(defaultAdapter LLMAdapter, defaultProvider string) *LLMAdapterRegistry {
- return &LLMAdapterRegistry{
- adapters: make(map[string]LLMAdapter),
- defaultAdapter: defaultAdapter,
- defaultProvider: defaultProvider,
- }
- }
- // Register adds a provider adapter to the registry.
- func (r *LLMAdapterRegistry) Register(provider string, adapter LLMAdapter) {
- r.adapters[provider] = adapter
- }
- // Resolve returns the adapter and optional model override for the given model spec.
- // If modelSpec is empty, returns the default adapter with no override.
- // If modelSpec has no "/", it is treated as a pure provider name (adapter's default model is used).
- // If modelSpec has "/", it is split into provider + modelId.
- func (r *LLMAdapterRegistry) Resolve(modelSpec string) (adapter LLMAdapter, modelOverride string, err error) {
- if modelSpec == "" {
- return r.defaultAdapter, "", nil
- }
- if strings.Contains(modelSpec, "/") {
- parts := strings.SplitN(modelSpec, "/", 2)
- provider, modelId := parts[0], parts[1]
- a, ok := r.adapters[provider]
- if !ok {
- return nil, "", fmt.Errorf("unsupported LLM provider: %q", provider)
- }
- return a, modelId, nil
- }
- // Provider-only: use that adapter's configured default model
- a, ok := r.adapters[modelSpec]
- if !ok {
- return nil, "", fmt.Errorf("unsupported LLM provider: %q", modelSpec)
- }
- return a, "", nil
- }
- // Call implements LLMAdapter by delegating to the default adapter.
- // This makes LLMAdapterRegistry usable as a drop-in LLMAdapter for backward compatibility.
- func (r *LLMAdapterRegistry) Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error) {
- return r.defaultAdapter.Call(ctx, params, stream)
- }
// DefaultDocAdapter is a simple in-memory DocAdapter for testing. Documents
// live in a map keyed by ID and access is guarded by a read/write mutex.
type DefaultDocAdapter struct {
	docs map[string]string
	mu   sync.RWMutex
}

// NewDefaultDocAdapter returns an adapter with an empty document store.
func NewDefaultDocAdapter() *DefaultDocAdapter {
	adapter := new(DefaultDocAdapter)
	adapter.docs = map[string]string{}
	return adapter
}

// SetDoc stores content under id, replacing any previous document with that ID.
func (d *DefaultDocAdapter) SetDoc(id, content string) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.docs[id] = content
}

// Get implements DocAdapter. It returns the stored content for docID, or an
// error when no document with that ID has been stored.
func (d *DefaultDocAdapter) Get(ctx context.Context, docID string) (string, error) {
	d.mu.RLock()
	body, found := d.docs[docID]
	d.mu.RUnlock()

	if found {
		return body, nil
	}
	return "", fmt.Errorf("document not found: %s", docID)
}
- // DefaultServiceAdapter is a simple in-memory service adapter for testing
- type DefaultServiceAdapter struct {
- handlers map[string]ServiceHandler
- mu sync.RWMutex
- }
- // ServiceHandler is a function that handles service calls
- type ServiceHandler func(ctx context.Context, params map[string]any) (*ServiceResult, error)
- // NewDefaultServiceAdapter creates a new default service adapter
- func NewDefaultServiceAdapter() *DefaultServiceAdapter {
- return &DefaultServiceAdapter{
- handlers: make(map[string]ServiceHandler),
- }
- }
- // RegisterHandler registers a service handler
- func (a *DefaultServiceAdapter) RegisterHandler(serviceName string, handler ServiceHandler) {
- a.mu.Lock()
- defer a.mu.Unlock()
- a.handlers[serviceName] = handler
- }
- // Call implements ServiceAdapter
- func (a *DefaultServiceAdapter) Call(ctx context.Context, serviceName string, params map[string]any) (*ServiceResult, error) {
- a.mu.RLock()
- handler, ok := a.handlers[serviceName]
- a.mu.RUnlock()
- if !ok {
- // Return empty result if no handler
- return &ServiceResult{
- Data: make(map[string]any),
- }, nil
- }
- return handler(ctx, params)
- }
// ComponentHandler is the callback that serves calls for one registered component.
type ComponentHandler func(ctx context.Context, params map[string]any) (map[string]any, error)

// DefaultComponentAdapter is a simple in-memory ComponentAdapter for testing.
// It dispatches calls to handlers registered per component ID.
type DefaultComponentAdapter struct {
	handlers map[string]ComponentHandler
	mu       sync.RWMutex
}

// NewDefaultComponentAdapter returns an adapter with no handlers registered.
func NewDefaultComponentAdapter() *DefaultComponentAdapter {
	adapter := new(DefaultComponentAdapter)
	adapter.handlers = map[string]ComponentHandler{}
	return adapter
}

// RegisterHandler installs handler for componentID, replacing any handler
// previously registered under that ID.
func (a *DefaultComponentAdapter) RegisterHandler(componentID string, handler ComponentHandler) {
	a.mu.Lock()
	defer a.mu.Unlock()

	a.handlers[componentID] = handler
}

// Call implements ComponentAdapter. The handler registered for componentID is
// invoked with ctx and params; when none is registered, an empty map is
// returned instead of an error.
func (a *DefaultComponentAdapter) Call(ctx context.Context, componentID string, params map[string]any) (map[string]any, error) {
	if fn, found := a.handlerFor(componentID); found {
		// The handler runs outside the lock.
		return fn(ctx, params)
	}
	return map[string]any{}, nil
}

// handlerFor looks up the handler for id under the read lock.
func (a *DefaultComponentAdapter) handlerFor(id string) (ComponentHandler, bool) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	fn, found := a.handlers[id]
	return fn, found
}
// LLMHandler is the callback that serves LLM calls for DefaultLLMAdapter.
type LLMHandler func(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error)

// DefaultLLMAdapter is a simple in-memory LLMAdapter for testing. It forwards
// calls to a single configurable handler and returns a canned response while
// no handler is set.
type DefaultLLMAdapter struct {
	handler LLMHandler
	mu      sync.RWMutex
}

// NewDefaultLLMAdapter returns an adapter with no handler configured.
func NewDefaultLLMAdapter() *DefaultLLMAdapter {
	return new(DefaultLLMAdapter)
}

// SetHandler installs handler as the function that serves subsequent calls.
func (a *DefaultLLMAdapter) SetHandler(handler LLMHandler) {
	a.mu.Lock()
	defer a.mu.Unlock()

	a.handler = handler
}

// Call implements LLMAdapter. With a handler configured, the call is forwarded
// to it unchanged; otherwise a map holding a mock "content" string is returned.
func (a *DefaultLLMAdapter) Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error) {
	if fn := a.currentHandler(); fn != nil {
		// The handler runs outside the lock.
		return fn(ctx, params, stream)
	}
	mock := map[string]any{
		"content": "Mock LLM response",
	}
	return mock, nil
}

// currentHandler returns the configured handler, read under the read lock.
func (a *DefaultLLMAdapter) currentHandler() LLMHandler {
	a.mu.RLock()
	defer a.mu.RUnlock()

	return a.handler
}
- // DefaultFileAdapter is a simple in-memory file adapter for testing
- type DefaultFileAdapter struct {
- files map[string][]byte
- mu sync.RWMutex
- }
- // NewDefaultFileAdapter creates a new default file adapter
- func NewDefaultFileAdapter() *DefaultFileAdapter {
- return &DefaultFileAdapter{
- files: make(map[string][]byte),
- }
- }
- // Read implements FileAdapter
- func (a *DefaultFileAdapter) Read(ctx context.Context, path string) ([]byte, error) {
- a.mu.RLock()
- content, ok := a.files[path]
- a.mu.RUnlock()
- if !ok {
- return nil, &FileNotFoundError{Path: path}
- }
- return content, nil
- }
- // Write implements FileAdapter
- func (a *DefaultFileAdapter) Write(ctx context.Context, path string, content []byte, mode WriteMode) error {
- a.mu.Lock()
- defer a.mu.Unlock()
- switch mode {
- case WriteModeFailIfExists:
- if _, exists := a.files[path]; exists {
- return &FileExistsError{Path: path}
- }
- a.files[path] = content
- case WriteModeAppend:
- existing := a.files[path]
- a.files[path] = append(existing, content...)
- case WriteModePrepend:
- existing := a.files[path]
- a.files[path] = append(content, existing...)
- case WriteModeOverwrite:
- fallthrough
- default:
- a.files[path] = content
- }
- return nil
- }
- // Exists implements FileAdapter
- func (a *DefaultFileAdapter) Exists(ctx context.Context, path string) (bool, error) {
- a.mu.RLock()
- _, exists := a.files[path]
- a.mu.RUnlock()
- return exists, nil
- }
- // List implements FileAdapter
- func (a *DefaultFileAdapter) List(ctx context.Context, pattern string) ([]string, error) {
- a.mu.RLock()
- defer a.mu.RUnlock()
- var matches []string
- for path := range a.files {
- if matchPattern(path, pattern) {
- matches = append(matches, path)
- }
- }
- return matches, nil
- }
- // SetFile sets a file in the in-memory store (for testing)
- func (a *DefaultFileAdapter) SetFile(path string, content []byte) {
- a.mu.Lock()
- defer a.mu.Unlock()
- a.files[path] = content
- }
- // GetFile gets a file from the in-memory store (for testing)
- func (a *DefaultFileAdapter) GetFile(path string) []byte {
- a.mu.RLock()
- defer a.mu.RUnlock()
- return a.files[path]
- }
- // Errors
// FileNotFoundError reports that no file exists at Path.
type FileNotFoundError struct {
	Path string
}

// Error implements the error interface; the message names the missing path.
func (e *FileNotFoundError) Error() string {
	const prefix = "file not found: "
	return prefix + e.Path
}
// FileExistsError reports that a file is already present at Path.
type FileExistsError struct {
	Path string
}

// Error implements the error interface; the message names the existing path.
func (e *FileExistsError) Error() string {
	const prefix = "file already exists: "
	return prefix + e.Path
}
// StreamWriter adapts an io.Writer so streaming output can be written as
// string chunks.
type StreamWriter struct {
	writer io.Writer
}

// NewStreamWriter returns a StreamWriter that writes to w.
func NewStreamWriter(w io.Writer) *StreamWriter {
	sw := new(StreamWriter)
	sw.writer = w
	return sw
}

// Write sends chunk to the underlying writer as bytes and returns the
// writer's error, if any. The number of bytes written is not reported.
func (sw *StreamWriter) Write(chunk string) error {
	payload := []byte(chunk)
	if _, err := sw.writer.Write(payload); err != nil {
		return err
	}
	return nil
}
- // DefaultAPIAdapter is an HTTP client-based API adapter.
- // Params accepted by Call (all optional):
- // - pathParams (object): values substituted into {placeholder} segments of the URL
- // - query (object): appended as URL query string
- // - body (any): JSON-encoded request body; sets Content-Type: application/json
- // - headers (object): per-request headers that override apiDef.Headers
- // - authToken (string): resolved by the executor from apiDef.Auth; sent as Bearer token
- //
- // Result keys returned on success:
- // - _status (number): HTTP status code
- // - All top-level keys from the JSON response body, or "body" (string) for non-JSON responses
- type DefaultAPIAdapter struct {
- client *http.Client
- }
- // NewDefaultAPIAdapter creates an adapter with a 30-second request timeout.
- func NewDefaultAPIAdapter() *DefaultAPIAdapter {
- return &DefaultAPIAdapter{
- client: &http.Client{
- Timeout: 30 * time.Second,
- },
- }
- }
- // NewDefaultAPIAdapterWithTimeout creates an adapter with a custom timeout.
- func NewDefaultAPIAdapterWithTimeout(timeout time.Duration) *DefaultAPIAdapter {
- return &DefaultAPIAdapter{
- client: &http.Client{Timeout: timeout},
- }
- }
- // Call implements APIAdapter.
- func (a *DefaultAPIAdapter) Call(ctx context.Context, apiDef *APIDefinition, params map[string]any) (map[string]any, error) {
- // 1. Substitute {placeholder} path segments — values are URL-path-encoded.
- rawURL := apiDef.URL
- if pathParams, ok := params["pathParams"].(map[string]any); ok {
- for key, val := range pathParams {
- rawURL = strings.ReplaceAll(rawURL, "{"+key+"}", neturl.PathEscape(fmt.Sprintf("%v", val)))
- }
- }
- // 2. Append query string.
- if queryParams, ok := params["query"].(map[string]any); ok {
- q := neturl.Values{}
- for key, val := range queryParams {
- q.Set(key, fmt.Sprintf("%v", val))
- }
- if len(q) > 0 {
- rawURL += "?" + q.Encode()
- }
- }
- // 3. Marshal request body.
- var bodyReader io.Reader
- var hasBody bool
- if body, ok := params["body"]; ok && body != nil {
- bodyBytes, err := json.Marshal(body)
- if err != nil {
- return nil, fmt.Errorf("failed to marshal request body: %w", err)
- }
- bodyReader = bytes.NewReader(bodyBytes)
- hasBody = true
- }
- // 4. Build request bound to the workflow context so cancellation propagates.
- req, err := http.NewRequestWithContext(ctx, strings.ToUpper(apiDef.Method), rawURL, bodyReader)
- if err != nil {
- return nil, fmt.Errorf("failed to create HTTP request: %w", err)
- }
- // 5. Apply static headers from the API definition first.
- for key, val := range apiDef.Headers {
- req.Header.Set(key, val)
- }
- // 6. Per-call headers override static ones.
- if headers, ok := params["headers"].(map[string]any); ok {
- for key, val := range headers {
- req.Header.Set(key, fmt.Sprintf("%v", val))
- }
- }
- // 7. Set Content-Type when a body is present (unless caller already set it).
- if hasBody && req.Header.Get("Content-Type") == "" {
- req.Header.Set("Content-Type", "application/json")
- }
- // 8. Authorization header — token resolved by the executor from apiDef.Auth.
- if authToken, ok := params["authToken"].(string); ok && authToken != "" {
- req.Header.Set("Authorization", "Bearer "+authToken)
- }
- // 9. Execute request.
- resp, err := a.client.Do(req)
- if err != nil {
- return nil, fmt.Errorf("HTTP request failed: %w", err)
- }
- defer resp.Body.Close()
- // 10. Read full response body.
- respBody, err := io.ReadAll(resp.Body)
- if err != nil {
- return nil, fmt.Errorf("failed to read response body: %w", err)
- }
- // 11. Non-2xx → error with status and body for caller inspection.
- if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(respBody))
- }
- // 12. Build result: start with _status so callers can read it from _result._status.
- result := map[string]any{
- "_status": resp.StatusCode,
- }
- if len(respBody) == 0 {
- return result, nil
- }
- // 13. Merge JSON fields into result; fall back to raw "body" string for non-JSON.
- var parsed map[string]any
- if err := json.Unmarshal(respBody, &parsed); err != nil {
- result["body"] = string(respBody)
- } else {
- maps.Copy(result, parsed)
- }
- return result, nil
- }
|