package workflow_test import ( "context" "sync" "testing" "time" "workflow" ) // ── helpers ─────────────────────────────────────────────────────────────────── // v315Registry returns a minimal registry suitable for v3.15 pause tests. func v315Registry() workflow.Registry { return workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{"$result(ANY)"}, } } // v315Adapters returns a minimal Adapters struct with no real adapters. func v315Adapters() *workflow.Adapters { return &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), } } func mustEngineV315(t *testing.T, wf *workflow.Workflow) *workflow.Engine { t.Helper() eng, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("NewEngine: %v", err) } return eng } // collectEvents drains the event stream into a slice (for assertion). func collectEvents(stream <-chan workflow.RunEvent) []workflow.RunEvent { var events []workflow.RunEvent for ev := range stream { events = append(events, ev) } return events } // findEvent returns the first event of the given type, or nil. func findEvent(events []workflow.RunEvent, t workflow.RunEventType) *workflow.RunEvent { for i := range events { if events[i].Type == t { return &events[i] } } return nil } // waitForStatus blocks until execCtx.Status equals want or timeout fires. func waitForStatus(execCtx *workflow.ExecutionContext, want workflow.ExecutionStatus, timeout time.Duration) bool { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { if execCtx.Status == want { return true } time.Sleep(5 * time.Millisecond) } return false } // pauseWorkflow returns a minimal workflow that pauses and then stops. func pauseWorkflow(version, resumeResultTarget string) *workflow.Workflow { return &workflow.Workflow{ Version: version, Name: "Pause Test", Registry: v315Registry(), Steps: []workflow.Step{ { ID: "Pause_Wait", ResumeResultTarget: resumeResultTarget, Next: "Stop_Done", }, {ID: "Stop_Done"}, }, } } // extractToken reads the waitToken from the pause_start event payload. func extractToken(t *testing.T, events []workflow.RunEvent) string { t.Helper() ev := findEvent(events, workflow.RunEventPauseStart) if ev == nil { t.Fatal("no pause_start event found") } token, ok := ev.Payload["waitToken"].(string) if !ok || token == "" { t.Fatalf("pause_start event missing waitToken: %v", ev.Payload) } return token } // ── v3.15 version acceptance ────────────────────────────────────────────────── func TestV315_VersionAccepted(t *testing.T) { wf := pauseWorkflow("3.15", "$result") _, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("expected version 3.15 to be accepted, got: %v", err) } } // ── Basic pause + resume ─────────────────────────────────────────────────────── func TestV315_BasicPauseResume(t *testing.T) { wf := pauseWorkflow("3.15", "$result") eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } // Wait for workflow to reach paused state. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) { t.Fatalf("workflow did not reach paused state; status=%s", result.Context.Status) } // Collect events so far by reading without blocking the goroutine. // We'll drain fully after resume. var mu sync.Mutex var allEvents []workflow.RunEvent done := make(chan struct{}) go func() { for ev := range result.RunEventStream { mu.Lock() allEvents = append(allEvents, ev) mu.Unlock() } close(done) }() // Extract token from events published so far. Give the pause_start event // a little time to be emitted before we sample allEvents. time.Sleep(20 * time.Millisecond) mu.Lock() snapshot := make([]workflow.RunEvent, len(allEvents)) copy(snapshot, allEvents) mu.Unlock() token := extractToken(t, snapshot) // Resume the workflow with a payload. payload := map[string]interface{}{"approved": true, "comment": "LGTM"} resumeErr := eng.Resume(result.Context, workflow.ResumeRequest{ Token: token, Payload: payload, RequestID: "req-001", }) if resumeErr != nil { t.Fatalf("Resume: %v", resumeErr) } // Wait for workflow to complete. <-done // Assert final status. if result.Context.Status != workflow.StatusStopped { t.Errorf("expected StatusStopped, got %s", result.Context.Status) } // Assert $result variable was written. gotResult, ok := result.Context.Variables["$result"] if !ok { t.Fatal("$result variable not set after resume") } gotMap, ok := gotResult.(map[string]interface{}) if !ok { t.Fatalf("$result is %T, expected map", gotResult) } if gotMap["approved"] != true { t.Errorf("$result.approved = %v, want true", gotMap["approved"]) } // Assert event sequence: pause_start → pause_resumed → step_done. mu.Lock() events := allEvents mu.Unlock() if findEvent(events, workflow.RunEventPauseStart) == nil { t.Error("missing pause_start event") } if findEvent(events, workflow.RunEventPauseResumed) == nil { t.Error("missing pause_resumed event") } if findEvent(events, workflow.RunEventWorkflowDone) == nil { t.Error("missing workflow_done event") } } // ── Validation: children not allowed on Pause_* ─────────────────────────────── func TestV315_ValidationChildrenRejected(t *testing.T) { wf := &workflow.Workflow{ Version: "3.15", Name: "Bad Children", Registry: v315Registry(), Steps: []workflow.Step{ { ID: "Pause_WithChildren", ResumeResultTarget: "$result", Children: []string{"Stop_Done"}, Next: "Stop_Done", }, {ID: "Stop_Done"}, }, } _, err := workflow.NewEngine(wf) if err == nil { t.Fatal("expected validation error for children on Pause_* step, got nil") } } // ── RunID validation in Resume ──────────────────────────────────────────────── func TestV315_ResumeRunIDMismatch(t *testing.T) { wf := pauseWorkflow("3.15", "$result") eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) { t.Fatal("workflow did not reach paused state") } var mu sync.Mutex var allEvents []workflow.RunEvent done := make(chan struct{}) go func() { for ev := range result.RunEventStream { mu.Lock() allEvents = append(allEvents, ev) mu.Unlock() } close(done) }() time.Sleep(20 * time.Millisecond) mu.Lock() snapshot := make([]workflow.RunEvent, len(allEvents)) copy(snapshot, allEvents) mu.Unlock() token := extractToken(t, snapshot) // Wrong RunID must be rejected. err2 := eng.Resume(result.Context, workflow.ResumeRequest{ RunID: "wrong-run-id", Token: token, }) if err2 == nil { t.Fatal("expected error for mismatched RunID, got nil") } // Correct RunID must succeed. err3 := eng.Resume(result.Context, workflow.ResumeRequest{ RunID: result.Context.WorkflowID, Token: token, }) if err3 != nil { t.Fatalf("Resume with correct RunID failed: %v", err3) } <-done } // ── Pause with reason field ──────────────────────────────────────────────────── func TestV315_PauseReasonField(t *testing.T) { wf := &workflow.Workflow{ Version: "3.15", Name: "Pause Reason Test", Registry: v315Registry(), Steps: []workflow.Step{ { ID: "Pause_WithReason", Reason: "请补充收货地址", ResumeResultTarget: "$result", Next: "Stop_Done", }, {ID: "Stop_Done"}, }, } eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) { t.Fatal("workflow did not reach paused state") } // Collect events asynchronously. var mu sync.Mutex var allEvents []workflow.RunEvent done := make(chan struct{}) go func() { for ev := range result.RunEventStream { mu.Lock() allEvents = append(allEvents, ev) mu.Unlock() } close(done) }() time.Sleep(20 * time.Millisecond) mu.Lock() snapshot := make([]workflow.RunEvent, len(allEvents)) copy(snapshot, allEvents) mu.Unlock() token := extractToken(t, snapshot) // Verify pause_start payload contains reason. pauseEv := findEvent(snapshot, workflow.RunEventPauseStart) if pauseEv == nil { t.Fatal("no pause_start event") } if reason, ok := pauseEv.Payload["reason"].(string); !ok || reason != "请补充收货地址" { t.Errorf("pause_start.reason = %v, want '请补充收货地址'", pauseEv.Payload["reason"]) } // Resume and wait. eng.Resume(result.Context, workflow.ResumeRequest{Token: token, Payload: "addr123"}) <-done } // ── Pause timeout ────────────────────────────────────────────────────────────── func TestV315_PauseTimeout(t *testing.T) { wf := &workflow.Workflow{ Version: "3.15", Name: "Pause Timeout Test", Registry: v315Registry(), Steps: []workflow.Step{ { ID: "Pause_Short", ResumeResultTarget: "$result", Timeout: &workflow.PauseTimeout{ Sec: 1, // 1 second timeout On: "Stop_TimedOut", }, Next: "Stop_Done", }, {ID: "Stop_TimedOut"}, {ID: "Stop_Done"}, }, } eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } // Drain events; timeout should fire within ~1s. events := collectEvents(result.RunEventStream) if result.Context.Status != workflow.StatusStopped { t.Errorf("expected StatusStopped after timeout, got %s", result.Context.Status) } // pause_start and pause_timeout must be present; pause_resumed must be absent. if findEvent(events, workflow.RunEventPauseStart) == nil { t.Error("missing pause_start event") } timeoutEv := findEvent(events, workflow.RunEventPauseTimeout) if timeoutEv == nil { t.Error("missing pause_timeout event") } else { if timeoutEv.Payload["timeoutAction"] != "Stop_TimedOut" { t.Errorf("timeoutAction = %v, want Stop_TimedOut", timeoutEv.Payload["timeoutAction"]) } } if findEvent(events, workflow.RunEventPauseResumed) != nil { t.Error("unexpected pause_resumed event on timeout path") } } // ── Idempotent resume ───────────────────────────────────────────────────────── func TestV315_PauseIdempotentResume(t *testing.T) { wf := pauseWorkflow("3.15", "$result") eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) { t.Fatal("workflow did not reach paused state") } var mu sync.Mutex var allEvents []workflow.RunEvent done := make(chan struct{}) go func() { for ev := range result.RunEventStream { mu.Lock() allEvents = append(allEvents, ev) mu.Unlock() } close(done) }() time.Sleep(20 * time.Millisecond) mu.Lock() snapshot := make([]workflow.RunEvent, len(allEvents)) copy(snapshot, allEvents) mu.Unlock() token := extractToken(t, snapshot) // Call Resume twice with the same RequestID. req := workflow.ResumeRequest{Token: token, Payload: "data", RequestID: "idem-001"} if err := eng.Resume(result.Context, req); err != nil { t.Fatalf("first Resume: %v", err) } // Second call with same RequestID must be a silent no-op (nil error). if err := eng.Resume(result.Context, req); err != nil { t.Fatalf("idempotent Resume returned error: %v", err) } <-done // Only one pause_resumed event should appear. mu.Lock() events := allEvents mu.Unlock() count := 0 for _, ev := range events { if ev.Type == workflow.RunEventPauseResumed { count++ } } if count != 1 { t.Errorf("expected exactly 1 pause_resumed event, got %d", count) } } // ── Invalid token ───────────────────────────────────────────────────────────── func TestV315_PauseInvalidToken(t *testing.T) { wf := pauseWorkflow("3.15", "$result") eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) { t.Fatal("workflow did not reach paused state") } // Resume with wrong token — must return an error. resumeErr := eng.Resume(result.Context, workflow.ResumeRequest{ Token: "wrong-token", Payload: nil, RequestID: "req-bad", }) if resumeErr == nil { t.Fatal("expected error for invalid token, got nil") } // Workflow should still be paused. if result.Context.Status != workflow.StatusPaused { t.Errorf("expected StatusPaused after rejected resume, got %s", result.Context.Status) } // Drain the event stream by cancelling via context; collect remaining events. // (We need to unblock the goroutine to avoid a goroutine leak in the test.) // Re-execute with a context cancel to clean up. ctx2, cancel := context.WithCancel(context.Background()) wf2 := pauseWorkflow("3.15", "$result") eng2 := mustEngineV315(t, wf2) result2, _ := eng2.Execute(ctx2, nil, v315Adapters()) if !waitForStatus(result2.Context, workflow.StatusPaused, 2*time.Second) { t.Fatal("cleanup: workflow did not pause") } // Cancel to unblock. cancel() collectEvents(result2.RunEventStream) // For the original paused workflow, collect via a short-timeout cancel. ctxOrig, cancelOrig := context.WithCancel(context.Background()) _ = ctxOrig cancelOrig() // We can't easily cancel the original; just verify pause_rejected event was emitted. // The channel still has events buffered; drain a few. drainLoop := true evBuf := make([]workflow.RunEvent, 0, 10) for drainLoop { select { case ev, ok := <-result.RunEventStream: if !ok { drainLoop = false } else { evBuf = append(evBuf, ev) } default: drainLoop = false } } if findEvent(evBuf, workflow.RunEventPauseRejected) == nil { // pause_rejected might have been emitted before we started draining; // check it was at least not missing from the total stream so far. // This is a best-effort check in this test. t.Log("note: pause_rejected event not found in recently drained events (may have been buffered earlier)") } } // ── Resume when not paused ──────────────────────────────────────────────────── func TestV315_ResumeWhenNotPaused(t *testing.T) { wf := &workflow.Workflow{ Version: "3.15", Name: "No Pause", Registry: v315Registry(), Steps: []workflow.Step{ {ID: "Noop_Start", Next: "Stop_Done"}, {ID: "Stop_Done"}, }, } eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } collectEvents(result.RunEventStream) // Workflow has completed; Resume must return an error. resumeErr := eng.Resume(result.Context, workflow.ResumeRequest{Token: "any", Payload: nil}) if resumeErr == nil { t.Fatal("expected error when resuming a non-paused workflow, got nil") } } // ── Validation: missing resumeResultTarget ──────────────────────────────────── func TestV315_ValidationMissingResumeResultTarget(t *testing.T) { wf := &workflow.Workflow{ Version: "3.15", Name: "Bad Pause", Registry: v315Registry(), Steps: []workflow.Step{ { ID: "Pause_Bad", // ResumeResultTarget intentionally omitted Next: "Stop_Done", }, {ID: "Stop_Done"}, }, } _, err := workflow.NewEngine(wf) if err == nil { t.Fatal("expected validation error for missing resumeResultTarget, got nil") } } // ── Validation: timeout.sec must be > 0 ────────────────────────────────────── func TestV315_ValidationTimeoutSecZero(t *testing.T) { wf := &workflow.Workflow{ Version: "3.15", Name: "Bad Timeout", Registry: v315Registry(), Steps: []workflow.Step{ { ID: "Pause_BadTimeout", ResumeResultTarget: "$result", Timeout: &workflow.PauseTimeout{ Sec: 0, // invalid On: "Stop_Done", }, Next: "Stop_Done", }, {ID: "Stop_Done"}, }, } _, err := workflow.NewEngine(wf) if err == nil { t.Fatal("expected validation error for timeout.sec == 0, got nil") } } // ── Validation: timeout.on must be non-empty ────────────────────────────────── func TestV315_ValidationTimeoutOnEmpty(t *testing.T) { wf := &workflow.Workflow{ Version: "3.15", Name: "Bad TimeoutOn", Registry: v315Registry(), Steps: []workflow.Step{ { ID: "Pause_NoOn", ResumeResultTarget: "$result", Timeout: &workflow.PauseTimeout{ Sec: 60, On: "", // invalid }, Next: "Stop_Done", }, {ID: "Stop_Done"}, }, } _, err := workflow.NewEngine(wf) if err == nil { t.Fatal("expected validation error for empty timeout.on, got nil") } } // ── Resume payload written to nested $vars path ─────────────────────────────── func TestV315_PauseNestedResumeResultTarget(t *testing.T) { wf := &workflow.Workflow{ Version: "3.15", Name: "Nested Target Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{"$approval(ANY)"}, }, Steps: []workflow.Step{ { ID: "Pause_Approval", ResumeResultTarget: "$approval", Next: "Stop_Done", }, {ID: "Stop_Done"}, }, } eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) { t.Fatal("workflow did not reach paused state") } var mu sync.Mutex var allEvents []workflow.RunEvent done := make(chan struct{}) go func() { for ev := range result.RunEventStream { mu.Lock() allEvents = append(allEvents, ev) mu.Unlock() } close(done) }() time.Sleep(20 * time.Millisecond) mu.Lock() snapshot := make([]workflow.RunEvent, len(allEvents)) copy(snapshot, allEvents) mu.Unlock() token := extractToken(t, snapshot) approvalPayload := map[string]interface{}{"approved": true, "stage": "L1"} if err := eng.Resume(result.Context, workflow.ResumeRequest{ Token: token, Payload: approvalPayload, RequestID: "req-l1", }); err != nil { t.Fatalf("Resume: %v", err) } <-done got, ok := result.Context.Variables["$approval"] if !ok { t.Fatal("$approval not set after resume") } gotMap, ok := got.(map[string]interface{}) if !ok { t.Fatalf("$approval is %T, expected map", got) } if gotMap["stage"] != "L1" { t.Errorf("$approval.stage = %v, want L1", gotMap["stage"]) } } // ── Context cancellation unblocks pause ─────────────────────────────────────── func TestV315_PauseCancelledByContext(t *testing.T) { wf := pauseWorkflow("3.15", "$result") eng := mustEngineV315(t, wf) ctx, cancel := context.WithCancel(context.Background()) result, err := eng.Execute(ctx, nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) { t.Fatal("workflow did not reach paused state") } // Cancel the context; the workflow goroutine should unblock and fail. cancel() events := collectEvents(result.RunEventStream) if result.Context.Status != workflow.StatusFailed { t.Errorf("expected StatusFailed after ctx cancel, got %s", result.Context.Status) } if findEvent(events, workflow.RunEventWorkflowFailed) == nil { t.Error("missing workflow_failed event after ctx cancel") } } // ── Second resume after first succeeds is rejected ──────────────────────────── func TestV315_PauseDoubleResumeDifferentRequestID(t *testing.T) { wf := pauseWorkflow("3.15", "$result") eng := mustEngineV315(t, wf) result, err := eng.Execute(context.Background(), nil, v315Adapters()) if err != nil { t.Fatalf("Execute: %v", err) } if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) { t.Fatal("workflow did not reach paused state") } var mu sync.Mutex var allEvents []workflow.RunEvent done := make(chan struct{}) go func() { for ev := range result.RunEventStream { mu.Lock() allEvents = append(allEvents, ev) mu.Unlock() } close(done) }() time.Sleep(20 * time.Millisecond) mu.Lock() snapshot := make([]workflow.RunEvent, len(allEvents)) copy(snapshot, allEvents) mu.Unlock() token := extractToken(t, snapshot) // First resume — must succeed. if err := eng.Resume(result.Context, workflow.ResumeRequest{ Token: token, Payload: "first", RequestID: "req-A", }); err != nil { t.Fatalf("first Resume: %v", err) } <-done // Second resume (different RequestID) after workflow completed — must fail. err2 := eng.Resume(result.Context, workflow.ResumeRequest{ Token: token, Payload: "second", RequestID: "req-B", }) if err2 == nil { t.Fatal("expected error on second resume with different RequestID, got nil") } }