main.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. // workflow — local runner for the Workflow Engine
  2. //
  3. // Usage:
  4. //
  5. // workflow server [--port 8080] [--config .env]
  6. // workflow run -workflow FILE -workspace-id ID [-params JSON] [-mode MODE] [-v]
  7. // workflow help
  8. //
  9. // Config (.env in working directory, or --config flag):
  10. //
  11. // LLM_URL=http://localhost:4000
  12. // LLM_KEY=sk-xxx
  13. // LLM_MODEL=gpt-4o
  14. // WORKSPACE_ROOT=./workspace
  15. // PORT=8080
  16. package main
  17. import (
  18. "context"
  19. "encoding/json"
  20. "flag"
  21. "fmt"
  22. "log"
  23. "net/http"
  24. "os"
  25. "strings"
  26. "time"
  27. "workflow"
  28. "workflow/config"
  29. )
  30. func main() {
  31. if len(os.Args) < 2 {
  32. printHelp()
  33. os.Exit(1)
  34. }
  35. switch os.Args[1] {
  36. case "server":
  37. cmdServer(os.Args[2:])
  38. case "run":
  39. cmdRun(os.Args[2:])
  40. case "help", "--help", "-h":
  41. printHelp()
  42. default:
  43. fmt.Fprintf(os.Stderr, "unknown command: %q\n\n", os.Args[1])
  44. printHelp()
  45. os.Exit(1)
  46. }
  47. }
  48. func printHelp() {
  49. fmt.Print(`workflow — Workflow Engine local runner
  50. Commands:
  51. server Start local HTTP server (POST /run → SSE event stream)
  52. run Execute a workflow file directly in the terminal
  53. help Show this help
  54. Server usage:
  55. workflow server [--port 8080] [--config .env]
  56. Run usage:
  57. workflow run -workflow FILE -workspace-id ID [options]
  58. -workflow Workflow JSON file (required)
  59. -workspace-id Project identifier / gid (required)
  60. -params Input params as JSON: '{"userRequest":"..."}'
  61. -mode create | patch | regenerate | validate (default: create)
  62. -config Config file path (default: .env in CWD)
  63. -v Verbose: print all events
  64. Config file (.env or --config path):
  65. LLM_URL=http://localhost:4000
  66. LLM_KEY=sk-xxx
  67. LLM_MODEL=gpt-4o
  68. WORKSPACE_ROOT=./workspace
  69. PORT=8080
  70. `)
  71. }
  72. // ─────────────────────────────────────────────────────────────────────────────
  73. // server subcommand
  74. // ─────────────────────────────────────────────────────────────────────────────
  75. func cmdServer(args []string) {
  76. fs := flag.NewFlagSet("server", flag.ExitOnError)
  77. cfgFile := fs.String("config", "", "Config file (default: .env in CWD)")
  78. fs.Usage = func() {
  79. fmt.Fprintln(os.Stderr, "Usage: workflow server [--config FILE]")
  80. fs.PrintDefaults()
  81. }
  82. fs.Parse(args)
  83. cfg := config.Load(*cfgFile)
  84. log.SetFlags(log.Ltime | log.Lmsgprefix)
  85. log.SetPrefix("workflow ")
  86. log.Printf("endpoint : POST http://localhost:%s/run", cfg.Server.Port)
  87. log.Printf("workspace : %s/<workspaceId>/", cfg.Workspace.Root)
  88. log.Printf("llm url : %s", cfg.LLM.URL)
  89. log.Printf("llm model : %s", cfg.LLM.Model)
  90. log.Printf("llm key : %s", config.MaskKey(cfg.LLM.Key))
  91. mux := http.NewServeMux()
  92. mux.HandleFunc("/run", serverRunHandler(cfg))
  93. mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
  94. w.Header().Set("Content-Type", "application/json")
  95. fmt.Fprintf(w, `{"status":"ok","ts":%q}`+"\n", nowTS())
  96. })
  97. srv := &http.Server{
  98. Addr: ":" + cfg.Server.Port,
  99. Handler: mux,
  100. ReadTimeout: 30 * time.Second,
  101. IdleTimeout: 120 * time.Second,
  102. // WriteTimeout intentionally 0 — SSE streams can be long
  103. }
  104. log.Printf("Listening on :%s", cfg.Server.Port)
  105. if err := srv.ListenAndServe(); err != nil {
  106. log.Fatal(err)
  107. }
  108. }
// RunRequest is the JSON body for POST /run.
//
// The workflow definition is supplied either inline (WorkflowDef) or as a file
// path read by the server process (WorkflowFile). loadWorkflow checks
// WorkflowFile first, so it takes precedence when both are set; a request
// carrying neither is rejected.
type RunRequest struct {
	// WorkflowID is an optional label; serverRunHandler uses it only in log
	// lines and falls back to the workflow's own name when it is empty.
	WorkflowID string `json:"workflowId"`
	// WorkflowDef is an inline workflow definition as decoded JSON.
	WorkflowDef map[string]interface{} `json:"workflowDef"`
	// WorkflowFile is the path of a workflow JSON file on the server host.
	WorkflowFile string `json:"workflowFile"`
	// RunParams is passed through to the engine unchanged.
	RunParams workflow.RunParams `json:"runParams"`
}
  116. // sseEvent mirrors the spec §13.2 event shape.
  117. type sseEvent struct {
  118. RunID string `json:"run_id"`
  119. Seq int `json:"seq"`
  120. TS string `json:"ts"`
  121. Type string `json:"type"`
  122. StepID *string `json:"step_id"`
  123. Payload map[string]interface{} `json:"payload"`
  124. }
  125. func writeSSE(w http.ResponseWriter, f http.Flusher, ev sseEvent) {
  126. data, _ := json.Marshal(ev)
  127. fmt.Fprintf(w, "data: %s\n\n", data)
  128. f.Flush()
  129. }
  130. func serverRunHandler(cfg config.Config) http.HandlerFunc {
  131. return func(w http.ResponseWriter, r *http.Request) {
  132. if r.Method != http.MethodPost {
  133. http.Error(w, "POST /run only", http.StatusMethodNotAllowed)
  134. return
  135. }
  136. var req RunRequest
  137. if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
  138. http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
  139. return
  140. }
  141. wf, err := loadWorkflow(req)
  142. if err != nil {
  143. http.Error(w, err.Error(), http.StatusBadRequest)
  144. return
  145. }
  146. eng, err := workflow.NewEngine(wf)
  147. if err != nil {
  148. http.Error(w, "engine: "+err.Error(), http.StatusBadRequest)
  149. return
  150. }
  151. adapters, err := buildAdapters(cfg, req.RunParams.WorkspaceID)
  152. if err != nil {
  153. http.Error(w, err.Error(), http.StatusInternalServerError)
  154. return
  155. }
  156. w.Header().Set("Content-Type", "text/event-stream")
  157. w.Header().Set("Cache-Control", "no-cache")
  158. w.Header().Set("Connection", "keep-alive")
  159. w.Header().Set("X-Accel-Buffering", "no")
  160. flusher, ok := w.(http.Flusher)
  161. if !ok {
  162. http.Error(w, "streaming unsupported", http.StatusInternalServerError)
  163. return
  164. }
  165. runID := fmt.Sprintf("run_%d", time.Now().UnixMilli())
  166. label := req.WorkflowID
  167. if label == "" {
  168. label = wf.Name
  169. }
  170. log.Printf("[%s] start workflow=%s gid=%s mode=%s",
  171. runID, label, req.RunParams.WorkspaceID, req.RunParams.Mode)
  172. result, err := eng.ExecuteWithRunParams(r.Context(), nil, adapters, req.RunParams)
  173. if err != nil {
  174. writeSSE(w, flusher, sseEvent{
  175. RunID: runID, Seq: 1, TS: nowTS(), Type: "workflow_failed",
  176. Payload: map[string]interface{}{"error": err.Error()},
  177. })
  178. return
  179. }
  180. seq := 1
  181. for ev := range result.RunEventStream {
  182. payload := ev.Payload
  183. if payload == nil {
  184. payload = map[string]interface{}{}
  185. }
  186. writeSSE(w, flusher, sseEvent{
  187. RunID: runID, Seq: seq, TS: nowTS(),
  188. Type: string(ev.Type), StepID: ev.StepID, Payload: payload,
  189. })
  190. seq++
  191. }
  192. log.Printf("[%s] done status=%s events=%d", runID, result.Context.Status, seq-1)
  193. }
  194. }
  195. // ─────────────────────────────────────────────────────────────────────────────
  196. // run subcommand
  197. // ─────────────────────────────────────────────────────────────────────────────
  198. func cmdRun(args []string) {
  199. fs := flag.NewFlagSet("run", flag.ExitOnError)
  200. workflowFile := fs.String("workflow", "", "Workflow JSON file (required)")
  201. workspaceID := fs.String("workspace-id", envOr("WORKSPACE_ID", ""), "Project identifier / gid (required)")
  202. workspaceRoot := fs.String("workspace-root", "", "Override WORKSPACE_ROOT from config")
  203. paramsJSON := fs.String("params", "", `Input params JSON: '{"key":"value"}'`)
  204. mode := fs.String("mode", "create", "create|patch|regenerate|validate")
  205. cfgFile := fs.String("config", "", "Config file (default: .env in CWD)")
  206. verbose := fs.Bool("v", false, "Verbose: print all events")
  207. fs.Usage = func() {
  208. fmt.Fprintln(os.Stderr, "Usage: workflow run -workflow FILE -workspace-id ID [options]")
  209. fs.PrintDefaults()
  210. }
  211. fs.Parse(args)
  212. if *workflowFile == "" || *workspaceID == "" {
  213. fs.Usage()
  214. os.Exit(1)
  215. }
  216. cfg := config.Load(*cfgFile)
  217. if *workspaceRoot != "" {
  218. cfg.Workspace.Root = *workspaceRoot
  219. }
  220. wfBytes, err := os.ReadFile(*workflowFile)
  221. if err != nil {
  222. fatalf("read workflow: %v", err)
  223. }
  224. var wf workflow.Workflow
  225. if err := json.Unmarshal(wfBytes, &wf); err != nil {
  226. fatalf("parse workflow: %v", err)
  227. }
  228. var params map[string]interface{}
  229. if strings.TrimSpace(*paramsJSON) != "" {
  230. if err := json.Unmarshal([]byte(*paramsJSON), &params); err != nil {
  231. fatalf("parse -params: %v", err)
  232. }
  233. }
  234. fmt.Printf("▶ workflow : %s (v%s)\n", wf.Name, wf.Version)
  235. fmt.Printf(" workspace-id: %s\n", *workspaceID)
  236. fmt.Printf(" workspace : %s/%s\n", cfg.Workspace.Root, *workspaceID)
  237. fmt.Printf(" mode : %s\n", *mode)
  238. if len(params) > 0 {
  239. raw, _ := json.Marshal(params)
  240. fmt.Printf(" params : %s\n", raw)
  241. }
  242. fmt.Printf(" llm : %s model=%s\n\n", cfg.LLM.URL, cfg.LLM.Model)
  243. adapters, err := buildAdapters(cfg, *workspaceID)
  244. if err != nil {
  245. fatalf("adapters: %v", err)
  246. }
  247. eng, err := workflow.NewEngine(&wf)
  248. if err != nil {
  249. fatalf("engine: %v", err)
  250. }
  251. start := time.Now()
  252. result, err := eng.ExecuteWithRunParams(context.Background(), nil, adapters,
  253. workflow.RunParams{Params: params, WorkspaceID: *workspaceID, Mode: *mode})
  254. if err != nil {
  255. fatalf("execute: %v", err)
  256. }
  257. var steps, llms int
  258. for ev := range result.RunEventStream {
  259. switch ev.Type {
  260. case workflow.RunEventStepStart:
  261. if ev.StepID != nil {
  262. fmt.Printf(" → %-30s\n", *ev.StepID)
  263. }
  264. case workflow.RunEventStepDone:
  265. steps++
  266. case workflow.RunEventStepSkipped:
  267. if ev.StepID != nil {
  268. fmt.Printf(" ↷ %-30s (skipped)\n", *ev.StepID)
  269. }
  270. case workflow.RunEventStepError:
  271. id := ""
  272. if ev.StepID != nil {
  273. id = *ev.StepID
  274. }
  275. fmt.Printf(" ✗ %-30s ERROR: %v\n", id, ev.Payload["error"])
  276. case workflow.RunEventLLMDone:
  277. llms++
  278. id := ""
  279. if ev.StepID != nil {
  280. id = *ev.StepID
  281. }
  282. fmt.Printf(" llm_done %s %vms model=%v\n",
  283. id, ev.Payload["latency_ms"], ev.Payload["model"])
  284. if u, ok := ev.Payload["usage"].(map[string]interface{}); ok {
  285. fmt.Printf(" tokens in=%v out=%v total=%v\n",
  286. u["input_tokens"], u["output_tokens"], u["total_tokens"])
  287. }
  288. default:
  289. if *verbose {
  290. raw, _ := json.Marshal(ev.Payload)
  291. id := ""
  292. if ev.StepID != nil {
  293. id = *ev.StepID
  294. }
  295. fmt.Printf(" ~ %-20s %-25s %s\n", ev.Type, id, raw)
  296. }
  297. }
  298. }
  299. fmt.Printf("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")
  300. status := result.Context.Status
  301. icon := "✓"
  302. if status == workflow.StatusFailed {
  303. icon = "✗"
  304. }
  305. fmt.Printf("%s %s steps=%d llm=%d %v\n",
  306. icon, status, steps, llms, time.Since(start).Round(time.Millisecond))
  307. if len(result.Context.Variables) > 0 {
  308. fmt.Println("\n── variables ──")
  309. for k, v := range result.Context.Variables {
  310. if s, ok := v.(string); ok {
  311. if len(s) > 120 {
  312. fmt.Printf(" %-20s = %q…\n", k, s[:120])
  313. } else {
  314. fmt.Printf(" %-20s = %q\n", k, s)
  315. }
  316. } else {
  317. raw, _ := json.MarshalIndent(v, " ", " ")
  318. fmt.Printf(" %-20s = %s\n", k, raw)
  319. }
  320. }
  321. }
  322. if len(result.Context.Artifacts) > 0 {
  323. fmt.Println("\n── artifacts ──")
  324. for p := range result.Context.Artifacts {
  325. fmt.Printf(" %s/%s%s\n", cfg.Workspace.Root, *workspaceID, p)
  326. }
  327. }
  328. fmt.Println()
  329. if status == workflow.StatusFailed {
  330. os.Exit(1)
  331. }
  332. }
  333. // ─────────────────────────────────────────────────────────────────────────────
  334. // shared helpers
  335. // ─────────────────────────────────────────────────────────────────────────────
  336. func loadWorkflow(req RunRequest) (*workflow.Workflow, error) {
  337. var wf workflow.Workflow
  338. switch {
  339. case req.WorkflowFile != "":
  340. data, err := os.ReadFile(req.WorkflowFile)
  341. if err != nil {
  342. return nil, fmt.Errorf("read workflowFile: %w", err)
  343. }
  344. if err := json.Unmarshal(data, &wf); err != nil {
  345. return nil, fmt.Errorf("parse workflowFile: %w", err)
  346. }
  347. case req.WorkflowDef != nil:
  348. raw, _ := json.Marshal(req.WorkflowDef)
  349. if err := json.Unmarshal(raw, &wf); err != nil {
  350. return nil, fmt.Errorf("parse workflowDef: %w", err)
  351. }
  352. default:
  353. return nil, fmt.Errorf("provide workflowDef or workflowFile")
  354. }
  355. return &wf, nil
  356. }
  357. func buildAdapters(cfg config.Config, workspaceID string) (*workflow.Adapters, error) {
  358. gid := workspaceID
  359. if gid == "" {
  360. gid = "default"
  361. }
  362. // v3.16+: Build multi-provider LLM adapter registry
  363. registry := buildLLMRegistry(cfg)
  364. file, err := workflow.NewLocalFileAdapter(cfg.Workspace.Root, gid)
  365. if err != nil {
  366. return nil, fmt.Errorf("file adapter: %w", err)
  367. }
  368. return &workflow.Adapters{
  369. Service: workflow.NewDefaultServiceAdapter(),
  370. Component: workflow.NewDefaultComponentAdapter(),
  371. LLM: registry, // LLMAdapterRegistry implements LLMAdapter
  372. LLMAdapterRegistry: registry,
  373. File: file,
  374. }, nil
  375. }
  376. // buildLLMRegistry creates a multi-provider LLM adapter registry (v3.16+).
  377. // It reads cfg.LLM (global default) and cfg.Providers (per-provider) to build
  378. // adapters for each known provider (openai, anthropic), then determines
  379. // the default adapter from LLM_MODEL or LLM_URL.
  380. func buildLLMRegistry(cfg config.Config) *workflow.LLMAdapterRegistry {
  381. // Determine default provider from LLM_MODEL (provider/modelId format) or LLM_URL
  382. defaultProvider := "openai"
  383. defaultModelId := cfg.LLM.Model
  384. if strings.Contains(cfg.LLM.Model, "/") {
  385. parts := strings.SplitN(cfg.LLM.Model, "/", 2)
  386. defaultProvider = parts[0]
  387. defaultModelId = parts[1]
  388. } else if strings.Contains(cfg.LLM.URL, "anthropic.com") {
  389. defaultProvider = "anthropic"
  390. }
  391. // Build OpenAI adapter
  392. openaiKey := cfg.LLM.Key // fallback
  393. openaiModel := defaultModelId
  394. openaiURL := cfg.LLM.URL
  395. if p, ok := cfg.Providers["openai"]; ok {
  396. if p.APIKey != "" {
  397. openaiKey = p.APIKey
  398. }
  399. if p.Model != "" {
  400. openaiModel = p.Model
  401. }
  402. if p.BaseURL != "" {
  403. openaiURL = p.BaseURL
  404. }
  405. }
  406. // Only use openaiModel as default if openai IS the default provider
  407. if defaultProvider != "openai" {
  408. if p, ok := cfg.Providers["openai"]; ok && p.Model != "" {
  409. openaiModel = p.Model
  410. } else {
  411. openaiModel = "" // no default model for non-default provider
  412. }
  413. }
  414. openaiAdapter := workflow.NewOpenAIAdapter(workflow.OpenAIConfig{
  415. BaseURL: openaiURL,
  416. APIKey: openaiKey,
  417. Model: openaiModel,
  418. })
  419. // Build Anthropic adapter
  420. anthropicKey := cfg.LLM.Key // fallback
  421. anthropicModel := ""
  422. anthropicURL := "https://api.anthropic.com"
  423. if p, ok := cfg.Providers["anthropic"]; ok {
  424. if p.APIKey != "" {
  425. anthropicKey = p.APIKey
  426. }
  427. if p.Model != "" {
  428. anthropicModel = p.Model
  429. }
  430. if p.BaseURL != "" {
  431. anthropicURL = p.BaseURL
  432. }
  433. }
  434. // Use defaultModelId if anthropic is the default provider and no provider-specific model
  435. if defaultProvider == "anthropic" && anthropicModel == "" {
  436. anthropicModel = defaultModelId
  437. }
  438. anthropicAdapter := workflow.NewAnthropicAdapter(workflow.AnthropicConfig{
  439. APIKey: anthropicKey,
  440. Model: anthropicModel,
  441. BaseURL: anthropicURL,
  442. })
  443. // Select default adapter based on determined provider
  444. var defaultAdapter workflow.LLMAdapter
  445. switch defaultProvider {
  446. case "anthropic":
  447. defaultAdapter = anthropicAdapter
  448. default:
  449. defaultAdapter = openaiAdapter
  450. }
  451. registry := workflow.NewLLMAdapterRegistry(defaultAdapter, defaultProvider)
  452. registry.Register("openai", openaiAdapter)
  453. registry.Register("anthropic", anthropicAdapter)
  454. return registry
  455. }
  456. func nowTS() string {
  457. return time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
  458. }
  459. func envOr(key, fallback string) string {
  460. if v := os.Getenv(key); v != "" {
  461. return v
  462. }
  463. return fallback
  464. }
  465. func fatalf(format string, args ...interface{}) {
  466. fmt.Fprintf(os.Stderr, "error: "+format+"\n", args...)
  467. os.Exit(1)
  468. }