| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- package workflow_test
- import (
- "context"
- "testing"
- "workflow"
- )
- // makeV312Workflow returns a minimal v3.10 workflow with a single LLM step for 3.12 feature tests.
- // When inStream is true, in.stream is set to enable streaming.
- func makeV312Workflow(inStream bool) *workflow.Workflow {
- in := workflow.StepInput{
- "messages": []interface{}{
- map[string]interface{}{"role": "user", "content": "=$question"},
- },
- }
- if inStream {
- in["stream"] = true
- }
- return &workflow.Workflow{
- Version: "3.10",
- Name: "V312 Test",
- Registry: workflow.Registry{
- Services: []string{},
- Components: []string{},
- Params: []string{"question(STRING)"},
- Vars: []string{"$answer(STRING)"},
- Files: workflow.FilesRegistry{},
- },
- Steps: []workflow.Step{
- {
- ID: "LLM_Answer",
- In: in,
- Out: workflow.StepOutput{"$answer": "=_result"},
- Next: "Stop_End",
- },
- {ID: "Stop_End"},
- },
- }
- }
- // ---------------------------------------------------------------------------
- // in.stream tests (spec 3.12 §1 — in.stream remains the streaming control)
- // ---------------------------------------------------------------------------
- // TestStreamNotSetByDefault verifies that when in.stream is absent,
- // no llm_token RunEvents are emitted but llm_done IS emitted.
- func TestStreamNotSetByDefault(t *testing.T) {
- wf := makeV312Workflow(false) // no in.stream
- adapters := createTestAdapters()
- llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
- llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
- // params["stream"] should not be set
- if v, ok := params["stream"].(bool); ok && v {
- t.Error("params[\"stream\"] should not be true when in.stream is absent")
- }
- return map[string]interface{}{
- "content": "hello",
- "model": "gpt-4",
- "finish_reason": "stop",
- "usage": map[string]interface{}{
- "prompt_tokens": 5,
- "completion_tokens": 3,
- "total_tokens": 8,
- },
- }, nil
- })
- engine, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "hi"}, adapters)
- if err != nil {
- t.Fatalf("Execute: %v", err)
- }
- var runEvents []workflow.RunEvent
- for ev := range result.RunEventStream {
- runEvents = append(runEvents, ev)
- }
- // No llm_token events expected
- for _, ev := range runEvents {
- if ev.Type == workflow.RunEventLLMToken {
- t.Error("unexpected llm_token RunEvent when in.stream is absent")
- }
- }
- // llm_done MUST be present
- hasDone := false
- for _, ev := range runEvents {
- if ev.Type == workflow.RunEventLLMDone {
- hasDone = true
- if ev.StepID == nil || *ev.StepID != "LLM_Answer" {
- t.Errorf("llm_done step_id: got %v, want 'LLM_Answer'", ev.StepID)
- }
- if ev.Payload["latency_ms"] == nil {
- t.Error("llm_done payload missing latency_ms")
- }
- if ev.Payload["finish_reason"] != "stop" {
- t.Errorf("llm_done finish_reason: got %v, want 'stop'", ev.Payload["finish_reason"])
- }
- if ev.Payload["model"] != "gpt-4" {
- t.Errorf("llm_done model: got %v, want 'gpt-4'", ev.Payload["model"])
- }
- usage, ok := ev.Payload["usage"].(map[string]interface{})
- if !ok {
- t.Error("llm_done payload missing usage")
- } else if usage["total_tokens"] != 8 {
- t.Errorf("llm_done usage.total_tokens: got %v, want 8", usage["total_tokens"])
- }
- }
- }
- if !hasDone {
- t.Error("expected llm_done RunEvent, none found")
- }
- }
- // TestStreamInStreamTrue verifies that in.stream:true forwards llm_token RunEvents.
- func TestStreamInStreamTrue(t *testing.T) {
- wf := makeV312Workflow(true) // in.stream: true
- adapters := createTestAdapters()
- llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
- llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
- // adapter should receive stream:true
- if v, ok := params["stream"].(bool); !ok || !v {
- t.Error("params[\"stream\"] should be true when in.stream is true")
- }
- chunks := []string{"He", "ll", "o!"}
- for _, c := range chunks {
- stream <- c
- }
- return map[string]interface{}{
- "content": "Hello!",
- "model": "gpt-4",
- "finish_reason": "stop",
- "usage": map[string]interface{}{
- "prompt_tokens": 5,
- "completion_tokens": 2,
- "total_tokens": 7,
- },
- }, nil
- })
- engine, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "hi"}, adapters)
- if err != nil {
- t.Fatalf("Execute: %v", err)
- }
- var runEvents []workflow.RunEvent
- for ev := range result.RunEventStream {
- runEvents = append(runEvents, ev)
- }
- // Count llm_token events and verify deltas
- var tokens []string
- for _, ev := range runEvents {
- if ev.Type == workflow.RunEventLLMToken {
- if ev.StepID == nil || *ev.StepID != "LLM_Answer" {
- t.Errorf("llm_token step_id: got %v, want 'LLM_Answer'", ev.StepID)
- }
- if delta, ok := ev.Payload["delta"].(string); ok {
- tokens = append(tokens, delta)
- }
- }
- }
- if len(tokens) != 3 {
- t.Errorf("expected 3 llm_token events, got %d", len(tokens))
- }
- // llm_done must come after all llm_token events
- lastTokenIdx, doneIdx := -1, -1
- for i, ev := range runEvents {
- if ev.Type == workflow.RunEventLLMToken {
- lastTokenIdx = i
- }
- if ev.Type == workflow.RunEventLLMDone {
- doneIdx = i
- }
- }
- if doneIdx == -1 {
- t.Fatal("expected llm_done RunEvent, none found")
- }
- if lastTokenIdx != -1 && doneIdx <= lastTokenIdx {
- t.Error("llm_done must come after all llm_token events")
- }
- }
- // ---------------------------------------------------------------------------
- // run_events structure tests (spec 3.12 Chapter 13)
- // ---------------------------------------------------------------------------
- // TestRunEventsWorkflowLifecycle verifies that workflow_start and workflow_done
- // are emitted with correct payloads and ordering.
- func TestRunEventsWorkflowLifecycle(t *testing.T) {
- wf := makeV312Workflow(false)
- adapters := createTestAdapters()
- llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
- llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
- return map[string]interface{}{"content": "ok", "model": "gpt-4", "finish_reason": "stop"}, nil
- })
- engine, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "test"}, adapters)
- if err != nil {
- t.Fatalf("Execute: %v", err)
- }
- var runEvents []workflow.RunEvent
- for ev := range result.RunEventStream {
- runEvents = append(runEvents, ev)
- }
- if len(runEvents) == 0 {
- t.Fatal("expected RunEvents, got none")
- }
- // First event must be workflow_start
- if runEvents[0].Type != workflow.RunEventWorkflowStart {
- t.Errorf("first RunEvent type: got %q, want %q", runEvents[0].Type, workflow.RunEventWorkflowStart)
- }
- if runEvents[0].StepID != nil {
- t.Error("workflow_start step_id must be null")
- }
- if _, ok := runEvents[0].Payload["params"]; !ok {
- t.Error("workflow_start payload missing 'params'")
- }
- // Last event must be workflow_done
- last := runEvents[len(runEvents)-1]
- if last.Type != workflow.RunEventWorkflowDone {
- t.Errorf("last RunEvent type: got %q, want %q", last.Type, workflow.RunEventWorkflowDone)
- }
- if last.StepID != nil {
- t.Error("workflow_done step_id must be null")
- }
- if last.Payload["stop_id"] != "Stop_End" {
- t.Errorf("workflow_done stop_id: got %v, want 'Stop_End'", last.Payload["stop_id"])
- }
- if last.Payload["duration_ms"] == nil {
- t.Error("workflow_done payload missing duration_ms")
- }
- // seq must be monotonically increasing from 1
- for i, ev := range runEvents {
- if ev.Seq != uint64(i+1) {
- t.Errorf("RunEvent[%d].seq: got %d, want %d", i, ev.Seq, i+1)
- }
- }
- // run_id must be consistent across all events
- runID := runEvents[0].RunID
- for i, ev := range runEvents {
- if ev.RunID != runID {
- t.Errorf("RunEvent[%d].run_id mismatch: got %q, want %q", i, ev.RunID, runID)
- }
- }
- // ts must be non-empty
- for _, ev := range runEvents {
- if ev.Ts == "" {
- t.Errorf("RunEvent %q has empty ts", ev.Type)
- }
- }
- }
- // TestRunEventsStepLifecycle verifies step_start → llm_done → step_done ordering.
- func TestRunEventsStepLifecycle(t *testing.T) {
- wf := makeV312Workflow(false)
- adapters := createTestAdapters()
- llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
- llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
- return map[string]interface{}{
- "content": "answer",
- "model": "gpt-4",
- "finish_reason": "stop",
- "usage": map[string]interface{}{"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5},
- }, nil
- })
- engine, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "q"}, adapters)
- if err != nil {
- t.Fatalf("Execute: %v", err)
- }
- var runEvents []workflow.RunEvent
- for ev := range result.RunEventStream {
- runEvents = append(runEvents, ev)
- }
- // Find indices of relevant events for LLM_Answer
- idxStart, idxLLMDone, idxDone := -1, -1, -1
- for i, ev := range runEvents {
- if ev.StepID != nil && *ev.StepID == "LLM_Answer" {
- switch ev.Type {
- case workflow.RunEventStepStart:
- idxStart = i
- case workflow.RunEventLLMDone:
- idxLLMDone = i
- case workflow.RunEventStepDone:
- idxDone = i
- }
- }
- }
- if idxStart == -1 {
- t.Fatal("missing step_start for LLM_Answer")
- }
- if idxLLMDone == -1 {
- t.Fatal("missing llm_done for LLM_Answer")
- }
- if idxDone == -1 {
- t.Fatal("missing step_done for LLM_Answer")
- }
- // Order: step_start < llm_done < step_done
- if !(idxStart < idxLLMDone && idxLLMDone < idxDone) {
- t.Errorf("event order wrong: step_start=%d, llm_done=%d, step_done=%d (want start<llmdone<done)",
- idxStart, idxLLMDone, idxDone)
- }
- // step_start payload
- if runEvents[idxStart].Payload["step_type"] != "LLM_*" {
- t.Errorf("step_start step_type: got %v, want 'LLM_*'", runEvents[idxStart].Payload["step_type"])
- }
- // step_done payload must include duration_ms and step_type
- donePayload := runEvents[idxDone].Payload
- if donePayload["duration_ms"] == nil {
- t.Error("step_done payload missing duration_ms")
- }
- if donePayload["step_type"] != "LLM_*" {
- t.Errorf("step_done step_type: got %v, want 'LLM_*'", donePayload["step_type"])
- }
- }
- // TestRunEventsStepSkipped verifies that step_skipped is emitted when if=false.
- func TestRunEventsStepSkipped(t *testing.T) {
- wf := &workflow.Workflow{
- Version: "3.10",
- Name: "Skipped Test",
- Registry: workflow.Registry{
- Services: []string{},
- Components: []string{},
- Vars: []string{"$flag(BOOLEAN)", "$result(STRING)"},
- Files: workflow.FilesRegistry{},
- },
- Steps: []workflow.Step{
- {
- ID: "LLM_Skipped",
- // $flag is initialized to boolean false via initialVars
- If: "=$flag",
- In: workflow.StepInput{
- "messages": []interface{}{map[string]interface{}{"role": "user", "content": "hi"}},
- },
- Out: workflow.StepOutput{"$result": "=_result"},
- Next: "Stop_End",
- },
- {ID: "Stop_End"},
- },
- }
- adapters := createTestAdapters()
- engine, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- // Pass $flag as boolean false so the if condition fails
- result, err := engine.Execute(context.Background(), map[string]interface{}{"$flag": false}, adapters)
- if err != nil {
- t.Fatalf("Execute: %v", err)
- }
- var runEvents []workflow.RunEvent
- for ev := range result.RunEventStream {
- runEvents = append(runEvents, ev)
- }
- // Find step_skipped for LLM_Skipped
- found := false
- for _, ev := range runEvents {
- if ev.Type == workflow.RunEventStepSkipped && ev.StepID != nil && *ev.StepID == "LLM_Skipped" {
- found = true
- if ev.Payload["reason"] != "if_false" {
- t.Errorf("step_skipped reason: got %v, want 'if_false'", ev.Payload["reason"])
- }
- if ev.Payload["step_type"] != "LLM_*" {
- t.Errorf("step_skipped step_type: got %v, want 'LLM_*'", ev.Payload["step_type"])
- }
- }
- }
- if !found {
- t.Error("expected step_skipped RunEvent for LLM_Skipped, not found")
- }
- // No step_start or step_done should exist for LLM_Skipped
- for _, ev := range runEvents {
- if ev.StepID != nil && *ev.StepID == "LLM_Skipped" {
- if ev.Type == workflow.RunEventStepStart || ev.Type == workflow.RunEventStepDone {
- t.Errorf("unexpected %q event for skipped step LLM_Skipped", ev.Type)
- }
- }
- }
- }
- // TestRunEventsWorkflowFailed verifies that workflow_failed is emitted with error info.
- func TestRunEventsWorkflowFailed(t *testing.T) {
- wf := &workflow.Workflow{
- Version: "3.10",
- Name: "Failed Test",
- Registry: workflow.Registry{
- Services: []string{},
- Components: []string{},
- Vars: []string{"$answer(STRING)"},
- Files: workflow.FilesRegistry{},
- },
- Steps: []workflow.Step{
- {
- ID: "LLM_Fail",
- In: workflow.StepInput{
- "messages": []interface{}{map[string]interface{}{"role": "user", "content": "hi"}},
- },
- Out: workflow.StepOutput{"$answer": "=_result"},
- Next: "Stop_End",
- },
- {ID: "Stop_End"},
- },
- }
- adapters := createTestAdapters()
- llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
- llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
- return nil, &workflow.LLMError{
- Type: "rate_limit",
- Code: "RATE_LIMIT",
- Message: "Rate limit exceeded",
- Retryable: true,
- StatusCode: 429,
- }
- })
- engine, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
- if err != nil {
- t.Fatalf("Execute: %v", err)
- }
- var runEvents []workflow.RunEvent
- for ev := range result.RunEventStream {
- runEvents = append(runEvents, ev)
- }
- found := false
- for _, ev := range runEvents {
- if ev.Type == workflow.RunEventWorkflowFailed {
- found = true
- if ev.StepID != nil {
- t.Error("workflow_failed step_id must be null")
- }
- if ev.Payload["failed_step_id"] != "LLM_Fail" {
- t.Errorf("workflow_failed failed_step_id: got %v, want 'LLM_Fail'", ev.Payload["failed_step_id"])
- }
- errMap, ok := ev.Payload["error"].(map[string]interface{})
- if !ok {
- t.Error("workflow_failed payload missing error object")
- } else if errMap["type"] != "rate_limit" {
- t.Errorf("workflow_failed error.type: got %v, want 'rate_limit'", errMap["type"])
- }
- if ev.Payload["duration_ms"] == nil {
- t.Error("workflow_failed payload missing duration_ms")
- }
- }
- }
- if !found {
- t.Error("expected workflow_failed RunEvent, not found")
- }
- }
|