| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603 |
- package main
- // main_test.go — HTTP server integration tests for the workflow engine.
- //
- // All tests spin up an in-process httptest.Server via serverRunHandler and
- // communicate exclusively through the real POST /run + SSE event-stream
- // interface. No engine internals are accessed directly.
- //
- // ── Test Groups ───────────────────────────────────────────────────────────────
- //
- // TestIntegration_Offline pure-logic workflows, no LLM required, always run.
- // TestIntegration_Live LLM workflows, skipped unless LLM_KEY is set.
- // TestLiveServer_LLMNode original smoke test kept for backward compatibility.
- //
- // ── Coverage Map ──────────────────────────────────────────────────────────────
- //
- // From Downloads test suite (15 JSON fixtures in cmd/testdata/test*.json):
- // test02 Set + Branch (3 param variants)
- // test06 Write_* file overwrite + append
- // test07 Step.if conditional skip (2 variants)
- // test08 Nested Loop + Branch (pure logic)
- // test11 Expression arithmetic + Branch (3 variants)
- // test12 Parallel Loop + Write_* + .length (previously failing)
- // test14 step_print events
- // test01 Basic LLM call [live]
- // test03 Parallel children fan-out (2 LLM) [live]
- // test04 Serial Loop (3 LLM) [live]
- // test05 Parallel Loop (4 LLM) [live]
- // test09 Structured JSON output via output_config [live]
- // test10 Complex multi-stage pipeline (plan→loop→report→write) [live]
- // test13 Deep nesting Loop+Branch routing to 2 LLM steps [live]
- // test15 LLM out writes file + variable simultaneously [live]
- //
- // From engine integration suite (TC-01~16, cmd/testdata/tc*.json):
- // tc03 Branch no-match + no ELSE continues to Branch.next
- // tc05 Noop parallel children fan-out
- // tc07 Step onError handler fires on Write_* path error
- // tc08 Registry param default value auto-applied when caller omits
- // tc09 Multi-entry nodes both start in parallel
- // tc_pause_timeout Pause_* with 1 s timeout auto-routes to handler
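// tc16 Workflow runs successfully with custom RunParams fields
//
// v3.16 feature tests (cmd/testdata/tc_*.json):
// tc_while_loop while loop counting 0..4 via Set_* increment
// tc_break_serial BREAK exits a serial loop early
// tc_source_maxiter source loop capped by maxIterations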
- //
- // Not converted (TC-10, TC-15: require /resume HTTP endpoint;
- // TC-13: requires custom ComponentAdapter injection):
- // these scenarios remain documented in the fixture JSONs for future work.
- //
- // ── Run commands ─────────────────────────────────────────────────────────────
- //
- // go test ./cmd/ -run TestIntegration_Offline -v -timeout 30s
- // WORKFLOW_CONFIG=/path/.llm_config go test ./cmd/ -run TestIntegration_Live -v -timeout 180s
- // WORKFLOW_CONFIG=/path/.llm_config go test ./cmd/ -run TestLiveServer_LLMNode -v -timeout 60s
- import (
- "bufio"
- "encoding/json"
- "net/http"
- "net/http/httptest"
- "os"
- "path/filepath"
- "strings"
- "testing"
- "workflow/config"
- )
- // ─────────────────────────────────────────────────────────────────────────────
- // SSE helpers
- // ─────────────────────────────────────────────────────────────────────────────
- // (sseEvent is defined in main.go — same package, no redeclaration needed)
- // runFixture loads a workflow JSON from cmd/testdata/<fixtureName>, POSTs it
- // to srv with the given params, reads the SSE stream until workflow_done or
- // workflow_failed, and returns all received events.
- func runFixture(t *testing.T, srv *httptest.Server, fixtureName string, params map[string]interface{}) []sseEvent {
- t.Helper()
- data, err := os.ReadFile(filepath.Join("testdata", fixtureName))
- if err != nil {
- t.Fatalf("load fixture %s: %v", fixtureName, err)
- }
- var wfDef map[string]interface{}
- if err := json.Unmarshal(data, &wfDef); err != nil {
- t.Fatalf("parse fixture %s: %v", fixtureName, err)
- }
- if params == nil {
- params = map[string]interface{}{}
- }
- reqBody := map[string]interface{}{
- "workflowDef": wfDef,
- "runParams": map[string]interface{}{
- "params": params,
- "workspaceId": "test",
- "mode": "create",
- },
- }
- reqBytes, _ := json.Marshal(reqBody)
- resp, err := http.Post(srv.URL+"/run", "application/json", strings.NewReader(string(reqBytes)))
- if err != nil {
- t.Fatalf("POST /run: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- t.Fatalf("HTTP status %d (want 200)", resp.StatusCode)
- }
- var events []sseEvent
- scanner := bufio.NewScanner(resp.Body)
- for scanner.Scan() {
- line := scanner.Text()
- if !strings.HasPrefix(line, "data: ") {
- continue
- }
- var ev sseEvent
- if err := json.Unmarshal([]byte(line[6:]), &ev); err != nil {
- continue
- }
- sid := "<nil>"
- if ev.StepID != nil {
- sid = *ev.StepID
- }
- t.Logf("event #%d type=%-22s step=%s", ev.Seq, ev.Type, sid)
- events = append(events, ev)
- if ev.Type == "workflow_done" || ev.Type == "workflow_failed" {
- break
- }
- }
- if err := scanner.Err(); err != nil {
- t.Fatalf("SSE read error: %v", err)
- }
- return events
- }
- // ─────────────────────────────────────────────────────────────────────────────
- // Assertion helpers
- // ─────────────────────────────────────────────────────────────────────────────
- // assertWorkflowDone fails unless the final SSE event is workflow_done.
- func assertWorkflowDone(t *testing.T, events []sseEvent) {
- t.Helper()
- if len(events) == 0 {
- t.Fatal("no SSE events received")
- }
- last := events[len(events)-1]
- if last.Type == "workflow_failed" {
- t.Fatalf("workflow_failed: payload=%v", last.Payload)
- }
- if last.Type != "workflow_done" {
- t.Errorf("last event type=%q, want workflow_done", last.Type)
- }
- }
- // assertStepStarted fails unless at least one step_start event for stepID exists.
- func assertStepStarted(t *testing.T, events []sseEvent, stepID string) {
- t.Helper()
- for _, ev := range events {
- if ev.Type == "step_start" && ev.StepID != nil && *ev.StepID == stepID {
- return
- }
- }
- t.Errorf("step_start for %q not found in events", stepID)
- }
- // assertStepSkipped fails unless a step_skipped event for stepID exists.
- func assertStepSkipped(t *testing.T, events []sseEvent, stepID string) {
- t.Helper()
- for _, ev := range events {
- if ev.Type == "step_skipped" && ev.StepID != nil && *ev.StepID == stepID {
- return
- }
- }
- t.Errorf("step_skipped for %q not found in events", stepID)
- }
- // assertHasEventType fails unless at least one event of the given type exists.
- func assertHasEventType(t *testing.T, events []sseEvent, eventType string) {
- t.Helper()
- for _, ev := range events {
- if ev.Type == eventType {
- return
- }
- }
- t.Errorf("event type %q not found in %d events", eventType, len(events))
- }
- // assertPrintCount fails unless exactly want step_print events were received.
- func assertPrintCount(t *testing.T, events []sseEvent, want int) {
- t.Helper()
- got := 0
- for _, ev := range events {
- if ev.Type == "step_print" {
- got++
- }
- }
- if got != want {
- t.Errorf("step_print count=%d, want %d", got, want)
- }
- }
- // assertLLMDoneCount fails unless exactly want llm_done events were received.
- func assertLLMDoneCount(t *testing.T, events []sseEvent, want int) {
- t.Helper()
- got := 0
- for _, ev := range events {
- if ev.Type == "llm_done" {
- got++
- }
- }
- if got != want {
- t.Errorf("llm_done count=%d, want %d", got, want)
- }
- }
- // ─────────────────────────────────────────────────────────────────────────────
- // TestIntegration_Offline — pure-logic tests, no LLM, always runnable
- // ─────────────────────────────────────────────────────────────────────────────
- func TestIntegration_Offline(t *testing.T) {
- cfg := config.Load("")
- cfg.Workspace.Root = t.TempDir()
- srv := httptest.NewServer(serverRunHandler(cfg))
- t.Cleanup(srv.Close)
- // ── From Downloads test suite ─────────────────────────────────────────
- // test02: Set + Branch — three score ranges map to distinct steps
- t.Run("test02a_branch_excellent", func(t *testing.T) {
- events := runFixture(t, srv, "test02_set_and_branch.json",
- map[string]interface{}{"score": 95})
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_Excellent")
- })
- t.Run("test02b_branch_pass", func(t *testing.T) {
- events := runFixture(t, srv, "test02_set_and_branch.json",
- map[string]interface{}{"score": 75})
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_Pass")
- })
- t.Run("test02c_branch_fail", func(t *testing.T) {
- events := runFixture(t, srv, "test02_set_and_branch.json",
- map[string]interface{}{"score": 40})
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_Fail")
- })
- // test06: Write_* file operations — overwrite then append
- t.Run("test06_write_file", func(t *testing.T) {
- events := runFixture(t, srv, "test06_write_file.json",
- map[string]interface{}{"content": "hello test06"})
- assertWorkflowDone(t, events)
- })
- // test07: Step.if conditional skip — two param variants
- t.Run("test07a_if_executed", func(t *testing.T) {
- events := runFixture(t, srv, "test07_if_conditional.json",
- map[string]interface{}{"shouldRun": "yes"})
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_Conditional")
- assertStepSkipped(t, events, "Set_Skipped")
- })
- t.Run("test07b_if_skipped", func(t *testing.T) {
- events := runFixture(t, srv, "test07_if_conditional.json",
- map[string]interface{}{"shouldRun": "no"})
- assertWorkflowDone(t, events)
- assertStepSkipped(t, events, "Set_Conditional")
- assertStepStarted(t, events, "Set_Skipped")
- })
- // test08: Nested Loop + Branch — pure logic, no LLM
- t.Run("test08_nested_loop_branch", func(t *testing.T) {
- events := runFixture(t, srv, "test08_nested_loop_branch.json", nil)
- assertWorkflowDone(t, events)
- })
- // test11: Expression arithmetic + Branch — three comparison outcomes
- t.Run("test11a_expr_a_greater", func(t *testing.T) {
- events := runFixture(t, srv, "test11_expression_calc.json",
- map[string]interface{}{"a": 10, "b": 5})
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_AGreater")
- })
- t.Run("test11b_expr_equal", func(t *testing.T) {
- events := runFixture(t, srv, "test11_expression_calc.json",
- map[string]interface{}{"a": 7, "b": 7})
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_Equal")
- })
- t.Run("test11c_expr_b_greater", func(t *testing.T) {
- events := runFixture(t, srv, "test11_expression_calc.json",
- map[string]interface{}{"a": 3, "b": 8})
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_BGreater")
- })
- // test12: Parallel loop + Write_* + .length (previously failing before fix)
- t.Run("test12_array_length", func(t *testing.T) {
- events := runFixture(t, srv, "test12_multi_write_loop.json", nil)
- assertWorkflowDone(t, events)
- })
- // test14: step_print events — three print steps expected
- t.Run("test14_print_events", func(t *testing.T) {
- events := runFixture(t, srv, "test14_print_events.json", nil)
- assertWorkflowDone(t, events)
- assertPrintCount(t, events, 3)
- })
- // ── From engine TC suite (new coverage not in Downloads set) ─────────
- // tc03: Branch with no matching case and no ELSE continues to Branch.next
- // (v3.15 fix: previously caused an error instead of falling through)
- t.Run("tc03_branch_no_match", func(t *testing.T) {
- events := runFixture(t, srv, "tc03_branch_no_match.json",
- map[string]interface{}{"score": 40})
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_default")
- })
- // tc05: Noop_* node runs children as parallel sub-chains
- t.Run("tc05_noop_parallel", func(t *testing.T) {
- events := runFixture(t, srv, "tc05_noop_parallel.json", nil)
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_left")
- assertStepStarted(t, events, "Set_right")
- })
- // tc07: Step onError handler fires when Write_* targets an undeclared artifact path
- t.Run("tc07_onerror_recovery", func(t *testing.T) {
- events := runFixture(t, srv, "tc07_onerror_recovery.json", nil)
- assertWorkflowDone(t, events)
- assertHasEventType(t, events, "step_error")
- assertStepStarted(t, events, "Set_fallback")
- })
- // tc08: Registry param with default value is auto-applied when caller omits it
- t.Run("tc08_param_default", func(t *testing.T) {
- events := runFixture(t, srv, "tc08_param_default.json", nil) // omit timeout
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_ok") // Branch matched timeout==30
- })
- // tc09: Two unreferenced steps are both detected as entry nodes and
- // launched in parallel; both chains must complete
- t.Run("tc09_multi_entry", func(t *testing.T) {
- events := runFixture(t, srv, "tc09_multi_entry.json", nil)
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_a")
- assertStepStarted(t, events, "Set_b")
- })
- // tc_pause_timeout: Pause_* with a 1 s timeout auto-routes to the timeout
- // handler without any external Resume call
- t.Run("tc_pause_timeout", func(t *testing.T) {
- events := runFixture(t, srv, "tc_pause_timeout.json", nil)
- assertWorkflowDone(t, events)
- assertHasEventType(t, events, "pause_start")
- assertHasEventType(t, events, "pause_timeout")
- assertStepStarted(t, events, "Set_timedOut")
- })
- // tc16: Workflow runs correctly when non-default RunParams fields
- // (workspaceId, mode) are supplied — engine must not reject them
- t.Run("tc16_run_params", func(t *testing.T) {
- events := runFixture(t, srv, "tc16_run_params.json", nil)
- assertWorkflowDone(t, events)
- })
- // ── v3.16 feature tests ──────────────────────────────────────────────
- // tc_while_loop: while loop counts from 0 to 4 using Set_* increment
- t.Run("tc_while_loop", func(t *testing.T) {
- events := runFixture(t, srv, "tc_while_loop.json", nil)
- assertWorkflowDone(t, events)
- // Loop body should fire 5 times (counter 0..4)
- assertStepStarted(t, events, "Set_Log")
- assertStepStarted(t, events, "Set_Inc")
- })
- // tc_break_serial: BREAK exits serial loop early when item==3
- t.Run("tc_break_serial", func(t *testing.T) {
- events := runFixture(t, srv, "tc_break_serial.json", nil)
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Branch_Check")
- assertStepStarted(t, events, "Set_BreakHere")
- })
- // tc_source_maxiter: source loop with 5 items capped at maxIterations=3
- t.Run("tc_source_maxiter", func(t *testing.T) {
- events := runFixture(t, srv, "tc_source_maxiter.json", nil)
- assertWorkflowDone(t, events)
- assertStepStarted(t, events, "Set_Collect")
- })
- }
- // ─────────────────────────────────────────────────────────────────────────────
- // TestIntegration_Live — LLM workflows, skipped unless LLM_KEY is set
- // ─────────────────────────────────────────────────────────────────────────────
- func TestIntegration_Live(t *testing.T) {
- cfg := config.Load("")
- if cfg.LLM.Key == "" {
- t.Skip("LLM_KEY not set — skipping live LLM integration tests")
- }
- cfg.Workspace.Root = t.TempDir()
- srv := httptest.NewServer(serverRunHandler(cfg))
- t.Cleanup(srv.Close)
- // test01: basic single LLM call — verifies LLM step executes and emits llm_done
- t.Run("test01_basic_llm", func(t *testing.T) {
- events := runFixture(t, srv, "test01_basic_llm.json",
- map[string]interface{}{"prompt": "Reply with exactly one word: hello"})
- assertWorkflowDone(t, events)
- assertLLMDoneCount(t, events, 1)
- })
- // test03: parallel children fan-out — two LLM steps run concurrently
- t.Run("test03_parallel_children", func(t *testing.T) {
- events := runFixture(t, srv, "test03_parallel_children.json",
- map[string]interface{}{"topic": "cats"})
- assertWorkflowDone(t, events)
- assertLLMDoneCount(t, events, 2)
- })
- // test04: serial Loop — three sequential LLM calls (one per item)
- t.Run("test04_loop_serial", func(t *testing.T) {
- events := runFixture(t, srv, "test04_loop_serial.json", nil)
- assertWorkflowDone(t, events)
- assertLLMDoneCount(t, events, 3)
- })
- // test05: parallel Loop — four concurrent LLM calls
- t.Run("test05_loop_parallel", func(t *testing.T) {
- events := runFixture(t, srv, "test05_loop_parallel.json", nil)
- assertWorkflowDone(t, events)
- assertLLMDoneCount(t, events, 4)
- })
- // test09: output_config json_schema — LLM must return parseable JSON
- // (previously failing before AnthropicAdapter structured output fix)
- t.Run("test09_structured_output", func(t *testing.T) {
- events := runFixture(t, srv, "test09_structured_output.json",
- map[string]interface{}{"topic": "artificial intelligence"})
- assertWorkflowDone(t, events)
- assertLLMDoneCount(t, events, 1)
- })
- // test10: multi-stage pipeline — plan (json_schema) → parallel impl loop → report → Write_*
- t.Run("test10_complex_pipeline", func(t *testing.T) {
- events := runFixture(t, srv, "test10_complex_pipeline.json",
- map[string]interface{}{"requirement": "Build a simple todo app with add, list, and delete features"})
- assertWorkflowDone(t, events)
- })
- // test13: deep nesting — serial Loop → Branch → two different LLM steps (3 total)
- t.Run("test13_deep_nesting", func(t *testing.T) {
- events := runFixture(t, srv, "test13_deep_nesting.json", nil)
- assertWorkflowDone(t, events)
- assertLLMDoneCount(t, events, 3)
- })
- // test15: LLM out simultaneously writes a variable AND a file path
- t.Run("test15_out_file_write", func(t *testing.T) {
- events := runFixture(t, srv, "test15_out_file_write.json",
- map[string]interface{}{"topic": "ocean"})
- assertWorkflowDone(t, events)
- assertLLMDoneCount(t, events, 1)
- })
- }
- // ─────────────────────────────────────────────────────────────────────────────
- // TestLiveServer_LLMNode — original smoke test, kept for backward compatibility
- // Verifies SSE framing: run_id consistency, sequential seq numbers, llm_done.
- // ─────────────────────────────────────────────────────────────────────────────
// simpleOneLLMWorkflow is the inline workflow definition posted by
// TestLiveServer_LLMNode. Its registry declares one STRING param, "prompt",
// which the single LLM step sends as the user message ("=prompt"). The
// step's result ("=_result") is mapped to $reply, and execution then moves
// to Stop_end.
const simpleOneLLMWorkflow = `{
"version": "3.15",
"name": "LiveLLMTest",
"registry": {
"params": ["prompt(STRING)"]
},
"steps": [
{
"id": "LLM_answer",
"in": {
"messages": [
{"role": "user", "content": "=prompt"}
]
},
"out": {"$reply": "=_result"},
"next": "Stop_end"
},
{"id": "Stop_end"}
]
}`
- func TestLiveServer_LLMNode(t *testing.T) {
- cfg := config.Load("")
- if cfg.LLM.Key == "" {
- t.Skip("LLM_KEY not set — skipping live LLM test")
- }
- srv := httptest.NewServer(serverRunHandler(cfg))
- t.Cleanup(srv.Close)
- var wfDef map[string]interface{}
- if err := json.Unmarshal([]byte(simpleOneLLMWorkflow), &wfDef); err != nil {
- t.Fatalf("parse workflow fixture: %v", err)
- }
- reqBody := map[string]interface{}{
- "workflowDef": wfDef,
- "runParams": map[string]interface{}{
- "params": map[string]interface{}{"prompt": "Reply with exactly one word: hello"},
- "workspaceId": "test-live",
- "mode": "create",
- },
- }
- reqBytes, _ := json.Marshal(reqBody)
- resp, err := http.Post(srv.URL+"/run", "application/json",
- strings.NewReader(string(reqBytes)))
- if err != nil {
- t.Fatalf("POST /run: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- t.Fatalf("HTTP status %d (want 200)", resp.StatusCode)
- }
- if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/event-stream") {
- t.Errorf("Content-Type = %q, want text/event-stream", ct)
- }
- var events []sseEvent
- scanner := bufio.NewScanner(resp.Body)
- for scanner.Scan() {
- line := scanner.Text()
- if !strings.HasPrefix(line, "data: ") {
- continue
- }
- var ev sseEvent
- if err := json.Unmarshal([]byte(line[6:]), &ev); err != nil {
- t.Logf("skip unparseable event: %s", line)
- continue
- }
- sid := "<nil>"
- if ev.StepID != nil {
- sid = *ev.StepID
- }
- t.Logf("event #%d type=%-20s step=%s", ev.Seq, ev.Type, sid)
- events = append(events, ev)
- if ev.Type == "workflow_done" || ev.Type == "workflow_failed" {
- break
- }
- }
- if err := scanner.Err(); err != nil {
- t.Fatalf("SSE read error: %v", err)
- }
- if len(events) == 0 {
- t.Fatal("no SSE events received")
- }
- last := events[len(events)-1]
- if last.Type == "workflow_failed" {
- t.Fatalf("workflow_failed: payload=%v", last.Payload)
- }
- if last.Type != "workflow_done" {
- t.Errorf("last event type=%q, want workflow_done", last.Type)
- }
- // Verify SSE framing: all events share the same run_id
- runID := events[0].RunID
- for i, ev := range events {
- if ev.RunID != runID {
- t.Errorf("event #%d run_id=%q, want %q", i, ev.RunID, runID)
- }
- }
- // Verify seq is strictly 1-based and sequential
- for i, ev := range events {
- if ev.Seq != i+1 {
- t.Errorf("event #%d seq=%d, want %d", i, ev.Seq, i+1)
- }
- }
- // Verify llm_done event with latency/model metadata
- var sawLLMDone bool
- for _, ev := range events {
- if ev.Type == "llm_done" {
- sawLLMDone = true
- if ms, ok := ev.Payload["latency_ms"].(float64); ok {
- t.Logf("LLM latency: %.0fms", ms)
- }
- if model, ok := ev.Payload["model"].(string); ok {
- t.Logf("LLM model: %s", model)
- }
- }
- }
- if !sawLLMDone {
- t.Error("no llm_done event received")
- }
- }
|