audit_fixes_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. package workflow_test
  2. // audit_fixes_test.go covers all P0 / P1 items identified in the external audit:
  3. //
  4. // P0-5 Branch_* no-match without ELSE → continue to Branch_*.next (not error)
  5. // P0-3 Validate: Stop_* must not have next or children
  6. // P0-2 Validate: reference integrity, entry≥1, reachability
  7. // P0-1 executeWorkflow: start from true entry nodes (not always steps[0])
  8. // P0-7 registry.params: name(TYPE) = defaultValue syntax + default application
  9. // P0-4 ExecuteWithRunParams + RunParams stored on context
  10. // P1-1 Resume on non-paused workflow → error; best-effort pause_rejected event
  11. import (
  12. "context"
  13. "strings"
  14. "testing"
  15. "time"
  16. "workflow"
  17. )
  18. // ── helpers ───────────────────────────────────────────────────────────────────
  19. func auditRegistry() workflow.Registry {
  20. return workflow.Registry{
  21. Services: []string{},
  22. Components: []string{},
  23. Vars: []string{"$out(ANY)"},
  24. }
  25. }
  26. func auditAdapters() *workflow.Adapters {
  27. return &workflow.Adapters{
  28. Service: workflow.NewDefaultServiceAdapter(),
  29. Component: workflow.NewDefaultComponentAdapter(),
  30. LLM: workflow.NewDefaultLLMAdapter(),
  31. }
  32. }
  33. func wantValidationError(t *testing.T, wf *workflow.Workflow, fragment string) {
  34. t.Helper()
  35. _, err := workflow.NewEngine(wf)
  36. if err == nil {
  37. t.Fatal("expected validation error, got nil")
  38. }
  39. if fragment != "" && !strings.Contains(err.Error(), fragment) {
  40. t.Fatalf("expected error containing %q, got: %v", fragment, err)
  41. }
  42. }
  43. // runAndDrain executes the workflow, drains the event stream, and returns the context.
  44. func runAndDrain(t *testing.T, eng *workflow.Engine, initialVars map[string]interface{}) *workflow.ExecutionContext {
  45. t.Helper()
  46. result, err := eng.Execute(context.Background(), initialVars, auditAdapters())
  47. if err != nil {
  48. t.Fatalf("Execute: %v", err)
  49. }
  50. for range result.RunEventStream {
  51. }
  52. return result.Context
  53. }
  54. // ── P0-5: Branch_* no-match without ELSE → continue to Branch_*.next ─────────
  55. // TestBranchNoMatchContinuesToNext verifies that when a Branch_* step has no
  56. // matching case and no ELSE clause, execution proceeds to Branch_*.next rather
  57. // than returning an error (spec §10.10).
  58. func TestBranchNoMatchContinuesToNext(t *testing.T) {
  59. reg := auditRegistry()
  60. reg.Params = []string{"score(INT)"}
  61. wf := &workflow.Workflow{
  62. Version: "3.15",
  63. Name: "BranchNoMatch",
  64. Registry: reg,
  65. Steps: []workflow.Step{
  66. {
  67. ID: "Branch_check",
  68. Next: "Set_done",
  69. Cases: [][]string{{"=score > 90", "Set_high"}},
  70. // No ELSE — if score ≤ 90, should skip to Branch_check.next
  71. },
  72. // Branch target (only executed if score > 90)
  73. {ID: "Set_high", Target: "$out", Value: "high", Next: "Set_done"},
  74. // Normal continuation after Branch_check
  75. {ID: "Set_done", Target: "$out", Value: "done", Next: "Stop_end"},
  76. {ID: "Stop_end"},
  77. },
  78. }
  79. eng, err := workflow.NewEngine(wf)
  80. if err != nil {
  81. t.Fatalf("NewEngine: %v", err)
  82. }
  83. ctx := runAndDrain(t, eng, map[string]interface{}{"score": int64(50)})
  84. // $out should be "done" (Set_done was reached; Set_high was NOT executed)
  85. if got := ctx.Variables["$out"]; got != "done" {
  86. t.Errorf("expected $out == 'done', got %v", got)
  87. }
  88. }
  89. // TestBranchElseFallback verifies that when ELSE is present, it is used
  90. // (regression guard for existing ELSE behaviour).
  91. func TestBranchElseFallback(t *testing.T) {
  92. reg := auditRegistry()
  93. reg.Params = []string{"score(INT)"}
  94. wf := &workflow.Workflow{
  95. Version: "3.15",
  96. Name: "BranchElse",
  97. Registry: reg,
  98. Steps: []workflow.Step{
  99. {
  100. ID: "Branch_check",
  101. Next: "Stop_end",
  102. Cases: [][]string{
  103. {"=score > 90", "Set_high"},
  104. {"ELSE", "Set_low"},
  105. },
  106. },
  107. {ID: "Set_high", Target: "$out", Value: "high", Next: "Stop_end"},
  108. {ID: "Set_low", Target: "$out", Value: "low", Next: "Stop_end"},
  109. {ID: "Stop_end"},
  110. },
  111. }
  112. eng, err := workflow.NewEngine(wf)
  113. if err != nil {
  114. t.Fatalf("NewEngine: %v", err)
  115. }
  116. ctx := runAndDrain(t, eng, map[string]interface{}{"score": int64(50)})
  117. if got := ctx.Variables["$out"]; got != "low" {
  118. t.Errorf("expected $out == 'low', got %v", got)
  119. }
  120. }
  121. // ── P0-3: Validate Stop_* constraints ────────────────────────────────────────
  122. func TestValidateStopWithNext(t *testing.T) {
  123. wf := &workflow.Workflow{
  124. Version: "3.15",
  125. Name: "StopWithNext",
  126. Registry: auditRegistry(),
  127. Steps: []workflow.Step{
  128. {ID: "Noop_start", Next: "Stop_end"},
  129. {ID: "Stop_end", Next: "Noop_start"}, // INVALID: Stop_* cannot have next
  130. },
  131. }
  132. wantValidationError(t, wf, "Stop_end")
  133. }
  134. func TestValidateStopWithChildren(t *testing.T) {
  135. wf := &workflow.Workflow{
  136. Version: "3.15",
  137. Name: "StopWithChildren",
  138. Registry: auditRegistry(),
  139. Steps: []workflow.Step{
  140. {ID: "Noop_start", Next: "Stop_end"},
  141. {ID: "Noop_child", Next: "RETURN"},
  142. {ID: "Stop_end", Children: []string{"Noop_child"}}, // INVALID
  143. },
  144. }
  145. wantValidationError(t, wf, "children")
  146. }
  147. // ── P0-2: Reference integrity ─────────────────────────────────────────────────
  148. func TestValidateDeadNextReference(t *testing.T) {
  149. wf := &workflow.Workflow{
  150. Version: "3.15",
  151. Name: "DeadNext",
  152. Registry: auditRegistry(),
  153. Steps: []workflow.Step{
  154. {ID: "Noop_start", Next: "Stop_nonexistent"}, // INVALID: step doesn't exist
  155. },
  156. }
  157. wantValidationError(t, wf, "Stop_nonexistent")
  158. }
  159. func TestValidateDeadChildReference(t *testing.T) {
  160. wf := &workflow.Workflow{
  161. Version: "3.15",
  162. Name: "DeadChild",
  163. Registry: auditRegistry(),
  164. Steps: []workflow.Step{
  165. {ID: "Noop_start", Children: []string{"Noop_ghost"}, Next: "Stop_end"},
  166. {ID: "Stop_end"},
  167. },
  168. }
  169. wantValidationError(t, wf, "Noop_ghost")
  170. }
  171. func TestValidateDeadCaseReference(t *testing.T) {
  172. wf := &workflow.Workflow{
  173. Version: "3.15",
  174. Name: "DeadCase",
  175. Registry: auditRegistry(),
  176. Steps: []workflow.Step{
  177. {ID: "Branch_check", Next: "Stop_end", Cases: [][]string{{"=1==1", "Set_ghost"}}},
  178. {ID: "Stop_end"},
  179. },
  180. }
  181. wantValidationError(t, wf, "Set_ghost")
  182. }
  183. func TestValidateDeadOnErrorReference(t *testing.T) {
  184. wf := &workflow.Workflow{
  185. Version: "3.15",
  186. Name: "DeadOnError",
  187. Registry: auditRegistry(),
  188. Steps: []workflow.Step{
  189. {ID: "Noop_start", Next: "Stop_end", OnError: "Stop_missing"},
  190. {ID: "Stop_end"},
  191. },
  192. }
  193. wantValidationError(t, wf, "Stop_missing")
  194. }
  195. // ── P0-2: Entry node and reachability ─────────────────────────────────────────
  196. // TestValidateNoEntryNode verifies that a self-referencing step (all steps
  197. // referenced) is rejected because there is no entry node.
  198. func TestValidateNoEntryNode(t *testing.T) {
  199. // Noop_selfref.next = Noop_selfref: the only step references itself,
  200. // so no step is "unreferenced" → no entry node.
  201. wf := &workflow.Workflow{
  202. Version: "3.15",
  203. Name: "NoEntry",
  204. Registry: auditRegistry(),
  205. Steps: []workflow.Step{
  206. {ID: "Noop_selfref", Next: "Noop_selfref"},
  207. },
  208. }
  209. wantValidationError(t, wf, "entry node")
  210. }
  211. // TestValidateUnreachableStep verifies that a disconnected cycle is rejected
  212. // because its steps are unreachable from the live graph's entry node.
  213. func TestValidateUnreachableStep(t *testing.T) {
  214. // Noop_dead1 → Noop_dead2 → Noop_dead1 forms a disconnected cycle.
  215. // Noop_dead1 is referenced by Noop_dead2 (not entry).
  216. // Noop_dead2 is referenced by Noop_dead1 (not entry).
  217. // Only Noop_start is the entry node. BFS from Noop_start reaches
  218. // only Stop_end. Noop_dead1 and Noop_dead2 are unreachable.
  219. wf := &workflow.Workflow{
  220. Version: "3.15",
  221. Name: "Unreachable",
  222. Registry: auditRegistry(),
  223. Steps: []workflow.Step{
  224. {ID: "Noop_start", Next: "Stop_end"},
  225. {ID: "Stop_end"},
  226. {ID: "Noop_dead1", Next: "Noop_dead2"},
  227. {ID: "Noop_dead2", Next: "Noop_dead1"},
  228. },
  229. }
  230. wantValidationError(t, wf, "unreachable")
  231. }
  232. // ── P0-1: Entry node detection ────────────────────────────────────────────────
  233. // TestEntryNodeNotFirstStep verifies that when steps[0] is referenced by
  234. // another step, the engine correctly starts from the true entry node.
  235. func TestEntryNodeNotFirstStep(t *testing.T) {
  236. // steps[0] = Set_middle (referenced by Set_entry.next → NOT the entry node)
  237. // steps[1] = Set_entry (not referenced → TRUE entry node)
  238. // steps[2] = Stop_end (referenced by Set_middle.next)
  239. //
  240. // Correct execution: Set_entry ("entry") → Set_middle ("middle") → Stop_end
  241. // Final $out = "middle" (Set_middle is the last write).
  242. // If the engine wrongly starts from steps[0]=Set_middle, $out would end up
  243. // as "middle" too — but $out would never be "entry" first.
  244. // To distinguish, verify execution path by checking $out == "middle" AND
  245. // that Set_entry DID run (its write "entry" was overwritten by "middle").
  246. reg := auditRegistry()
  247. reg.Vars = []string{"$out(ANY)", "$trace(ANY)"}
  248. wf := &workflow.Workflow{
  249. Version: "3.15",
  250. Name: "EntryNotFirst",
  251. Registry: reg,
  252. Steps: []workflow.Step{
  253. // steps[0]: referenced by Set_entry.next → NOT entry
  254. {ID: "Set_middle", Target: "$out", Value: "middle", Next: "Stop_end"},
  255. // steps[1]: not referenced → TRUE entry
  256. {ID: "Set_entry", Target: "$out", Value: "entry", Next: "Set_middle"},
  257. {ID: "Stop_end"},
  258. },
  259. }
  260. eng, err := workflow.NewEngine(wf)
  261. if err != nil {
  262. t.Fatalf("NewEngine: %v", err)
  263. }
  264. ctx := runAndDrain(t, eng, nil)
  265. // Set_entry writes "entry", then Set_middle overwrites with "middle".
  266. // Final value = "middle", confirming execution started from the true entry node.
  267. if got := ctx.Variables["$out"]; got != "middle" {
  268. t.Errorf("$out = %v, want 'middle' (true entry: Set_entry → Set_middle)", got)
  269. }
  270. }
  271. // ── P0-7: Registry param defaults ─────────────────────────────────────────────
  272. func TestParseParamDeclaration_INT_Default(t *testing.T) {
  273. decl, err := workflow.ParseParamDeclaration("maxRetries(INT) = 3")
  274. if err != nil {
  275. t.Fatalf("unexpected error: %v", err)
  276. }
  277. if decl.Name != "maxRetries" {
  278. t.Errorf("Name = %q, want 'maxRetries'", decl.Name)
  279. }
  280. if decl.Type != "INT" {
  281. t.Errorf("Type = %q, want 'INT'", decl.Type)
  282. }
  283. if decl.Default == nil || *decl.Default != "3" {
  284. t.Errorf("Default = %v, want ptr to '3'", decl.Default)
  285. }
  286. }
  287. func TestParseParamDeclaration_BOOL_Default(t *testing.T) {
  288. decl, err := workflow.ParseParamDeclaration("enabled(BOOL) = true")
  289. if err != nil {
  290. t.Fatalf("unexpected error: %v", err)
  291. }
  292. if decl.Default == nil || *decl.Default != "true" {
  293. t.Errorf("Default = %v, want ptr to 'true'", decl.Default)
  294. }
  295. }
  296. func TestParseParamDeclaration_STRING_Default(t *testing.T) {
  297. // String literals in declarations use surrounding double quotes.
  298. decl, err := workflow.ParseParamDeclaration(`prefix(STRING) = "hello"`)
  299. if err != nil {
  300. t.Fatalf("unexpected error: %v", err)
  301. }
  302. // ParseParamDeclaration strips the surrounding double quotes.
  303. if decl.Default == nil || *decl.Default != "hello" {
  304. t.Errorf("Default = %v, want ptr to 'hello' (quotes stripped)", decl.Default)
  305. }
  306. }
  307. func TestParseParamDeclaration_NoDefault(t *testing.T) {
  308. decl, err := workflow.ParseParamDeclaration("userId(STRING)")
  309. if err != nil {
  310. t.Fatalf("unexpected error: %v", err)
  311. }
  312. if decl.Default != nil {
  313. t.Errorf("Default should be nil, got %q", *decl.Default)
  314. }
  315. }
  316. func TestCoerceParamDefault_INT(t *testing.T) {
  317. raw := "5"
  318. decl := &workflow.ParamDeclaration{Name: "n", Type: "INT", Default: &raw}
  319. v, err := workflow.CoerceParamDefault(decl)
  320. if err != nil {
  321. t.Fatalf("unexpected error: %v", err)
  322. }
  323. if v != int64(5) {
  324. t.Errorf("CoerceParamDefault = %v (%T), want int64(5)", v, v)
  325. }
  326. }
  327. func TestCoerceParamDefault_BOOL(t *testing.T) {
  328. raw := "false"
  329. decl := &workflow.ParamDeclaration{Name: "flag", Type: "BOOL", Default: &raw}
  330. v, err := workflow.CoerceParamDefault(decl)
  331. if err != nil {
  332. t.Fatalf("unexpected error: %v", err)
  333. }
  334. if v != false {
  335. t.Errorf("CoerceParamDefault = %v, want false", v)
  336. }
  337. }
  338. func TestCoerceParamDefault_STRING(t *testing.T) {
  339. raw := "hello"
  340. decl := &workflow.ParamDeclaration{Name: "s", Type: "STRING", Default: &raw}
  341. v, err := workflow.CoerceParamDefault(decl)
  342. if err != nil {
  343. t.Fatalf("unexpected error: %v", err)
  344. }
  345. if v != "hello" {
  346. t.Errorf("CoerceParamDefault = %v, want 'hello'", v)
  347. }
  348. }
  349. func TestCoerceParamDefault_Nil(t *testing.T) {
  350. decl := &workflow.ParamDeclaration{Name: "n", Type: "INT", Default: nil}
  351. v, err := workflow.CoerceParamDefault(decl)
  352. if err != nil {
  353. t.Fatalf("unexpected error: %v", err)
  354. }
  355. if v != nil {
  356. t.Errorf("CoerceParamDefault = %v, want nil", v)
  357. }
  358. }
  359. // TestParamDefaultApplied verifies that when a registry param has a default and
  360. // the caller does NOT provide it, the coerced default is applied to execCtx.Params.
  361. func TestParamDefaultApplied(t *testing.T) {
  362. reg := workflow.Registry{
  363. Services: []string{},
  364. Components: []string{},
  365. Vars: []string{"$out(ANY)"},
  366. Params: []string{"retries(INT) = 3"},
  367. }
  368. wf := &workflow.Workflow{
  369. Version: "3.15",
  370. Name: "DefaultParam",
  371. Registry: reg,
  372. Steps: []workflow.Step{
  373. // Write the retries param value to $out so we can inspect it.
  374. {ID: "Set_out", Target: "$out", Value: "=retries", Next: "Stop_end"},
  375. {ID: "Stop_end"},
  376. },
  377. }
  378. eng, err := workflow.NewEngine(wf)
  379. if err != nil {
  380. t.Fatalf("NewEngine: %v", err)
  381. }
  382. ctx := runAndDrain(t, eng, nil /* no initialVars — default should apply */)
  383. // Default is int64(3)
  384. if got := ctx.Variables["$out"]; got != int64(3) {
  385. t.Errorf("$out = %v (%T), want int64(3) (default value applied)", got, got)
  386. }
  387. }
  388. // TestParamDefaultNotOverridden verifies that an explicit caller value wins.
  389. func TestParamDefaultNotOverridden(t *testing.T) {
  390. reg := workflow.Registry{
  391. Services: []string{},
  392. Components: []string{},
  393. Vars: []string{"$out(ANY)"},
  394. Params: []string{"retries(INT) = 3"},
  395. }
  396. wf := &workflow.Workflow{
  397. Version: "3.15",
  398. Name: "DefaultParamOverride",
  399. Registry: reg,
  400. Steps: []workflow.Step{
  401. {ID: "Set_out", Target: "$out", Value: "=retries", Next: "Stop_end"},
  402. {ID: "Stop_end"},
  403. },
  404. }
  405. eng, err := workflow.NewEngine(wf)
  406. if err != nil {
  407. t.Fatalf("NewEngine: %v", err)
  408. }
  409. ctx := runAndDrain(t, eng, map[string]interface{}{"retries": int64(10)})
  410. if got := ctx.Variables["$out"]; got != int64(10) {
  411. t.Errorf("$out = %v, want int64(10) (caller value must override default)", got)
  412. }
  413. }
  414. // ── P0-4: RunParams + ExecuteWithRunParams ────────────────────────────────────
  415. func TestExecuteWithRunParams_Stored(t *testing.T) {
  416. reg := auditRegistry()
  417. wf := &workflow.Workflow{
  418. Version: "3.15",
  419. Name: "RunParamsTest",
  420. Registry: reg,
  421. Steps: []workflow.Step{
  422. {ID: "Noop_start", Next: "Stop_end"},
  423. {ID: "Stop_end"},
  424. },
  425. }
  426. eng, err := workflow.NewEngine(wf)
  427. if err != nil {
  428. t.Fatalf("NewEngine: %v", err)
  429. }
  430. runParams := workflow.RunParams{
  431. WorkspaceID: "ws-42",
  432. Mode: "test",
  433. Nodes: []string{"Noop_start"},
  434. }
  435. result, execErr := eng.ExecuteWithRunParams(context.Background(), nil, auditAdapters(), runParams)
  436. if execErr != nil {
  437. t.Fatalf("ExecuteWithRunParams: %v", execErr)
  438. }
  439. for range result.RunEventStream {
  440. }
  441. rp := result.Context.RunParams
  442. if rp == nil {
  443. t.Fatal("RunParams is nil on context")
  444. }
  445. if rp.WorkspaceID != "ws-42" {
  446. t.Errorf("WorkspaceID = %q, want 'ws-42'", rp.WorkspaceID)
  447. }
  448. if rp.Mode != "test" {
  449. t.Errorf("Mode = %q, want 'test'", rp.Mode)
  450. }
  451. if len(rp.Nodes) != 1 || rp.Nodes[0] != "Noop_start" {
  452. t.Errorf("Nodes = %v, want ['Noop_start']", rp.Nodes)
  453. }
  454. }
  455. // ── P1-1: Resume on non-paused → error (spec §11.5.4) ─────────────────────────
  456. // TestResumeNotPausedReturnsError verifies that calling Resume when the workflow
  457. // is not in paused state (or has already completed) returns an error.
  458. // The implementation also emits a best-effort pause_rejected event (spec §11.5.4);
  459. // observability of that event is timing-dependent and not asserted here.
  460. func TestResumeNotPausedReturnsError(t *testing.T) {
  461. reg := auditRegistry()
  462. wf := &workflow.Workflow{
  463. Version: "3.15",
  464. Name: "NotPausedReject",
  465. Registry: reg,
  466. Steps: []workflow.Step{
  467. {ID: "Noop_start", Next: "Stop_end"},
  468. {ID: "Stop_end"},
  469. },
  470. }
  471. eng, err := workflow.NewEngine(wf)
  472. if err != nil {
  473. t.Fatalf("NewEngine: %v", err)
  474. }
  475. result, execErr := eng.Execute(context.Background(), nil, auditAdapters())
  476. if execErr != nil {
  477. t.Fatalf("Execute: %v", execErr)
  478. }
  479. // Drain stream (workflow completes) before calling Resume.
  480. for range result.RunEventStream {
  481. }
  482. resumeErr := eng.Resume(result.Context, workflow.ResumeRequest{
  483. Token: "any-token",
  484. RequestID: "req-001",
  485. })
  486. if resumeErr == nil {
  487. t.Fatal("expected error from Resume on non-paused workflow, got nil")
  488. }
  489. if !strings.Contains(resumeErr.Error(), "paused") {
  490. t.Errorf("error = %q, want it to mention 'paused'", resumeErr.Error())
  491. }
  492. }
  493. // TestResumeNotPausedEmitsPauseRejected verifies the pause_rejected event IS
  494. // emitted when Resume is called on a paused workflow with an invalid token.
  495. // The workflow has a short timeout so it terminates automatically without needing
  496. // the correct token. This ensures deterministic timing.
  497. func TestResumeNotPausedEmitsPauseRejected(t *testing.T) {
  498. reg := v315Registry()
  499. wf := &workflow.Workflow{
  500. Version: "3.15",
  501. Name: "PauseRejectedEvent",
  502. Registry: reg,
  503. Steps: []workflow.Step{
  504. {
  505. ID: "Pause_wait",
  506. ResumeResultTarget: "$result",
  507. Next: "Stop_end",
  508. // Short timeout so the workflow self-terminates after the bad Resume call.
  509. Timeout: &workflow.PauseTimeout{Sec: 2, On: "Stop_timeout"},
  510. },
  511. {ID: "Stop_end"},
  512. {ID: "Stop_timeout"},
  513. },
  514. }
  515. eng := mustEngineV315(t, wf)
  516. result, execErr := eng.Execute(context.Background(), nil, v315Adapters())
  517. if execErr != nil {
  518. t.Fatalf("Execute: %v", execErr)
  519. }
  520. // Collect ALL events in the background goroutine.
  521. evtCh := make(chan []workflow.RunEvent, 1)
  522. go func() {
  523. var evs []workflow.RunEvent
  524. for ev := range result.RunEventStream {
  525. evs = append(evs, ev)
  526. }
  527. evtCh <- evs
  528. }()
  529. // Wait for workflow to pause (stream still open, PauseState set).
  530. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  531. t.Fatal("workflow did not reach paused state in time")
  532. }
  533. // Call Resume with an invalid token → engine emits pause_rejected(invalid_token)
  534. // to the open stream, which the background goroutine will capture.
  535. badErr := eng.Resume(result.Context, workflow.ResumeRequest{
  536. Token: "wrong-token-xyz",
  537. RequestID: "req-bad-001",
  538. })
  539. if badErr == nil {
  540. t.Fatal("expected error for invalid token, got nil")
  541. }
  542. // Wait for the workflow to complete (it will self-terminate via the 2s timeout).
  543. allEvents := <-evtCh
  544. // Check that pause_rejected(invalid_token) was emitted.
  545. var rejected *workflow.RunEvent
  546. for i := range allEvents {
  547. if allEvents[i].Type == workflow.RunEventPauseRejected {
  548. rejected = &allEvents[i]
  549. break
  550. }
  551. }
  552. if rejected == nil {
  553. t.Fatal("expected pause_rejected event in stream, none found")
  554. }
  555. if rc, _ := rejected.Payload["reasonCode"].(string); rc != "invalid_token" {
  556. t.Errorf("reasonCode = %q, want 'invalid_token'", rc)
  557. }
  558. }