package main // main_test.go — HTTP server integration tests for the workflow engine. // // All tests spin up an in-process httptest.Server via serverRunHandler and // communicate exclusively through the real POST /run + SSE event-stream // interface. No engine internals are accessed directly. // // ── Test Groups ─────────────────────────────────────────────────────────────── // // TestIntegration_Offline pure-logic workflows, no LLM required, always run. // TestIntegration_Live LLM workflows, skipped unless LLM_KEY is set. // TestLiveServer_LLMNode original smoke test kept for backward compatibility. // // ── Coverage Map ────────────────────────────────────────────────────────────── // // From Downloads test suite (15 JSON fixtures in cmd/testdata/test*.json): // test02 Set + Branch (3 param variants) // test06 Write_* file overwrite + append // test07 Step.if conditional skip (2 variants) // test08 Nested Loop + Branch (pure logic) // test11 Expression arithmetic + Branch (3 variants) // test12 Parallel Loop + Write_* + .length (previously failing) // test14 step_print events // test01 Basic LLM call [live] // test03 Parallel children fan-out (2 LLM) [live] // test04 Serial Loop (3 LLM) [live] // test05 Parallel Loop (4 LLM) [live] // test09 Structured JSON output via output_config [live] // test10 Complex multi-stage pipeline (plan→loop→report→write) [live] // test13 Deep nesting Loop+Branch routing to 2 LLM steps [live] // test15 LLM out writes file + variable simultaneously [live] // // From engine integration suite (TC-01~16, cmd/testdata/tc*.json): // tc03 Branch no-match + no ELSE continues to Branch.next // tc05 Noop parallel children fan-out // tc07 Step onError handler fires on Write_* path error // tc08 Registry param default value auto-applied when caller omits // tc09 Multi-entry nodes both start in parallel // tc_pause_timeout Pause_* with 1 s timeout auto-routes to handler // tc16 Workflow runs successfully with custom RunParams fields // // Not converted (TC-10, TC-15: require /resume HTTP endpoint; // TC-13: requires custom ComponentAdapter injection): // these scenarios remain documented in the fixture JSONs for future work. // // ── Run commands ───────────────────────────────────────────────────────────── // // go test ./cmd/ -run TestIntegration_Offline -v -timeout 30s // WORKFLOW_CONFIG=/path/.llm_config go test ./cmd/ -run TestIntegration_Live -v -timeout 180s // WORKFLOW_CONFIG=/path/.llm_config go test ./cmd/ -run TestLiveServer_LLMNode -v -timeout 60s import ( "bufio" "encoding/json" "net/http" "net/http/httptest" "os" "path/filepath" "strings" "testing" "workflow/config" ) // ───────────────────────────────────────────────────────────────────────────── // SSE helpers // ───────────────────────────────────────────────────────────────────────────── // (sseEvent is defined in main.go — same package, no redeclaration needed) // runFixture loads a workflow JSON from cmd/testdata/, POSTs it // to srv with the given params, reads the SSE stream until workflow_done or // workflow_failed, and returns all received events. func runFixture(t *testing.T, srv *httptest.Server, fixtureName string, params map[string]interface{}) []sseEvent { t.Helper() data, err := os.ReadFile(filepath.Join("testdata", fixtureName)) if err != nil { t.Fatalf("load fixture %s: %v", fixtureName, err) } var wfDef map[string]interface{} if err := json.Unmarshal(data, &wfDef); err != nil { t.Fatalf("parse fixture %s: %v", fixtureName, err) } if params == nil { params = map[string]interface{}{} } reqBody := map[string]interface{}{ "workflowDef": wfDef, "runParams": map[string]interface{}{ "params": params, "workspaceId": "test", "mode": "create", }, } reqBytes, _ := json.Marshal(reqBody) resp, err := http.Post(srv.URL+"/run", "application/json", strings.NewReader(string(reqBytes))) if err != nil { t.Fatalf("POST /run: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Fatalf("HTTP status %d (want 200)", resp.StatusCode) } var events []sseEvent scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { line := scanner.Text() if !strings.HasPrefix(line, "data: ") { continue } var ev sseEvent if err := json.Unmarshal([]byte(line[6:]), &ev); err != nil { continue } sid := "" if ev.StepID != nil { sid = *ev.StepID } t.Logf("event #%d type=%-22s step=%s", ev.Seq, ev.Type, sid) events = append(events, ev) if ev.Type == "workflow_done" || ev.Type == "workflow_failed" { break } } if err := scanner.Err(); err != nil { t.Fatalf("SSE read error: %v", err) } return events } // ───────────────────────────────────────────────────────────────────────────── // Assertion helpers // ───────────────────────────────────────────────────────────────────────────── // assertWorkflowDone fails unless the final SSE event is workflow_done. func assertWorkflowDone(t *testing.T, events []sseEvent) { t.Helper() if len(events) == 0 { t.Fatal("no SSE events received") } last := events[len(events)-1] if last.Type == "workflow_failed" { t.Fatalf("workflow_failed: payload=%v", last.Payload) } if last.Type != "workflow_done" { t.Errorf("last event type=%q, want workflow_done", last.Type) } } // assertStepStarted fails unless at least one step_start event for stepID exists. func assertStepStarted(t *testing.T, events []sseEvent, stepID string) { t.Helper() for _, ev := range events { if ev.Type == "step_start" && ev.StepID != nil && *ev.StepID == stepID { return } } t.Errorf("step_start for %q not found in events", stepID) } // assertStepSkipped fails unless a step_skipped event for stepID exists. func assertStepSkipped(t *testing.T, events []sseEvent, stepID string) { t.Helper() for _, ev := range events { if ev.Type == "step_skipped" && ev.StepID != nil && *ev.StepID == stepID { return } } t.Errorf("step_skipped for %q not found in events", stepID) } // assertHasEventType fails unless at least one event of the given type exists. func assertHasEventType(t *testing.T, events []sseEvent, eventType string) { t.Helper() for _, ev := range events { if ev.Type == eventType { return } } t.Errorf("event type %q not found in %d events", eventType, len(events)) } // assertPrintCount fails unless exactly want step_print events were received. func assertPrintCount(t *testing.T, events []sseEvent, want int) { t.Helper() got := 0 for _, ev := range events { if ev.Type == "step_print" { got++ } } if got != want { t.Errorf("step_print count=%d, want %d", got, want) } } // assertLLMDoneCount fails unless exactly want llm_done events were received. func assertLLMDoneCount(t *testing.T, events []sseEvent, want int) { t.Helper() got := 0 for _, ev := range events { if ev.Type == "llm_done" { got++ } } if got != want { t.Errorf("llm_done count=%d, want %d", got, want) } } // ───────────────────────────────────────────────────────────────────────────── // TestIntegration_Offline — pure-logic tests, no LLM, always runnable // ───────────────────────────────────────────────────────────────────────────── func TestIntegration_Offline(t *testing.T) { cfg := config.Load("") cfg.Workspace.Root = t.TempDir() srv := httptest.NewServer(serverRunHandler(cfg)) t.Cleanup(srv.Close) // ── From Downloads test suite ───────────────────────────────────────── // test02: Set + Branch — three score ranges map to distinct steps t.Run("test02a_branch_excellent", func(t *testing.T) { events := runFixture(t, srv, "test02_set_and_branch.json", map[string]interface{}{"score": 95}) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_Excellent") }) t.Run("test02b_branch_pass", func(t *testing.T) { events := runFixture(t, srv, "test02_set_and_branch.json", map[string]interface{}{"score": 75}) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_Pass") }) t.Run("test02c_branch_fail", func(t *testing.T) { events := runFixture(t, srv, "test02_set_and_branch.json", map[string]interface{}{"score": 40}) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_Fail") }) // test06: Write_* file operations — overwrite then append t.Run("test06_write_file", func(t *testing.T) { events := runFixture(t, srv, "test06_write_file.json", map[string]interface{}{"content": "hello test06"}) assertWorkflowDone(t, events) }) // test07: Step.if conditional skip — two param variants t.Run("test07a_if_executed", func(t *testing.T) { events := runFixture(t, srv, "test07_if_conditional.json", map[string]interface{}{"shouldRun": "yes"}) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_Conditional") assertStepSkipped(t, events, "Set_Skipped") }) t.Run("test07b_if_skipped", func(t *testing.T) { events := runFixture(t, srv, "test07_if_conditional.json", map[string]interface{}{"shouldRun": "no"}) assertWorkflowDone(t, events) assertStepSkipped(t, events, "Set_Conditional") assertStepStarted(t, events, "Set_Skipped") }) // test08: Nested Loop + Branch — pure logic, no LLM t.Run("test08_nested_loop_branch", func(t *testing.T) { events := runFixture(t, srv, "test08_nested_loop_branch.json", nil) assertWorkflowDone(t, events) }) // test11: Expression arithmetic + Branch — three comparison outcomes t.Run("test11a_expr_a_greater", func(t *testing.T) { events := runFixture(t, srv, "test11_expression_calc.json", map[string]interface{}{"a": 10, "b": 5}) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_AGreater") }) t.Run("test11b_expr_equal", func(t *testing.T) { events := runFixture(t, srv, "test11_expression_calc.json", map[string]interface{}{"a": 7, "b": 7}) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_Equal") }) t.Run("test11c_expr_b_greater", func(t *testing.T) { events := runFixture(t, srv, "test11_expression_calc.json", map[string]interface{}{"a": 3, "b": 8}) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_BGreater") }) // test12: Parallel loop + Write_* + .length (previously failing before fix) t.Run("test12_array_length", func(t *testing.T) { events := runFixture(t, srv, "test12_multi_write_loop.json", nil) assertWorkflowDone(t, events) }) // test14: step_print events — three print steps expected t.Run("test14_print_events", func(t *testing.T) { events := runFixture(t, srv, "test14_print_events.json", nil) assertWorkflowDone(t, events) assertPrintCount(t, events, 3) }) // ── From engine TC suite (new coverage not in Downloads set) ───────── // tc03: Branch with no matching case and no ELSE continues to Branch.next // (v3.15 fix: previously caused an error instead of falling through) t.Run("tc03_branch_no_match", func(t *testing.T) { events := runFixture(t, srv, "tc03_branch_no_match.json", map[string]interface{}{"score": 40}) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_default") }) // tc05: Noop_* node runs children as parallel sub-chains t.Run("tc05_noop_parallel", func(t *testing.T) { events := runFixture(t, srv, "tc05_noop_parallel.json", nil) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_left") assertStepStarted(t, events, "Set_right") }) // tc07: Step onError handler fires when Write_* targets an undeclared artifact path t.Run("tc07_onerror_recovery", func(t *testing.T) { events := runFixture(t, srv, "tc07_onerror_recovery.json", nil) assertWorkflowDone(t, events) assertHasEventType(t, events, "step_error") assertStepStarted(t, events, "Set_fallback") }) // tc08: Registry param with default value is auto-applied when caller omits it t.Run("tc08_param_default", func(t *testing.T) { events := runFixture(t, srv, "tc08_param_default.json", nil) // omit timeout assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_ok") // Branch matched timeout==30 }) // tc09: Two unreferenced steps are both detected as entry nodes and // launched in parallel; both chains must complete t.Run("tc09_multi_entry", func(t *testing.T) { events := runFixture(t, srv, "tc09_multi_entry.json", nil) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_a") assertStepStarted(t, events, "Set_b") }) // tc_pause_timeout: Pause_* with a 1 s timeout auto-routes to the timeout // handler without any external Resume call t.Run("tc_pause_timeout", func(t *testing.T) { events := runFixture(t, srv, "tc_pause_timeout.json", nil) assertWorkflowDone(t, events) assertHasEventType(t, events, "pause_start") assertHasEventType(t, events, "pause_timeout") assertStepStarted(t, events, "Set_timedOut") }) // tc16: Workflow runs correctly when non-default RunParams fields // (workspaceId, mode) are supplied — engine must not reject them t.Run("tc16_run_params", func(t *testing.T) { events := runFixture(t, srv, "tc16_run_params.json", nil) assertWorkflowDone(t, events) }) // ── v3.16 feature tests ────────────────────────────────────────────── // tc_while_loop: while loop counts from 0 to 4 using Set_* increment t.Run("tc_while_loop", func(t *testing.T) { events := runFixture(t, srv, "tc_while_loop.json", nil) assertWorkflowDone(t, events) // Loop body should fire 5 times (counter 0..4) assertStepStarted(t, events, "Set_Log") assertStepStarted(t, events, "Set_Inc") }) // tc_break_serial: BREAK exits serial loop early when item==3 t.Run("tc_break_serial", func(t *testing.T) { events := runFixture(t, srv, "tc_break_serial.json", nil) assertWorkflowDone(t, events) assertStepStarted(t, events, "Branch_Check") assertStepStarted(t, events, "Set_BreakHere") }) // tc_source_maxiter: source loop with 5 items capped at maxIterations=3 t.Run("tc_source_maxiter", func(t *testing.T) { events := runFixture(t, srv, "tc_source_maxiter.json", nil) assertWorkflowDone(t, events) assertStepStarted(t, events, "Set_Collect") }) } // ───────────────────────────────────────────────────────────────────────────── // TestIntegration_Live — LLM workflows, skipped unless LLM_KEY is set // ───────────────────────────────────────────────────────────────────────────── func TestIntegration_Live(t *testing.T) { cfg := config.Load("") if cfg.LLM.Key == "" { t.Skip("LLM_KEY not set — skipping live LLM integration tests") } cfg.Workspace.Root = t.TempDir() srv := httptest.NewServer(serverRunHandler(cfg)) t.Cleanup(srv.Close) // test01: basic single LLM call — verifies LLM step executes and emits llm_done t.Run("test01_basic_llm", func(t *testing.T) { events := runFixture(t, srv, "test01_basic_llm.json", map[string]interface{}{"prompt": "Reply with exactly one word: hello"}) assertWorkflowDone(t, events) assertLLMDoneCount(t, events, 1) }) // test03: parallel children fan-out — two LLM steps run concurrently t.Run("test03_parallel_children", func(t *testing.T) { events := runFixture(t, srv, "test03_parallel_children.json", map[string]interface{}{"topic": "cats"}) assertWorkflowDone(t, events) assertLLMDoneCount(t, events, 2) }) // test04: serial Loop — three sequential LLM calls (one per item) t.Run("test04_loop_serial", func(t *testing.T) { events := runFixture(t, srv, "test04_loop_serial.json", nil) assertWorkflowDone(t, events) assertLLMDoneCount(t, events, 3) }) // test05: parallel Loop — four concurrent LLM calls t.Run("test05_loop_parallel", func(t *testing.T) { events := runFixture(t, srv, "test05_loop_parallel.json", nil) assertWorkflowDone(t, events) assertLLMDoneCount(t, events, 4) }) // test09: output_config json_schema — LLM must return parseable JSON // (previously failing before AnthropicAdapter structured output fix) t.Run("test09_structured_output", func(t *testing.T) { events := runFixture(t, srv, "test09_structured_output.json", map[string]interface{}{"topic": "artificial intelligence"}) assertWorkflowDone(t, events) assertLLMDoneCount(t, events, 1) }) // test10: multi-stage pipeline — plan (json_schema) → parallel impl loop → report → Write_* t.Run("test10_complex_pipeline", func(t *testing.T) { events := runFixture(t, srv, "test10_complex_pipeline.json", map[string]interface{}{"requirement": "Build a simple todo app with add, list, and delete features"}) assertWorkflowDone(t, events) }) // test13: deep nesting — serial Loop → Branch → two different LLM steps (3 total) t.Run("test13_deep_nesting", func(t *testing.T) { events := runFixture(t, srv, "test13_deep_nesting.json", nil) assertWorkflowDone(t, events) assertLLMDoneCount(t, events, 3) }) // test15: LLM out simultaneously writes a variable AND a file path t.Run("test15_out_file_write", func(t *testing.T) { events := runFixture(t, srv, "test15_out_file_write.json", map[string]interface{}{"topic": "ocean"}) assertWorkflowDone(t, events) assertLLMDoneCount(t, events, 1) }) } // ───────────────────────────────────────────────────────────────────────────── // TestLiveServer_LLMNode — original smoke test, kept for backward compatibility // Verifies SSE framing: run_id consistency, sequential seq numbers, llm_done. // ───────────────────────────────────────────────────────────────────────────── const simpleOneLLMWorkflow = `{ "version": "3.15", "name": "LiveLLMTest", "registry": { "params": ["prompt(STRING)"] }, "steps": [ { "id": "LLM_answer", "in": { "messages": [ {"role": "user", "content": "=prompt"} ] }, "out": {"$reply": "=_result"}, "next": "Stop_end" }, {"id": "Stop_end"} ] }` func TestLiveServer_LLMNode(t *testing.T) { cfg := config.Load("") if cfg.LLM.Key == "" { t.Skip("LLM_KEY not set — skipping live LLM test") } srv := httptest.NewServer(serverRunHandler(cfg)) t.Cleanup(srv.Close) var wfDef map[string]interface{} if err := json.Unmarshal([]byte(simpleOneLLMWorkflow), &wfDef); err != nil { t.Fatalf("parse workflow fixture: %v", err) } reqBody := map[string]interface{}{ "workflowDef": wfDef, "runParams": map[string]interface{}{ "params": map[string]interface{}{"prompt": "Reply with exactly one word: hello"}, "workspaceId": "test-live", "mode": "create", }, } reqBytes, _ := json.Marshal(reqBody) resp, err := http.Post(srv.URL+"/run", "application/json", strings.NewReader(string(reqBytes))) if err != nil { t.Fatalf("POST /run: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Fatalf("HTTP status %d (want 200)", resp.StatusCode) } if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/event-stream") { t.Errorf("Content-Type = %q, want text/event-stream", ct) } var events []sseEvent scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { line := scanner.Text() if !strings.HasPrefix(line, "data: ") { continue } var ev sseEvent if err := json.Unmarshal([]byte(line[6:]), &ev); err != nil { t.Logf("skip unparseable event: %s", line) continue } sid := "" if ev.StepID != nil { sid = *ev.StepID } t.Logf("event #%d type=%-20s step=%s", ev.Seq, ev.Type, sid) events = append(events, ev) if ev.Type == "workflow_done" || ev.Type == "workflow_failed" { break } } if err := scanner.Err(); err != nil { t.Fatalf("SSE read error: %v", err) } if len(events) == 0 { t.Fatal("no SSE events received") } last := events[len(events)-1] if last.Type == "workflow_failed" { t.Fatalf("workflow_failed: payload=%v", last.Payload) } if last.Type != "workflow_done" { t.Errorf("last event type=%q, want workflow_done", last.Type) } // Verify SSE framing: all events share the same run_id runID := events[0].RunID for i, ev := range events { if ev.RunID != runID { t.Errorf("event #%d run_id=%q, want %q", i, ev.RunID, runID) } } // Verify seq is strictly 1-based and sequential for i, ev := range events { if ev.Seq != i+1 { t.Errorf("event #%d seq=%d, want %d", i, ev.Seq, i+1) } } // Verify llm_done event with latency/model metadata var sawLLMDone bool for _, ev := range events { if ev.Type == "llm_done" { sawLLMDone = true if ms, ok := ev.Payload["latency_ms"].(float64); ok { t.Logf("LLM latency: %.0fms", ms) } if model, ok := ev.Payload["model"].(string); ok { t.Logf("LLM model: %s", model) } } } if !sawLLMDone { t.Error("no llm_done event received") } }