main_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. package main
  2. // main_test.go — HTTP server integration tests for the workflow engine.
  3. //
  4. // All tests spin up an in-process httptest.Server via serverRunHandler and
  5. // communicate exclusively through the real POST /run + SSE event-stream
  6. // interface. No engine internals are accessed directly.
  7. //
  8. // ── Test Groups ───────────────────────────────────────────────────────────────
  9. //
  10. // TestIntegration_Offline pure-logic workflows, no LLM required, always run.
  11. // TestIntegration_Live LLM workflows, skipped unless LLM_KEY is set.
  12. // TestLiveServer_LLMNode original smoke test kept for backward compatibility.
  13. //
  14. // ── Coverage Map ──────────────────────────────────────────────────────────────
  15. //
  16. // From Downloads test suite (15 JSON fixtures in cmd/testdata/test*.json):
  17. // test02 Set + Branch (3 param variants)
  18. // test06 Write_* file overwrite + append
  19. // test07 Step.if conditional skip (2 variants)
  20. // test08 Nested Loop + Branch (pure logic)
  21. // test11 Expression arithmetic + Branch (3 variants)
  22. // test12 Parallel Loop + Write_* + .length (previously failing)
  23. // test14 step_print events
  24. // test01 Basic LLM call [live]
  25. // test03 Parallel children fan-out (2 LLM) [live]
  26. // test04 Serial Loop (3 LLM) [live]
  27. // test05 Parallel Loop (4 LLM) [live]
  28. // test09 Structured JSON output via output_config [live]
  29. // test10 Complex multi-stage pipeline (plan→loop→report→write) [live]
  30. // test13 Deep nesting Loop+Branch routing to 2 LLM steps [live]
  31. // test15 LLM out writes file + variable simultaneously [live]
  32. //
  33. // From engine integration suite (TC-01~16, cmd/testdata/tc*.json):
  34. // tc03 Branch no-match + no ELSE continues to Branch.next
  35. // tc05 Noop parallel children fan-out
  36. // tc07 Step onError handler fires on Write_* path error
  37. // tc08 Registry param default value auto-applied when caller omits
  38. // tc09 Multi-entry nodes both start in parallel
  39. // tc_pause_timeout Pause_* with 1 s timeout auto-routes to handler
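// tc16 Workflow runs successfully with custom RunParams fields
//
// v3.16 feature tests (cmd/testdata/tc_*.json):
// tc_while_loop While loop driven by a Set_* counter increment
// tc_break_serial BREAK exits a serial loop early
// tc_source_maxiter Source loop capped by maxIterations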
  41. //
  42. // Not converted (TC-10, TC-15: require /resume HTTP endpoint;
  43. // TC-13: requires custom ComponentAdapter injection):
  44. // these scenarios remain documented in the fixture JSONs for future work.
  45. //
  46. // ── Run commands ─────────────────────────────────────────────────────────────
  47. //
  48. // go test ./cmd/ -run TestIntegration_Offline -v -timeout 30s
  49. // WORKFLOW_CONFIG=/path/.llm_config go test ./cmd/ -run TestIntegration_Live -v -timeout 180s
  50. // WORKFLOW_CONFIG=/path/.llm_config go test ./cmd/ -run TestLiveServer_LLMNode -v -timeout 60s
  51. import (
  52. "bufio"
  53. "encoding/json"
  54. "net/http"
  55. "net/http/httptest"
  56. "os"
  57. "path/filepath"
  58. "strings"
  59. "testing"
  60. "workflow/config"
  61. )
  62. // ─────────────────────────────────────────────────────────────────────────────
  63. // SSE helpers
  64. // ─────────────────────────────────────────────────────────────────────────────
  65. // (sseEvent is defined in main.go — same package, no redeclaration needed)
  66. // runFixture loads a workflow JSON from cmd/testdata/<fixtureName>, POSTs it
  67. // to srv with the given params, reads the SSE stream until workflow_done or
  68. // workflow_failed, and returns all received events.
  69. func runFixture(t *testing.T, srv *httptest.Server, fixtureName string, params map[string]interface{}) []sseEvent {
  70. t.Helper()
  71. data, err := os.ReadFile(filepath.Join("testdata", fixtureName))
  72. if err != nil {
  73. t.Fatalf("load fixture %s: %v", fixtureName, err)
  74. }
  75. var wfDef map[string]interface{}
  76. if err := json.Unmarshal(data, &wfDef); err != nil {
  77. t.Fatalf("parse fixture %s: %v", fixtureName, err)
  78. }
  79. if params == nil {
  80. params = map[string]interface{}{}
  81. }
  82. reqBody := map[string]interface{}{
  83. "workflowDef": wfDef,
  84. "runParams": map[string]interface{}{
  85. "params": params,
  86. "workspaceId": "test",
  87. "mode": "create",
  88. },
  89. }
  90. reqBytes, _ := json.Marshal(reqBody)
  91. resp, err := http.Post(srv.URL+"/run", "application/json", strings.NewReader(string(reqBytes)))
  92. if err != nil {
  93. t.Fatalf("POST /run: %v", err)
  94. }
  95. defer resp.Body.Close()
  96. if resp.StatusCode != http.StatusOK {
  97. t.Fatalf("HTTP status %d (want 200)", resp.StatusCode)
  98. }
  99. var events []sseEvent
  100. scanner := bufio.NewScanner(resp.Body)
  101. for scanner.Scan() {
  102. line := scanner.Text()
  103. if !strings.HasPrefix(line, "data: ") {
  104. continue
  105. }
  106. var ev sseEvent
  107. if err := json.Unmarshal([]byte(line[6:]), &ev); err != nil {
  108. continue
  109. }
  110. sid := "<nil>"
  111. if ev.StepID != nil {
  112. sid = *ev.StepID
  113. }
  114. t.Logf("event #%d type=%-22s step=%s", ev.Seq, ev.Type, sid)
  115. events = append(events, ev)
  116. if ev.Type == "workflow_done" || ev.Type == "workflow_failed" {
  117. break
  118. }
  119. }
  120. if err := scanner.Err(); err != nil {
  121. t.Fatalf("SSE read error: %v", err)
  122. }
  123. return events
  124. }
  125. // ─────────────────────────────────────────────────────────────────────────────
  126. // Assertion helpers
  127. // ─────────────────────────────────────────────────────────────────────────────
  128. // assertWorkflowDone fails unless the final SSE event is workflow_done.
  129. func assertWorkflowDone(t *testing.T, events []sseEvent) {
  130. t.Helper()
  131. if len(events) == 0 {
  132. t.Fatal("no SSE events received")
  133. }
  134. last := events[len(events)-1]
  135. if last.Type == "workflow_failed" {
  136. t.Fatalf("workflow_failed: payload=%v", last.Payload)
  137. }
  138. if last.Type != "workflow_done" {
  139. t.Errorf("last event type=%q, want workflow_done", last.Type)
  140. }
  141. }
  142. // assertStepStarted fails unless at least one step_start event for stepID exists.
  143. func assertStepStarted(t *testing.T, events []sseEvent, stepID string) {
  144. t.Helper()
  145. for _, ev := range events {
  146. if ev.Type == "step_start" && ev.StepID != nil && *ev.StepID == stepID {
  147. return
  148. }
  149. }
  150. t.Errorf("step_start for %q not found in events", stepID)
  151. }
  152. // assertStepSkipped fails unless a step_skipped event for stepID exists.
  153. func assertStepSkipped(t *testing.T, events []sseEvent, stepID string) {
  154. t.Helper()
  155. for _, ev := range events {
  156. if ev.Type == "step_skipped" && ev.StepID != nil && *ev.StepID == stepID {
  157. return
  158. }
  159. }
  160. t.Errorf("step_skipped for %q not found in events", stepID)
  161. }
  162. // assertHasEventType fails unless at least one event of the given type exists.
  163. func assertHasEventType(t *testing.T, events []sseEvent, eventType string) {
  164. t.Helper()
  165. for _, ev := range events {
  166. if ev.Type == eventType {
  167. return
  168. }
  169. }
  170. t.Errorf("event type %q not found in %d events", eventType, len(events))
  171. }
  172. // assertPrintCount fails unless exactly want step_print events were received.
  173. func assertPrintCount(t *testing.T, events []sseEvent, want int) {
  174. t.Helper()
  175. got := 0
  176. for _, ev := range events {
  177. if ev.Type == "step_print" {
  178. got++
  179. }
  180. }
  181. if got != want {
  182. t.Errorf("step_print count=%d, want %d", got, want)
  183. }
  184. }
  185. // assertLLMDoneCount fails unless exactly want llm_done events were received.
  186. func assertLLMDoneCount(t *testing.T, events []sseEvent, want int) {
  187. t.Helper()
  188. got := 0
  189. for _, ev := range events {
  190. if ev.Type == "llm_done" {
  191. got++
  192. }
  193. }
  194. if got != want {
  195. t.Errorf("llm_done count=%d, want %d", got, want)
  196. }
  197. }
  198. // ─────────────────────────────────────────────────────────────────────────────
  199. // TestIntegration_Offline — pure-logic tests, no LLM, always runnable
  200. // ─────────────────────────────────────────────────────────────────────────────
  201. func TestIntegration_Offline(t *testing.T) {
  202. cfg := config.Load("")
  203. cfg.Workspace.Root = t.TempDir()
  204. srv := httptest.NewServer(serverRunHandler(cfg))
  205. t.Cleanup(srv.Close)
  206. // ── From Downloads test suite ─────────────────────────────────────────
  207. // test02: Set + Branch — three score ranges map to distinct steps
  208. t.Run("test02a_branch_excellent", func(t *testing.T) {
  209. events := runFixture(t, srv, "test02_set_and_branch.json",
  210. map[string]interface{}{"score": 95})
  211. assertWorkflowDone(t, events)
  212. assertStepStarted(t, events, "Set_Excellent")
  213. })
  214. t.Run("test02b_branch_pass", func(t *testing.T) {
  215. events := runFixture(t, srv, "test02_set_and_branch.json",
  216. map[string]interface{}{"score": 75})
  217. assertWorkflowDone(t, events)
  218. assertStepStarted(t, events, "Set_Pass")
  219. })
  220. t.Run("test02c_branch_fail", func(t *testing.T) {
  221. events := runFixture(t, srv, "test02_set_and_branch.json",
  222. map[string]interface{}{"score": 40})
  223. assertWorkflowDone(t, events)
  224. assertStepStarted(t, events, "Set_Fail")
  225. })
  226. // test06: Write_* file operations — overwrite then append
  227. t.Run("test06_write_file", func(t *testing.T) {
  228. events := runFixture(t, srv, "test06_write_file.json",
  229. map[string]interface{}{"content": "hello test06"})
  230. assertWorkflowDone(t, events)
  231. })
  232. // test07: Step.if conditional skip — two param variants
  233. t.Run("test07a_if_executed", func(t *testing.T) {
  234. events := runFixture(t, srv, "test07_if_conditional.json",
  235. map[string]interface{}{"shouldRun": "yes"})
  236. assertWorkflowDone(t, events)
  237. assertStepStarted(t, events, "Set_Conditional")
  238. assertStepSkipped(t, events, "Set_Skipped")
  239. })
  240. t.Run("test07b_if_skipped", func(t *testing.T) {
  241. events := runFixture(t, srv, "test07_if_conditional.json",
  242. map[string]interface{}{"shouldRun": "no"})
  243. assertWorkflowDone(t, events)
  244. assertStepSkipped(t, events, "Set_Conditional")
  245. assertStepStarted(t, events, "Set_Skipped")
  246. })
  247. // test08: Nested Loop + Branch — pure logic, no LLM
  248. t.Run("test08_nested_loop_branch", func(t *testing.T) {
  249. events := runFixture(t, srv, "test08_nested_loop_branch.json", nil)
  250. assertWorkflowDone(t, events)
  251. })
  252. // test11: Expression arithmetic + Branch — three comparison outcomes
  253. t.Run("test11a_expr_a_greater", func(t *testing.T) {
  254. events := runFixture(t, srv, "test11_expression_calc.json",
  255. map[string]interface{}{"a": 10, "b": 5})
  256. assertWorkflowDone(t, events)
  257. assertStepStarted(t, events, "Set_AGreater")
  258. })
  259. t.Run("test11b_expr_equal", func(t *testing.T) {
  260. events := runFixture(t, srv, "test11_expression_calc.json",
  261. map[string]interface{}{"a": 7, "b": 7})
  262. assertWorkflowDone(t, events)
  263. assertStepStarted(t, events, "Set_Equal")
  264. })
  265. t.Run("test11c_expr_b_greater", func(t *testing.T) {
  266. events := runFixture(t, srv, "test11_expression_calc.json",
  267. map[string]interface{}{"a": 3, "b": 8})
  268. assertWorkflowDone(t, events)
  269. assertStepStarted(t, events, "Set_BGreater")
  270. })
  271. // test12: Parallel loop + Write_* + .length (previously failing before fix)
  272. t.Run("test12_array_length", func(t *testing.T) {
  273. events := runFixture(t, srv, "test12_multi_write_loop.json", nil)
  274. assertWorkflowDone(t, events)
  275. })
  276. // test14: step_print events — three print steps expected
  277. t.Run("test14_print_events", func(t *testing.T) {
  278. events := runFixture(t, srv, "test14_print_events.json", nil)
  279. assertWorkflowDone(t, events)
  280. assertPrintCount(t, events, 3)
  281. })
  282. // ── From engine TC suite (new coverage not in Downloads set) ─────────
  283. // tc03: Branch with no matching case and no ELSE continues to Branch.next
  284. // (v3.15 fix: previously caused an error instead of falling through)
  285. t.Run("tc03_branch_no_match", func(t *testing.T) {
  286. events := runFixture(t, srv, "tc03_branch_no_match.json",
  287. map[string]interface{}{"score": 40})
  288. assertWorkflowDone(t, events)
  289. assertStepStarted(t, events, "Set_default")
  290. })
  291. // tc05: Noop_* node runs children as parallel sub-chains
  292. t.Run("tc05_noop_parallel", func(t *testing.T) {
  293. events := runFixture(t, srv, "tc05_noop_parallel.json", nil)
  294. assertWorkflowDone(t, events)
  295. assertStepStarted(t, events, "Set_left")
  296. assertStepStarted(t, events, "Set_right")
  297. })
  298. // tc07: Step onError handler fires when Write_* targets an undeclared artifact path
  299. t.Run("tc07_onerror_recovery", func(t *testing.T) {
  300. events := runFixture(t, srv, "tc07_onerror_recovery.json", nil)
  301. assertWorkflowDone(t, events)
  302. assertHasEventType(t, events, "step_error")
  303. assertStepStarted(t, events, "Set_fallback")
  304. })
  305. // tc08: Registry param with default value is auto-applied when caller omits it
  306. t.Run("tc08_param_default", func(t *testing.T) {
  307. events := runFixture(t, srv, "tc08_param_default.json", nil) // omit timeout
  308. assertWorkflowDone(t, events)
  309. assertStepStarted(t, events, "Set_ok") // Branch matched timeout==30
  310. })
  311. // tc09: Two unreferenced steps are both detected as entry nodes and
  312. // launched in parallel; both chains must complete
  313. t.Run("tc09_multi_entry", func(t *testing.T) {
  314. events := runFixture(t, srv, "tc09_multi_entry.json", nil)
  315. assertWorkflowDone(t, events)
  316. assertStepStarted(t, events, "Set_a")
  317. assertStepStarted(t, events, "Set_b")
  318. })
  319. // tc_pause_timeout: Pause_* with a 1 s timeout auto-routes to the timeout
  320. // handler without any external Resume call
  321. t.Run("tc_pause_timeout", func(t *testing.T) {
  322. events := runFixture(t, srv, "tc_pause_timeout.json", nil)
  323. assertWorkflowDone(t, events)
  324. assertHasEventType(t, events, "pause_start")
  325. assertHasEventType(t, events, "pause_timeout")
  326. assertStepStarted(t, events, "Set_timedOut")
  327. })
  328. // tc16: Workflow runs correctly when non-default RunParams fields
  329. // (workspaceId, mode) are supplied — engine must not reject them
  330. t.Run("tc16_run_params", func(t *testing.T) {
  331. events := runFixture(t, srv, "tc16_run_params.json", nil)
  332. assertWorkflowDone(t, events)
  333. })
  334. // ── v3.16 feature tests ──────────────────────────────────────────────
  335. // tc_while_loop: while loop counts from 0 to 4 using Set_* increment
  336. t.Run("tc_while_loop", func(t *testing.T) {
  337. events := runFixture(t, srv, "tc_while_loop.json", nil)
  338. assertWorkflowDone(t, events)
  339. // Loop body should fire 5 times (counter 0..4)
  340. assertStepStarted(t, events, "Set_Log")
  341. assertStepStarted(t, events, "Set_Inc")
  342. })
  343. // tc_break_serial: BREAK exits serial loop early when item==3
  344. t.Run("tc_break_serial", func(t *testing.T) {
  345. events := runFixture(t, srv, "tc_break_serial.json", nil)
  346. assertWorkflowDone(t, events)
  347. assertStepStarted(t, events, "Branch_Check")
  348. assertStepStarted(t, events, "Set_BreakHere")
  349. })
  350. // tc_source_maxiter: source loop with 5 items capped at maxIterations=3
  351. t.Run("tc_source_maxiter", func(t *testing.T) {
  352. events := runFixture(t, srv, "tc_source_maxiter.json", nil)
  353. assertWorkflowDone(t, events)
  354. assertStepStarted(t, events, "Set_Collect")
  355. })
  356. }
  357. // ─────────────────────────────────────────────────────────────────────────────
  358. // TestIntegration_Live — LLM workflows, skipped unless LLM_KEY is set
  359. // ─────────────────────────────────────────────────────────────────────────────
  360. func TestIntegration_Live(t *testing.T) {
  361. cfg := config.Load("")
  362. if cfg.LLM.Key == "" {
  363. t.Skip("LLM_KEY not set — skipping live LLM integration tests")
  364. }
  365. cfg.Workspace.Root = t.TempDir()
  366. srv := httptest.NewServer(serverRunHandler(cfg))
  367. t.Cleanup(srv.Close)
  368. // test01: basic single LLM call — verifies LLM step executes and emits llm_done
  369. t.Run("test01_basic_llm", func(t *testing.T) {
  370. events := runFixture(t, srv, "test01_basic_llm.json",
  371. map[string]interface{}{"prompt": "Reply with exactly one word: hello"})
  372. assertWorkflowDone(t, events)
  373. assertLLMDoneCount(t, events, 1)
  374. })
  375. // test03: parallel children fan-out — two LLM steps run concurrently
  376. t.Run("test03_parallel_children", func(t *testing.T) {
  377. events := runFixture(t, srv, "test03_parallel_children.json",
  378. map[string]interface{}{"topic": "cats"})
  379. assertWorkflowDone(t, events)
  380. assertLLMDoneCount(t, events, 2)
  381. })
  382. // test04: serial Loop — three sequential LLM calls (one per item)
  383. t.Run("test04_loop_serial", func(t *testing.T) {
  384. events := runFixture(t, srv, "test04_loop_serial.json", nil)
  385. assertWorkflowDone(t, events)
  386. assertLLMDoneCount(t, events, 3)
  387. })
  388. // test05: parallel Loop — four concurrent LLM calls
  389. t.Run("test05_loop_parallel", func(t *testing.T) {
  390. events := runFixture(t, srv, "test05_loop_parallel.json", nil)
  391. assertWorkflowDone(t, events)
  392. assertLLMDoneCount(t, events, 4)
  393. })
  394. // test09: output_config json_schema — LLM must return parseable JSON
  395. // (previously failing before AnthropicAdapter structured output fix)
  396. t.Run("test09_structured_output", func(t *testing.T) {
  397. events := runFixture(t, srv, "test09_structured_output.json",
  398. map[string]interface{}{"topic": "artificial intelligence"})
  399. assertWorkflowDone(t, events)
  400. assertLLMDoneCount(t, events, 1)
  401. })
  402. // test10: multi-stage pipeline — plan (json_schema) → parallel impl loop → report → Write_*
  403. t.Run("test10_complex_pipeline", func(t *testing.T) {
  404. events := runFixture(t, srv, "test10_complex_pipeline.json",
  405. map[string]interface{}{"requirement": "Build a simple todo app with add, list, and delete features"})
  406. assertWorkflowDone(t, events)
  407. })
  408. // test13: deep nesting — serial Loop → Branch → two different LLM steps (3 total)
  409. t.Run("test13_deep_nesting", func(t *testing.T) {
  410. events := runFixture(t, srv, "test13_deep_nesting.json", nil)
  411. assertWorkflowDone(t, events)
  412. assertLLMDoneCount(t, events, 3)
  413. })
  414. // test15: LLM out simultaneously writes a variable AND a file path
  415. t.Run("test15_out_file_write", func(t *testing.T) {
  416. events := runFixture(t, srv, "test15_out_file_write.json",
  417. map[string]interface{}{"topic": "ocean"})
  418. assertWorkflowDone(t, events)
  419. assertLLMDoneCount(t, events, 1)
  420. })
  421. }
  422. // ─────────────────────────────────────────────────────────────────────────────
  423. // TestLiveServer_LLMNode — original smoke test, kept for backward compatibility
  424. // Verifies SSE framing: run_id consistency, sequential seq numbers, llm_done.
  425. // ─────────────────────────────────────────────────────────────────────────────
  426. const simpleOneLLMWorkflow = `{
  427. "version": "3.15",
  428. "name": "LiveLLMTest",
  429. "registry": {
  430. "params": ["prompt(STRING)"]
  431. },
  432. "steps": [
  433. {
  434. "id": "LLM_answer",
  435. "in": {
  436. "messages": [
  437. {"role": "user", "content": "=prompt"}
  438. ]
  439. },
  440. "out": {"$reply": "=_result"},
  441. "next": "Stop_end"
  442. },
  443. {"id": "Stop_end"}
  444. ]
  445. }`
  446. func TestLiveServer_LLMNode(t *testing.T) {
  447. cfg := config.Load("")
  448. if cfg.LLM.Key == "" {
  449. t.Skip("LLM_KEY not set — skipping live LLM test")
  450. }
  451. srv := httptest.NewServer(serverRunHandler(cfg))
  452. t.Cleanup(srv.Close)
  453. var wfDef map[string]interface{}
  454. if err := json.Unmarshal([]byte(simpleOneLLMWorkflow), &wfDef); err != nil {
  455. t.Fatalf("parse workflow fixture: %v", err)
  456. }
  457. reqBody := map[string]interface{}{
  458. "workflowDef": wfDef,
  459. "runParams": map[string]interface{}{
  460. "params": map[string]interface{}{"prompt": "Reply with exactly one word: hello"},
  461. "workspaceId": "test-live",
  462. "mode": "create",
  463. },
  464. }
  465. reqBytes, _ := json.Marshal(reqBody)
  466. resp, err := http.Post(srv.URL+"/run", "application/json",
  467. strings.NewReader(string(reqBytes)))
  468. if err != nil {
  469. t.Fatalf("POST /run: %v", err)
  470. }
  471. defer resp.Body.Close()
  472. if resp.StatusCode != http.StatusOK {
  473. t.Fatalf("HTTP status %d (want 200)", resp.StatusCode)
  474. }
  475. if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/event-stream") {
  476. t.Errorf("Content-Type = %q, want text/event-stream", ct)
  477. }
  478. var events []sseEvent
  479. scanner := bufio.NewScanner(resp.Body)
  480. for scanner.Scan() {
  481. line := scanner.Text()
  482. if !strings.HasPrefix(line, "data: ") {
  483. continue
  484. }
  485. var ev sseEvent
  486. if err := json.Unmarshal([]byte(line[6:]), &ev); err != nil {
  487. t.Logf("skip unparseable event: %s", line)
  488. continue
  489. }
  490. sid := "<nil>"
  491. if ev.StepID != nil {
  492. sid = *ev.StepID
  493. }
  494. t.Logf("event #%d type=%-20s step=%s", ev.Seq, ev.Type, sid)
  495. events = append(events, ev)
  496. if ev.Type == "workflow_done" || ev.Type == "workflow_failed" {
  497. break
  498. }
  499. }
  500. if err := scanner.Err(); err != nil {
  501. t.Fatalf("SSE read error: %v", err)
  502. }
  503. if len(events) == 0 {
  504. t.Fatal("no SSE events received")
  505. }
  506. last := events[len(events)-1]
  507. if last.Type == "workflow_failed" {
  508. t.Fatalf("workflow_failed: payload=%v", last.Payload)
  509. }
  510. if last.Type != "workflow_done" {
  511. t.Errorf("last event type=%q, want workflow_done", last.Type)
  512. }
  513. // Verify SSE framing: all events share the same run_id
  514. runID := events[0].RunID
  515. for i, ev := range events {
  516. if ev.RunID != runID {
  517. t.Errorf("event #%d run_id=%q, want %q", i, ev.RunID, runID)
  518. }
  519. }
  520. // Verify seq is strictly 1-based and sequential
  521. for i, ev := range events {
  522. if ev.Seq != i+1 {
  523. t.Errorf("event #%d seq=%d, want %d", i, ev.Seq, i+1)
  524. }
  525. }
  526. // Verify llm_done event with latency/model metadata
  527. var sawLLMDone bool
  528. for _, ev := range events {
  529. if ev.Type == "llm_done" {
  530. sawLLMDone = true
  531. if ms, ok := ev.Payload["latency_ms"].(float64); ok {
  532. t.Logf("LLM latency: %.0fms", ms)
  533. }
  534. if model, ok := ev.Payload["model"].(string); ok {
  535. t.Logf("LLM model: %s", model)
  536. }
  537. }
  538. }
  539. if !sawLLMDone {
  540. t.Error("no llm_done event received")
  541. }
  542. }