adapters.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. package workflow
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "maps"
  9. "net/http"
  10. neturl "net/url"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. // ServiceAdapter defines the interface for calling project services
  16. type ServiceAdapter interface {
  17. // Call invokes a service with the given parameters and returns the result
  18. Call(ctx context.Context, serviceName string, params map[string]any) (*ServiceResult, error)
  19. }
  20. // ComponentAdapter defines the interface for calling system components
  21. type ComponentAdapter interface {
  22. // Call invokes a component with the given parameters and returns the result
  23. Call(ctx context.Context, componentID string, params map[string]any) (map[string]any, error)
  24. }
  25. // LLMAdapter defines the interface for calling LLM services
  26. type LLMAdapter interface {
  27. // Call invokes an LLM with the given parameters
  28. // For streaming responses, the stream channel will receive chunks
  29. Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error)
  30. }
  31. // FileAdapter defines the interface for file operations
  32. type FileAdapter interface {
  33. // Read reads a file from the input path
  34. Read(ctx context.Context, path string) ([]byte, error)
  35. // Write writes content to an artifact path with the specified mode
  36. Write(ctx context.Context, path string, content []byte, mode WriteMode) error
  37. // Exists checks if a file exists
  38. Exists(ctx context.Context, path string) (bool, error)
  39. // List lists files matching a pattern
  40. List(ctx context.Context, pattern string) ([]string, error)
  41. }
  42. // APIAdapter defines the interface for calling third-party HTTP APIs
  43. type APIAdapter interface {
  44. // Call invokes an HTTP API with the given definition and parameters
  45. // Returns the parsed response body (typically JSON)
  46. Call(ctx context.Context, apiDef *APIDefinition, params map[string]any) (map[string]any, error)
  47. }
  48. // DocAdapter defines the interface for resolving semantic document references
  49. type DocAdapter interface {
  50. // Get retrieves the content of a document by its ID
  51. Get(ctx context.Context, docID string) (string, error)
  52. }
  53. // LLMAdapterRegistry holds multiple named LLM adapters and resolves the
  54. // correct adapter for a given model spec string (v3.16+).
  55. //
  56. // Model spec formats:
  57. // - "" (empty): use default adapter with its configured model
  58. // - "openai" (no /): use the "openai" adapter with its configured default model
  59. // - "openai/gpt-4.1" (has /): use the "openai" adapter with model override "gpt-4.1"
  60. type LLMAdapterRegistry struct {
  61. adapters map[string]LLMAdapter // keyed by provider name
  62. defaultAdapter LLMAdapter
  63. defaultProvider string
  64. }
  65. // NewLLMAdapterRegistry creates a new multi-provider adapter registry.
  66. func NewLLMAdapterRegistry(defaultAdapter LLMAdapter, defaultProvider string) *LLMAdapterRegistry {
  67. return &LLMAdapterRegistry{
  68. adapters: make(map[string]LLMAdapter),
  69. defaultAdapter: defaultAdapter,
  70. defaultProvider: defaultProvider,
  71. }
  72. }
  73. // Register adds a provider adapter to the registry.
  74. func (r *LLMAdapterRegistry) Register(provider string, adapter LLMAdapter) {
  75. r.adapters[provider] = adapter
  76. }
  77. // Resolve returns the adapter and optional model override for the given model spec.
  78. // If modelSpec is empty, returns the default adapter with no override.
  79. // If modelSpec has no "/", it is treated as a pure provider name (adapter's default model is used).
  80. // If modelSpec has "/", it is split into provider + modelId.
  81. func (r *LLMAdapterRegistry) Resolve(modelSpec string) (adapter LLMAdapter, modelOverride string, err error) {
  82. if modelSpec == "" {
  83. return r.defaultAdapter, "", nil
  84. }
  85. if strings.Contains(modelSpec, "/") {
  86. parts := strings.SplitN(modelSpec, "/", 2)
  87. provider, modelId := parts[0], parts[1]
  88. a, ok := r.adapters[provider]
  89. if !ok {
  90. return nil, "", fmt.Errorf("unsupported LLM provider: %q", provider)
  91. }
  92. return a, modelId, nil
  93. }
  94. // Provider-only: use that adapter's configured default model
  95. a, ok := r.adapters[modelSpec]
  96. if !ok {
  97. return nil, "", fmt.Errorf("unsupported LLM provider: %q", modelSpec)
  98. }
  99. return a, "", nil
  100. }
  101. // Call implements LLMAdapter by delegating to the default adapter.
  102. // This makes LLMAdapterRegistry usable as a drop-in LLMAdapter for backward compatibility.
  103. func (r *LLMAdapterRegistry) Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error) {
  104. return r.defaultAdapter.Call(ctx, params, stream)
  105. }
  106. // DefaultDocAdapter is a simple in-memory doc adapter for testing
  107. type DefaultDocAdapter struct {
  108. docs map[string]string
  109. mu sync.RWMutex
  110. }
  111. // NewDefaultDocAdapter creates a new default doc adapter
  112. func NewDefaultDocAdapter() *DefaultDocAdapter {
  113. return &DefaultDocAdapter{
  114. docs: make(map[string]string),
  115. }
  116. }
  117. // SetDoc stores a document by ID
  118. func (d *DefaultDocAdapter) SetDoc(id, content string) {
  119. d.mu.Lock()
  120. defer d.mu.Unlock()
  121. d.docs[id] = content
  122. }
  123. // Get implements DocAdapter
  124. func (d *DefaultDocAdapter) Get(ctx context.Context, docID string) (string, error) {
  125. d.mu.RLock()
  126. defer d.mu.RUnlock()
  127. content, ok := d.docs[docID]
  128. if !ok {
  129. return "", fmt.Errorf("document not found: %s", docID)
  130. }
  131. return content, nil
  132. }
  133. // DefaultServiceAdapter is a simple in-memory service adapter for testing
  134. type DefaultServiceAdapter struct {
  135. handlers map[string]ServiceHandler
  136. mu sync.RWMutex
  137. }
  138. // ServiceHandler is a function that handles service calls
  139. type ServiceHandler func(ctx context.Context, params map[string]any) (*ServiceResult, error)
  140. // NewDefaultServiceAdapter creates a new default service adapter
  141. func NewDefaultServiceAdapter() *DefaultServiceAdapter {
  142. return &DefaultServiceAdapter{
  143. handlers: make(map[string]ServiceHandler),
  144. }
  145. }
  146. // RegisterHandler registers a service handler
  147. func (a *DefaultServiceAdapter) RegisterHandler(serviceName string, handler ServiceHandler) {
  148. a.mu.Lock()
  149. defer a.mu.Unlock()
  150. a.handlers[serviceName] = handler
  151. }
  152. // Call implements ServiceAdapter
  153. func (a *DefaultServiceAdapter) Call(ctx context.Context, serviceName string, params map[string]any) (*ServiceResult, error) {
  154. a.mu.RLock()
  155. handler, ok := a.handlers[serviceName]
  156. a.mu.RUnlock()
  157. if !ok {
  158. // Return empty result if no handler
  159. return &ServiceResult{
  160. Data: make(map[string]any),
  161. }, nil
  162. }
  163. return handler(ctx, params)
  164. }
  165. // DefaultComponentAdapter is a simple in-memory component adapter for testing
  166. type DefaultComponentAdapter struct {
  167. handlers map[string]ComponentHandler
  168. mu sync.RWMutex
  169. }
  170. // ComponentHandler is a function that handles component calls
  171. type ComponentHandler func(ctx context.Context, params map[string]any) (map[string]any, error)
  172. // NewDefaultComponentAdapter creates a new default component adapter
  173. func NewDefaultComponentAdapter() *DefaultComponentAdapter {
  174. return &DefaultComponentAdapter{
  175. handlers: make(map[string]ComponentHandler),
  176. }
  177. }
  178. // RegisterHandler registers a component handler
  179. func (a *DefaultComponentAdapter) RegisterHandler(componentID string, handler ComponentHandler) {
  180. a.mu.Lock()
  181. defer a.mu.Unlock()
  182. a.handlers[componentID] = handler
  183. }
  184. // Call implements ComponentAdapter
  185. func (a *DefaultComponentAdapter) Call(ctx context.Context, componentID string, params map[string]any) (map[string]any, error) {
  186. a.mu.RLock()
  187. handler, ok := a.handlers[componentID]
  188. a.mu.RUnlock()
  189. if !ok {
  190. return make(map[string]any), nil
  191. }
  192. return handler(ctx, params)
  193. }
  194. // DefaultLLMAdapter is a simple in-memory LLM adapter for testing
  195. type DefaultLLMAdapter struct {
  196. handler LLMHandler
  197. mu sync.RWMutex
  198. }
  199. // LLMHandler is a function that handles LLM calls
  200. type LLMHandler func(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error)
  201. // NewDefaultLLMAdapter creates a new default LLM adapter
  202. func NewDefaultLLMAdapter() *DefaultLLMAdapter {
  203. return &DefaultLLMAdapter{}
  204. }
  205. // SetHandler sets the LLM handler
  206. func (a *DefaultLLMAdapter) SetHandler(handler LLMHandler) {
  207. a.mu.Lock()
  208. defer a.mu.Unlock()
  209. a.handler = handler
  210. }
  211. // Call implements LLMAdapter
  212. func (a *DefaultLLMAdapter) Call(ctx context.Context, params map[string]any, stream chan<- string) (map[string]any, error) {
  213. a.mu.RLock()
  214. handler := a.handler
  215. a.mu.RUnlock()
  216. if handler == nil {
  217. return map[string]any{
  218. "content": "Mock LLM response",
  219. }, nil
  220. }
  221. return handler(ctx, params, stream)
  222. }
  223. // DefaultFileAdapter is a simple in-memory file adapter for testing
  224. type DefaultFileAdapter struct {
  225. files map[string][]byte
  226. mu sync.RWMutex
  227. }
  228. // NewDefaultFileAdapter creates a new default file adapter
  229. func NewDefaultFileAdapter() *DefaultFileAdapter {
  230. return &DefaultFileAdapter{
  231. files: make(map[string][]byte),
  232. }
  233. }
  234. // Read implements FileAdapter
  235. func (a *DefaultFileAdapter) Read(ctx context.Context, path string) ([]byte, error) {
  236. a.mu.RLock()
  237. content, ok := a.files[path]
  238. a.mu.RUnlock()
  239. if !ok {
  240. return nil, &FileNotFoundError{Path: path}
  241. }
  242. return content, nil
  243. }
  244. // Write implements FileAdapter
  245. func (a *DefaultFileAdapter) Write(ctx context.Context, path string, content []byte, mode WriteMode) error {
  246. a.mu.Lock()
  247. defer a.mu.Unlock()
  248. switch mode {
  249. case WriteModeFailIfExists:
  250. if _, exists := a.files[path]; exists {
  251. return &FileExistsError{Path: path}
  252. }
  253. a.files[path] = content
  254. case WriteModeAppend:
  255. existing := a.files[path]
  256. a.files[path] = append(existing, content...)
  257. case WriteModePrepend:
  258. existing := a.files[path]
  259. a.files[path] = append(content, existing...)
  260. case WriteModeOverwrite:
  261. fallthrough
  262. default:
  263. a.files[path] = content
  264. }
  265. return nil
  266. }
  267. // Exists implements FileAdapter
  268. func (a *DefaultFileAdapter) Exists(ctx context.Context, path string) (bool, error) {
  269. a.mu.RLock()
  270. _, exists := a.files[path]
  271. a.mu.RUnlock()
  272. return exists, nil
  273. }
  274. // List implements FileAdapter
  275. func (a *DefaultFileAdapter) List(ctx context.Context, pattern string) ([]string, error) {
  276. a.mu.RLock()
  277. defer a.mu.RUnlock()
  278. var matches []string
  279. for path := range a.files {
  280. if matchPattern(path, pattern) {
  281. matches = append(matches, path)
  282. }
  283. }
  284. return matches, nil
  285. }
  286. // SetFile sets a file in the in-memory store (for testing)
  287. func (a *DefaultFileAdapter) SetFile(path string, content []byte) {
  288. a.mu.Lock()
  289. defer a.mu.Unlock()
  290. a.files[path] = content
  291. }
  292. // GetFile gets a file from the in-memory store (for testing)
  293. func (a *DefaultFileAdapter) GetFile(path string) []byte {
  294. a.mu.RLock()
  295. defer a.mu.RUnlock()
  296. return a.files[path]
  297. }
  298. // Errors
  299. // FileNotFoundError is returned when a file is not found
  300. type FileNotFoundError struct {
  301. Path string
  302. }
  303. func (e *FileNotFoundError) Error() string {
  304. return "file not found: " + e.Path
  305. }
  306. // FileExistsError is returned when a file already exists
  307. type FileExistsError struct {
  308. Path string
  309. }
  310. func (e *FileExistsError) Error() string {
  311. return "file already exists: " + e.Path
  312. }
  313. // StreamWriter wraps an io.Writer for streaming output
  314. type StreamWriter struct {
  315. writer io.Writer
  316. }
  317. // NewStreamWriter creates a new stream writer
  318. func NewStreamWriter(w io.Writer) *StreamWriter {
  319. return &StreamWriter{writer: w}
  320. }
  321. // Write writes a chunk to the stream
  322. func (sw *StreamWriter) Write(chunk string) error {
  323. _, err := sw.writer.Write([]byte(chunk))
  324. return err
  325. }
  326. // DefaultAPIAdapter is an HTTP client-based API adapter.
  327. // Params accepted by Call (all optional):
  328. // - pathParams (object): values substituted into {placeholder} segments of the URL
  329. // - query (object): appended as URL query string
  330. // - body (any): JSON-encoded request body; sets Content-Type: application/json
  331. // - headers (object): per-request headers that override apiDef.Headers
  332. // - authToken (string): resolved by the executor from apiDef.Auth; sent as Bearer token
  333. //
  334. // Result keys returned on success:
  335. // - _status (number): HTTP status code
  336. // - All top-level keys from the JSON response body, or "body" (string) for non-JSON responses
  337. type DefaultAPIAdapter struct {
  338. client *http.Client
  339. }
  340. // NewDefaultAPIAdapter creates an adapter with a 30-second request timeout.
  341. func NewDefaultAPIAdapter() *DefaultAPIAdapter {
  342. return &DefaultAPIAdapter{
  343. client: &http.Client{
  344. Timeout: 30 * time.Second,
  345. },
  346. }
  347. }
  348. // NewDefaultAPIAdapterWithTimeout creates an adapter with a custom timeout.
  349. func NewDefaultAPIAdapterWithTimeout(timeout time.Duration) *DefaultAPIAdapter {
  350. return &DefaultAPIAdapter{
  351. client: &http.Client{Timeout: timeout},
  352. }
  353. }
  354. // Call implements APIAdapter.
  355. func (a *DefaultAPIAdapter) Call(ctx context.Context, apiDef *APIDefinition, params map[string]any) (map[string]any, error) {
  356. // 1. Substitute {placeholder} path segments — values are URL-path-encoded.
  357. rawURL := apiDef.URL
  358. if pathParams, ok := params["pathParams"].(map[string]any); ok {
  359. for key, val := range pathParams {
  360. rawURL = strings.ReplaceAll(rawURL, "{"+key+"}", neturl.PathEscape(fmt.Sprintf("%v", val)))
  361. }
  362. }
  363. // 2. Append query string.
  364. if queryParams, ok := params["query"].(map[string]any); ok {
  365. q := neturl.Values{}
  366. for key, val := range queryParams {
  367. q.Set(key, fmt.Sprintf("%v", val))
  368. }
  369. if len(q) > 0 {
  370. rawURL += "?" + q.Encode()
  371. }
  372. }
  373. // 3. Marshal request body.
  374. var bodyReader io.Reader
  375. var hasBody bool
  376. if body, ok := params["body"]; ok && body != nil {
  377. bodyBytes, err := json.Marshal(body)
  378. if err != nil {
  379. return nil, fmt.Errorf("failed to marshal request body: %w", err)
  380. }
  381. bodyReader = bytes.NewReader(bodyBytes)
  382. hasBody = true
  383. }
  384. // 4. Build request bound to the workflow context so cancellation propagates.
  385. req, err := http.NewRequestWithContext(ctx, strings.ToUpper(apiDef.Method), rawURL, bodyReader)
  386. if err != nil {
  387. return nil, fmt.Errorf("failed to create HTTP request: %w", err)
  388. }
  389. // 5. Apply static headers from the API definition first.
  390. for key, val := range apiDef.Headers {
  391. req.Header.Set(key, val)
  392. }
  393. // 6. Per-call headers override static ones.
  394. if headers, ok := params["headers"].(map[string]any); ok {
  395. for key, val := range headers {
  396. req.Header.Set(key, fmt.Sprintf("%v", val))
  397. }
  398. }
  399. // 7. Set Content-Type when a body is present (unless caller already set it).
  400. if hasBody && req.Header.Get("Content-Type") == "" {
  401. req.Header.Set("Content-Type", "application/json")
  402. }
  403. // 8. Authorization header — token resolved by the executor from apiDef.Auth.
  404. if authToken, ok := params["authToken"].(string); ok && authToken != "" {
  405. req.Header.Set("Authorization", "Bearer "+authToken)
  406. }
  407. // 9. Execute request.
  408. resp, err := a.client.Do(req)
  409. if err != nil {
  410. return nil, fmt.Errorf("HTTP request failed: %w", err)
  411. }
  412. defer resp.Body.Close()
  413. // 10. Read full response body.
  414. respBody, err := io.ReadAll(resp.Body)
  415. if err != nil {
  416. return nil, fmt.Errorf("failed to read response body: %w", err)
  417. }
  418. // 11. Non-2xx → error with status and body for caller inspection.
  419. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  420. return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(respBody))
  421. }
  422. // 12. Build result: start with _status so callers can read it from _result._status.
  423. result := map[string]any{
  424. "_status": resp.StatusCode,
  425. }
  426. if len(respBody) == 0 {
  427. return result, nil
  428. }
  429. // 13. Merge JSON fields into result; fall back to raw "body" string for non-JSON.
  430. var parsed map[string]any
  431. if err := json.Unmarshal(respBody, &parsed); err != nil {
  432. result["body"] = string(respBody)
  433. } else {
  434. maps.Copy(result, parsed)
  435. }
  436. return result, nil
  437. }