| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615 |
- package workflow_test
- // audit_fixes_test.go covers all P0 / P1 items identified in the external audit:
- //
- // P0-5 Branch_* no-match without ELSE → continue to Branch_*.next (not error)
- // P0-3 Validate: Stop_* must not have next or children
- // P0-2 Validate: reference integrity, entry≥1, reachability
- // P0-1 executeWorkflow: start from true entry nodes (not always steps[0])
- // P0-7 registry.params: name(TYPE) = defaultValue syntax + default application
- // P0-4 ExecuteWithRunParams + RunParams stored on context
- // P1-1 Resume on non-paused workflow → error; best-effort pause_rejected event
- import (
- "context"
- "strings"
- "testing"
- "time"
- "workflow"
- )
- // ── helpers ───────────────────────────────────────────────────────────────────
- func auditRegistry() workflow.Registry {
- return workflow.Registry{
- Services: []string{},
- Components: []string{},
- Vars: []string{"$out(ANY)"},
- }
- }
- func auditAdapters() *workflow.Adapters {
- return &workflow.Adapters{
- Service: workflow.NewDefaultServiceAdapter(),
- Component: workflow.NewDefaultComponentAdapter(),
- LLM: workflow.NewDefaultLLMAdapter(),
- }
- }
- func wantValidationError(t *testing.T, wf *workflow.Workflow, fragment string) {
- t.Helper()
- _, err := workflow.NewEngine(wf)
- if err == nil {
- t.Fatal("expected validation error, got nil")
- }
- if fragment != "" && !strings.Contains(err.Error(), fragment) {
- t.Fatalf("expected error containing %q, got: %v", fragment, err)
- }
- }
- // runAndDrain executes the workflow, drains the event stream, and returns the context.
- func runAndDrain(t *testing.T, eng *workflow.Engine, initialVars map[string]interface{}) *workflow.ExecutionContext {
- t.Helper()
- result, err := eng.Execute(context.Background(), initialVars, auditAdapters())
- if err != nil {
- t.Fatalf("Execute: %v", err)
- }
- for range result.RunEventStream {
- }
- return result.Context
- }
- // ── P0-5: Branch_* no-match without ELSE → continue to Branch_*.next ─────────
- // TestBranchNoMatchContinuesToNext verifies that when a Branch_* step has no
- // matching case and no ELSE clause, execution proceeds to Branch_*.next rather
- // than returning an error (spec §10.10).
- func TestBranchNoMatchContinuesToNext(t *testing.T) {
- reg := auditRegistry()
- reg.Params = []string{"score(INT)"}
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "BranchNoMatch",
- Registry: reg,
- Steps: []workflow.Step{
- {
- ID: "Branch_check",
- Next: "Set_done",
- Cases: [][]string{{"=score > 90", "Set_high"}},
- // No ELSE — if score ≤ 90, should skip to Branch_check.next
- },
- // Branch target (only executed if score > 90)
- {ID: "Set_high", Target: "$out", Value: "high", Next: "Set_done"},
- // Normal continuation after Branch_check
- {ID: "Set_done", Target: "$out", Value: "done", Next: "Stop_end"},
- {ID: "Stop_end"},
- },
- }
- eng, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- ctx := runAndDrain(t, eng, map[string]interface{}{"score": int64(50)})
- // $out should be "done" (Set_done was reached; Set_high was NOT executed)
- if got := ctx.Variables["$out"]; got != "done" {
- t.Errorf("expected $out == 'done', got %v", got)
- }
- }
- // TestBranchElseFallback verifies that when ELSE is present, it is used
- // (regression guard for existing ELSE behaviour).
- func TestBranchElseFallback(t *testing.T) {
- reg := auditRegistry()
- reg.Params = []string{"score(INT)"}
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "BranchElse",
- Registry: reg,
- Steps: []workflow.Step{
- {
- ID: "Branch_check",
- Next: "Stop_end",
- Cases: [][]string{
- {"=score > 90", "Set_high"},
- {"ELSE", "Set_low"},
- },
- },
- {ID: "Set_high", Target: "$out", Value: "high", Next: "Stop_end"},
- {ID: "Set_low", Target: "$out", Value: "low", Next: "Stop_end"},
- {ID: "Stop_end"},
- },
- }
- eng, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- ctx := runAndDrain(t, eng, map[string]interface{}{"score": int64(50)})
- if got := ctx.Variables["$out"]; got != "low" {
- t.Errorf("expected $out == 'low', got %v", got)
- }
- }
- // ── P0-3: Validate Stop_* constraints ────────────────────────────────────────
- func TestValidateStopWithNext(t *testing.T) {
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "StopWithNext",
- Registry: auditRegistry(),
- Steps: []workflow.Step{
- {ID: "Noop_start", Next: "Stop_end"},
- {ID: "Stop_end", Next: "Noop_start"}, // INVALID: Stop_* cannot have next
- },
- }
- wantValidationError(t, wf, "Stop_end")
- }
- func TestValidateStopWithChildren(t *testing.T) {
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "StopWithChildren",
- Registry: auditRegistry(),
- Steps: []workflow.Step{
- {ID: "Noop_start", Next: "Stop_end"},
- {ID: "Noop_child", Next: "RETURN"},
- {ID: "Stop_end", Children: []string{"Noop_child"}}, // INVALID
- },
- }
- wantValidationError(t, wf, "children")
- }
- // ── P0-2: Reference integrity ─────────────────────────────────────────────────
- func TestValidateDeadNextReference(t *testing.T) {
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "DeadNext",
- Registry: auditRegistry(),
- Steps: []workflow.Step{
- {ID: "Noop_start", Next: "Stop_nonexistent"}, // INVALID: step doesn't exist
- },
- }
- wantValidationError(t, wf, "Stop_nonexistent")
- }
- func TestValidateDeadChildReference(t *testing.T) {
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "DeadChild",
- Registry: auditRegistry(),
- Steps: []workflow.Step{
- {ID: "Noop_start", Children: []string{"Noop_ghost"}, Next: "Stop_end"},
- {ID: "Stop_end"},
- },
- }
- wantValidationError(t, wf, "Noop_ghost")
- }
- func TestValidateDeadCaseReference(t *testing.T) {
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "DeadCase",
- Registry: auditRegistry(),
- Steps: []workflow.Step{
- {ID: "Branch_check", Next: "Stop_end", Cases: [][]string{{"=1==1", "Set_ghost"}}},
- {ID: "Stop_end"},
- },
- }
- wantValidationError(t, wf, "Set_ghost")
- }
- func TestValidateDeadOnErrorReference(t *testing.T) {
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "DeadOnError",
- Registry: auditRegistry(),
- Steps: []workflow.Step{
- {ID: "Noop_start", Next: "Stop_end", OnError: "Stop_missing"},
- {ID: "Stop_end"},
- },
- }
- wantValidationError(t, wf, "Stop_missing")
- }
- // ── P0-2: Entry node and reachability ─────────────────────────────────────────
- // TestValidateNoEntryNode verifies that a self-referencing step (all steps
- // referenced) is rejected because there is no entry node.
- func TestValidateNoEntryNode(t *testing.T) {
- // Noop_selfref.next = Noop_selfref: the only step references itself,
- // so no step is "unreferenced" → no entry node.
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "NoEntry",
- Registry: auditRegistry(),
- Steps: []workflow.Step{
- {ID: "Noop_selfref", Next: "Noop_selfref"},
- },
- }
- wantValidationError(t, wf, "entry node")
- }
- // TestValidateUnreachableStep verifies that a disconnected cycle is rejected
- // because its steps are unreachable from the live graph's entry node.
- func TestValidateUnreachableStep(t *testing.T) {
- // Noop_dead1 → Noop_dead2 → Noop_dead1 forms a disconnected cycle.
- // Noop_dead1 is referenced by Noop_dead2 (not entry).
- // Noop_dead2 is referenced by Noop_dead1 (not entry).
- // Only Noop_start is the entry node. BFS from Noop_start reaches
- // only Stop_end. Noop_dead1 and Noop_dead2 are unreachable.
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "Unreachable",
- Registry: auditRegistry(),
- Steps: []workflow.Step{
- {ID: "Noop_start", Next: "Stop_end"},
- {ID: "Stop_end"},
- {ID: "Noop_dead1", Next: "Noop_dead2"},
- {ID: "Noop_dead2", Next: "Noop_dead1"},
- },
- }
- wantValidationError(t, wf, "unreachable")
- }
- // ── P0-1: Entry node detection ────────────────────────────────────────────────
- // TestEntryNodeNotFirstStep verifies that when steps[0] is referenced by
- // another step, the engine correctly starts from the true entry node.
- func TestEntryNodeNotFirstStep(t *testing.T) {
- // steps[0] = Set_middle (referenced by Set_entry.next → NOT the entry node)
- // steps[1] = Set_entry (not referenced → TRUE entry node)
- // steps[2] = Stop_end (referenced by Set_middle.next)
- //
- // Correct execution: Set_entry ("entry") → Set_middle ("middle") → Stop_end
- // Final $out = "middle" (Set_middle is the last write).
- // If the engine wrongly starts from steps[0]=Set_middle, $out would end up
- // as "middle" too — but $out would never be "entry" first.
- // To distinguish, verify execution path by checking $out == "middle" AND
- // that Set_entry DID run (its write "entry" was overwritten by "middle").
- reg := auditRegistry()
- reg.Vars = []string{"$out(ANY)", "$trace(ANY)"}
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "EntryNotFirst",
- Registry: reg,
- Steps: []workflow.Step{
- // steps[0]: referenced by Set_entry.next → NOT entry
- {ID: "Set_middle", Target: "$out", Value: "middle", Next: "Stop_end"},
- // steps[1]: not referenced → TRUE entry
- {ID: "Set_entry", Target: "$out", Value: "entry", Next: "Set_middle"},
- {ID: "Stop_end"},
- },
- }
- eng, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- ctx := runAndDrain(t, eng, nil)
- // Set_entry writes "entry", then Set_middle overwrites with "middle".
- // Final value = "middle", confirming execution started from the true entry node.
- if got := ctx.Variables["$out"]; got != "middle" {
- t.Errorf("$out = %v, want 'middle' (true entry: Set_entry → Set_middle)", got)
- }
- }
- // ── P0-7: Registry param defaults ─────────────────────────────────────────────
- func TestParseParamDeclaration_INT_Default(t *testing.T) {
- decl, err := workflow.ParseParamDeclaration("maxRetries(INT) = 3")
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- if decl.Name != "maxRetries" {
- t.Errorf("Name = %q, want 'maxRetries'", decl.Name)
- }
- if decl.Type != "INT" {
- t.Errorf("Type = %q, want 'INT'", decl.Type)
- }
- if decl.Default == nil || *decl.Default != "3" {
- t.Errorf("Default = %v, want ptr to '3'", decl.Default)
- }
- }
- func TestParseParamDeclaration_BOOL_Default(t *testing.T) {
- decl, err := workflow.ParseParamDeclaration("enabled(BOOL) = true")
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- if decl.Default == nil || *decl.Default != "true" {
- t.Errorf("Default = %v, want ptr to 'true'", decl.Default)
- }
- }
- func TestParseParamDeclaration_STRING_Default(t *testing.T) {
- // String literals in declarations use surrounding double quotes.
- decl, err := workflow.ParseParamDeclaration(`prefix(STRING) = "hello"`)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- // ParseParamDeclaration strips the surrounding double quotes.
- if decl.Default == nil || *decl.Default != "hello" {
- t.Errorf("Default = %v, want ptr to 'hello' (quotes stripped)", decl.Default)
- }
- }
- func TestParseParamDeclaration_NoDefault(t *testing.T) {
- decl, err := workflow.ParseParamDeclaration("userId(STRING)")
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- if decl.Default != nil {
- t.Errorf("Default should be nil, got %q", *decl.Default)
- }
- }
- func TestCoerceParamDefault_INT(t *testing.T) {
- raw := "5"
- decl := &workflow.ParamDeclaration{Name: "n", Type: "INT", Default: &raw}
- v, err := workflow.CoerceParamDefault(decl)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- if v != int64(5) {
- t.Errorf("CoerceParamDefault = %v (%T), want int64(5)", v, v)
- }
- }
- func TestCoerceParamDefault_BOOL(t *testing.T) {
- raw := "false"
- decl := &workflow.ParamDeclaration{Name: "flag", Type: "BOOL", Default: &raw}
- v, err := workflow.CoerceParamDefault(decl)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- if v != false {
- t.Errorf("CoerceParamDefault = %v, want false", v)
- }
- }
- func TestCoerceParamDefault_STRING(t *testing.T) {
- raw := "hello"
- decl := &workflow.ParamDeclaration{Name: "s", Type: "STRING", Default: &raw}
- v, err := workflow.CoerceParamDefault(decl)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- if v != "hello" {
- t.Errorf("CoerceParamDefault = %v, want 'hello'", v)
- }
- }
- func TestCoerceParamDefault_Nil(t *testing.T) {
- decl := &workflow.ParamDeclaration{Name: "n", Type: "INT", Default: nil}
- v, err := workflow.CoerceParamDefault(decl)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- if v != nil {
- t.Errorf("CoerceParamDefault = %v, want nil", v)
- }
- }
- // TestParamDefaultApplied verifies that when a registry param has a default and
- // the caller does NOT provide it, the coerced default is applied to execCtx.Params.
- func TestParamDefaultApplied(t *testing.T) {
- reg := workflow.Registry{
- Services: []string{},
- Components: []string{},
- Vars: []string{"$out(ANY)"},
- Params: []string{"retries(INT) = 3"},
- }
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "DefaultParam",
- Registry: reg,
- Steps: []workflow.Step{
- // Write the retries param value to $out so we can inspect it.
- {ID: "Set_out", Target: "$out", Value: "=retries", Next: "Stop_end"},
- {ID: "Stop_end"},
- },
- }
- eng, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- ctx := runAndDrain(t, eng, nil /* no initialVars — default should apply */)
- // Default is int64(3)
- if got := ctx.Variables["$out"]; got != int64(3) {
- t.Errorf("$out = %v (%T), want int64(3) (default value applied)", got, got)
- }
- }
- // TestParamDefaultNotOverridden verifies that an explicit caller value wins.
- func TestParamDefaultNotOverridden(t *testing.T) {
- reg := workflow.Registry{
- Services: []string{},
- Components: []string{},
- Vars: []string{"$out(ANY)"},
- Params: []string{"retries(INT) = 3"},
- }
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "DefaultParamOverride",
- Registry: reg,
- Steps: []workflow.Step{
- {ID: "Set_out", Target: "$out", Value: "=retries", Next: "Stop_end"},
- {ID: "Stop_end"},
- },
- }
- eng, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- ctx := runAndDrain(t, eng, map[string]interface{}{"retries": int64(10)})
- if got := ctx.Variables["$out"]; got != int64(10) {
- t.Errorf("$out = %v, want int64(10) (caller value must override default)", got)
- }
- }
- // ── P0-4: RunParams + ExecuteWithRunParams ────────────────────────────────────
- func TestExecuteWithRunParams_Stored(t *testing.T) {
- reg := auditRegistry()
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "RunParamsTest",
- Registry: reg,
- Steps: []workflow.Step{
- {ID: "Noop_start", Next: "Stop_end"},
- {ID: "Stop_end"},
- },
- }
- eng, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- runParams := workflow.RunParams{
- WorkspaceID: "ws-42",
- Mode: "test",
- Nodes: []string{"Noop_start"},
- }
- result, execErr := eng.ExecuteWithRunParams(context.Background(), nil, auditAdapters(), runParams)
- if execErr != nil {
- t.Fatalf("ExecuteWithRunParams: %v", execErr)
- }
- for range result.RunEventStream {
- }
- rp := result.Context.RunParams
- if rp == nil {
- t.Fatal("RunParams is nil on context")
- }
- if rp.WorkspaceID != "ws-42" {
- t.Errorf("WorkspaceID = %q, want 'ws-42'", rp.WorkspaceID)
- }
- if rp.Mode != "test" {
- t.Errorf("Mode = %q, want 'test'", rp.Mode)
- }
- if len(rp.Nodes) != 1 || rp.Nodes[0] != "Noop_start" {
- t.Errorf("Nodes = %v, want ['Noop_start']", rp.Nodes)
- }
- }
- // ── P1-1: Resume on non-paused → error (spec §11.5.4) ─────────────────────────
- // TestResumeNotPausedReturnsError verifies that calling Resume when the workflow
- // is not in paused state (or has already completed) returns an error.
- // The implementation also emits a best-effort pause_rejected event (spec §11.5.4);
- // observability of that event is timing-dependent and not asserted here.
- func TestResumeNotPausedReturnsError(t *testing.T) {
- reg := auditRegistry()
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "NotPausedReject",
- Registry: reg,
- Steps: []workflow.Step{
- {ID: "Noop_start", Next: "Stop_end"},
- {ID: "Stop_end"},
- },
- }
- eng, err := workflow.NewEngine(wf)
- if err != nil {
- t.Fatalf("NewEngine: %v", err)
- }
- result, execErr := eng.Execute(context.Background(), nil, auditAdapters())
- if execErr != nil {
- t.Fatalf("Execute: %v", execErr)
- }
- // Drain stream (workflow completes) before calling Resume.
- for range result.RunEventStream {
- }
- resumeErr := eng.Resume(result.Context, workflow.ResumeRequest{
- Token: "any-token",
- RequestID: "req-001",
- })
- if resumeErr == nil {
- t.Fatal("expected error from Resume on non-paused workflow, got nil")
- }
- if !strings.Contains(resumeErr.Error(), "paused") {
- t.Errorf("error = %q, want it to mention 'paused'", resumeErr.Error())
- }
- }
- // TestResumeNotPausedEmitsPauseRejected verifies the pause_rejected event IS
- // emitted when Resume is called on a paused workflow with an invalid token.
- // The workflow has a short timeout so it terminates automatically without needing
- // the correct token. This ensures deterministic timing.
- func TestResumeNotPausedEmitsPauseRejected(t *testing.T) {
- reg := v315Registry()
- wf := &workflow.Workflow{
- Version: "3.15",
- Name: "PauseRejectedEvent",
- Registry: reg,
- Steps: []workflow.Step{
- {
- ID: "Pause_wait",
- ResumeResultTarget: "$result",
- Next: "Stop_end",
- // Short timeout so the workflow self-terminates after the bad Resume call.
- Timeout: &workflow.PauseTimeout{Sec: 2, On: "Stop_timeout"},
- },
- {ID: "Stop_end"},
- {ID: "Stop_timeout"},
- },
- }
- eng := mustEngineV315(t, wf)
- result, execErr := eng.Execute(context.Background(), nil, v315Adapters())
- if execErr != nil {
- t.Fatalf("Execute: %v", execErr)
- }
- // Collect ALL events in the background goroutine.
- evtCh := make(chan []workflow.RunEvent, 1)
- go func() {
- var evs []workflow.RunEvent
- for ev := range result.RunEventStream {
- evs = append(evs, ev)
- }
- evtCh <- evs
- }()
- // Wait for workflow to pause (stream still open, PauseState set).
- if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
- t.Fatal("workflow did not reach paused state in time")
- }
- // Call Resume with an invalid token → engine emits pause_rejected(invalid_token)
- // to the open stream, which the background goroutine will capture.
- badErr := eng.Resume(result.Context, workflow.ResumeRequest{
- Token: "wrong-token-xyz",
- RequestID: "req-bad-001",
- })
- if badErr == nil {
- t.Fatal("expected error for invalid token, got nil")
- }
- // Wait for the workflow to complete (it will self-terminate via the 2s timeout).
- allEvents := <-evtCh
- // Check that pause_rejected(invalid_token) was emitted.
- var rejected *workflow.RunEvent
- for i := range allEvents {
- if allEvents[i].Type == workflow.RunEventPauseRejected {
- rejected = &allEvents[i]
- break
- }
- }
- if rejected == nil {
- t.Fatal("expected pause_rejected event in stream, none found")
- }
- if rc, _ := rejected.Payload["reasonCode"].(string); rc != "invalid_token" {
- t.Errorf("reasonCode = %q, want 'invalid_token'", rc)
- }
- }
|