package workflow_test import ( "context" "testing" "workflow" ) // makeV312Workflow returns a minimal v3.10 workflow with a single LLM step for 3.12 feature tests. // When inStream is true, in.stream is set to enable streaming. func makeV312Workflow(inStream bool) *workflow.Workflow { in := workflow.StepInput{ "messages": []interface{}{ map[string]interface{}{"role": "user", "content": "=$question"}, }, } if inStream { in["stream"] = true } return &workflow.Workflow{ Version: "3.10", Name: "V312 Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Params: []string{"question(STRING)"}, Vars: []string{"$answer(STRING)"}, Files: workflow.FilesRegistry{}, }, Steps: []workflow.Step{ { ID: "LLM_Answer", In: in, Out: workflow.StepOutput{"$answer": "=_result"}, Next: "Stop_End", }, {ID: "Stop_End"}, }, } } // --------------------------------------------------------------------------- // in.stream tests (spec 3.12 ยง1 โ€” in.stream remains the streaming control) // --------------------------------------------------------------------------- // TestStreamNotSetByDefault verifies that when in.stream is absent, // no llm_token RunEvents are emitted but llm_done IS emitted. func TestStreamNotSetByDefault(t *testing.T) { wf := makeV312Workflow(false) // no in.stream adapters := createTestAdapters() llm := adapters.LLM.(*workflow.DefaultLLMAdapter) llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { // params["stream"] should not be set if v, ok := params["stream"].(bool); ok && v { t.Error("params[\"stream\"] should not be true when in.stream is absent") } return map[string]interface{}{ "content": "hello", "model": "gpt-4", "finish_reason": "stop", "usage": map[string]interface{}{ "prompt_tokens": 5, "completion_tokens": 3, "total_tokens": 8, }, }, nil }) engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("NewEngine: %v", err) } result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "hi"}, adapters) if err != nil { t.Fatalf("Execute: %v", err) } var runEvents []workflow.RunEvent for ev := range result.RunEventStream { runEvents = append(runEvents, ev) } // No llm_token events expected for _, ev := range runEvents { if ev.Type == workflow.RunEventLLMToken { t.Error("unexpected llm_token RunEvent when in.stream is absent") } } // llm_done MUST be present hasDone := false for _, ev := range runEvents { if ev.Type == workflow.RunEventLLMDone { hasDone = true if ev.StepID == nil || *ev.StepID != "LLM_Answer" { t.Errorf("llm_done step_id: got %v, want 'LLM_Answer'", ev.StepID) } if ev.Payload["latency_ms"] == nil { t.Error("llm_done payload missing latency_ms") } if ev.Payload["finish_reason"] != "stop" { t.Errorf("llm_done finish_reason: got %v, want 'stop'", ev.Payload["finish_reason"]) } if ev.Payload["model"] != "gpt-4" { t.Errorf("llm_done model: got %v, want 'gpt-4'", ev.Payload["model"]) } usage, ok := ev.Payload["usage"].(map[string]interface{}) if !ok { t.Error("llm_done payload missing usage") } else if usage["total_tokens"] != 8 { t.Errorf("llm_done usage.total_tokens: got %v, want 8", usage["total_tokens"]) } } } if !hasDone { t.Error("expected llm_done RunEvent, none found") } } // TestStreamInStreamTrue verifies that in.stream:true forwards llm_token RunEvents. func TestStreamInStreamTrue(t *testing.T) { wf := makeV312Workflow(true) // in.stream: true adapters := createTestAdapters() llm := adapters.LLM.(*workflow.DefaultLLMAdapter) llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { // adapter should receive stream:true if v, ok := params["stream"].(bool); !ok || !v { t.Error("params[\"stream\"] should be true when in.stream is true") } chunks := []string{"He", "ll", "o!"} for _, c := range chunks { stream <- c } return map[string]interface{}{ "content": "Hello!", "model": "gpt-4", "finish_reason": "stop", "usage": map[string]interface{}{ "prompt_tokens": 5, "completion_tokens": 2, "total_tokens": 7, }, }, nil }) engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("NewEngine: %v", err) } result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "hi"}, adapters) if err != nil { t.Fatalf("Execute: %v", err) } var runEvents []workflow.RunEvent for ev := range result.RunEventStream { runEvents = append(runEvents, ev) } // Count llm_token events and verify deltas var tokens []string for _, ev := range runEvents { if ev.Type == workflow.RunEventLLMToken { if ev.StepID == nil || *ev.StepID != "LLM_Answer" { t.Errorf("llm_token step_id: got %v, want 'LLM_Answer'", ev.StepID) } if delta, ok := ev.Payload["delta"].(string); ok { tokens = append(tokens, delta) } } } if len(tokens) != 3 { t.Errorf("expected 3 llm_token events, got %d", len(tokens)) } // llm_done must come after all llm_token events lastTokenIdx, doneIdx := -1, -1 for i, ev := range runEvents { if ev.Type == workflow.RunEventLLMToken { lastTokenIdx = i } if ev.Type == workflow.RunEventLLMDone { doneIdx = i } } if doneIdx == -1 { t.Fatal("expected llm_done RunEvent, none found") } if lastTokenIdx != -1 && doneIdx <= lastTokenIdx { t.Error("llm_done must come after all llm_token events") } } // --------------------------------------------------------------------------- // run_events structure tests (spec 3.12 Chapter 13) // --------------------------------------------------------------------------- // TestRunEventsWorkflowLifecycle verifies that workflow_start and workflow_done // are emitted with correct payloads and ordering. func TestRunEventsWorkflowLifecycle(t *testing.T) { wf := makeV312Workflow(false) adapters := createTestAdapters() llm := adapters.LLM.(*workflow.DefaultLLMAdapter) llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { return map[string]interface{}{"content": "ok", "model": "gpt-4", "finish_reason": "stop"}, nil }) engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("NewEngine: %v", err) } result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "test"}, adapters) if err != nil { t.Fatalf("Execute: %v", err) } var runEvents []workflow.RunEvent for ev := range result.RunEventStream { runEvents = append(runEvents, ev) } if len(runEvents) == 0 { t.Fatal("expected RunEvents, got none") } // First event must be workflow_start if runEvents[0].Type != workflow.RunEventWorkflowStart { t.Errorf("first RunEvent type: got %q, want %q", runEvents[0].Type, workflow.RunEventWorkflowStart) } if runEvents[0].StepID != nil { t.Error("workflow_start step_id must be null") } if _, ok := runEvents[0].Payload["params"]; !ok { t.Error("workflow_start payload missing 'params'") } // Last event must be workflow_done last := runEvents[len(runEvents)-1] if last.Type != workflow.RunEventWorkflowDone { t.Errorf("last RunEvent type: got %q, want %q", last.Type, workflow.RunEventWorkflowDone) } if last.StepID != nil { t.Error("workflow_done step_id must be null") } if last.Payload["stop_id"] != "Stop_End" { t.Errorf("workflow_done stop_id: got %v, want 'Stop_End'", last.Payload["stop_id"]) } if last.Payload["duration_ms"] == nil { t.Error("workflow_done payload missing duration_ms") } // seq must be monotonically increasing from 1 for i, ev := range runEvents { if ev.Seq != uint64(i+1) { t.Errorf("RunEvent[%d].seq: got %d, want %d", i, ev.Seq, i+1) } } // run_id must be consistent across all events runID := runEvents[0].RunID for i, ev := range runEvents { if ev.RunID != runID { t.Errorf("RunEvent[%d].run_id mismatch: got %q, want %q", i, ev.RunID, runID) } } // ts must be non-empty for _, ev := range runEvents { if ev.Ts == "" { t.Errorf("RunEvent %q has empty ts", ev.Type) } } } // TestRunEventsStepLifecycle verifies step_start โ†’ llm_done โ†’ step_done ordering. func TestRunEventsStepLifecycle(t *testing.T) { wf := makeV312Workflow(false) adapters := createTestAdapters() llm := adapters.LLM.(*workflow.DefaultLLMAdapter) llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { return map[string]interface{}{ "content": "answer", "model": "gpt-4", "finish_reason": "stop", "usage": map[string]interface{}{"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5}, }, nil }) engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("NewEngine: %v", err) } result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "q"}, adapters) if err != nil { t.Fatalf("Execute: %v", err) } var runEvents []workflow.RunEvent for ev := range result.RunEventStream { runEvents = append(runEvents, ev) } // Find indices of relevant events for LLM_Answer idxStart, idxLLMDone, idxDone := -1, -1, -1 for i, ev := range runEvents { if ev.StepID != nil && *ev.StepID == "LLM_Answer" { switch ev.Type { case workflow.RunEventStepStart: idxStart = i case workflow.RunEventLLMDone: idxLLMDone = i case workflow.RunEventStepDone: idxDone = i } } } if idxStart == -1 { t.Fatal("missing step_start for LLM_Answer") } if idxLLMDone == -1 { t.Fatal("missing llm_done for LLM_Answer") } if idxDone == -1 { t.Fatal("missing step_done for LLM_Answer") } // Order: step_start < llm_done < step_done if !(idxStart < idxLLMDone && idxLLMDone < idxDone) { t.Errorf("event order wrong: step_start=%d, llm_done=%d, step_done=%d (want start