v315_test.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. package workflow_test
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "time"
  7. "workflow"
  8. )
  9. // ── helpers ───────────────────────────────────────────────────────────────────
  10. // v315Registry returns a minimal registry suitable for v3.15 pause tests.
  11. func v315Registry() workflow.Registry {
  12. return workflow.Registry{
  13. Services: []string{},
  14. Components: []string{},
  15. Vars: []string{"$result(ANY)"},
  16. }
  17. }
  18. // v315Adapters returns a minimal Adapters struct with no real adapters.
  19. func v315Adapters() *workflow.Adapters {
  20. return &workflow.Adapters{
  21. Service: workflow.NewDefaultServiceAdapter(),
  22. Component: workflow.NewDefaultComponentAdapter(),
  23. LLM: workflow.NewDefaultLLMAdapter(),
  24. }
  25. }
  26. func mustEngineV315(t *testing.T, wf *workflow.Workflow) *workflow.Engine {
  27. t.Helper()
  28. eng, err := workflow.NewEngine(wf)
  29. if err != nil {
  30. t.Fatalf("NewEngine: %v", err)
  31. }
  32. return eng
  33. }
  34. // collectEvents drains the event stream into a slice (for assertion).
  35. func collectEvents(stream <-chan workflow.RunEvent) []workflow.RunEvent {
  36. var events []workflow.RunEvent
  37. for ev := range stream {
  38. events = append(events, ev)
  39. }
  40. return events
  41. }
  42. // findEvent returns the first event of the given type, or nil.
  43. func findEvent(events []workflow.RunEvent, t workflow.RunEventType) *workflow.RunEvent {
  44. for i := range events {
  45. if events[i].Type == t {
  46. return &events[i]
  47. }
  48. }
  49. return nil
  50. }
  51. // waitForStatus blocks until execCtx.Status equals want or timeout fires.
  52. func waitForStatus(execCtx *workflow.ExecutionContext, want workflow.ExecutionStatus, timeout time.Duration) bool {
  53. deadline := time.Now().Add(timeout)
  54. for time.Now().Before(deadline) {
  55. if execCtx.Status == want {
  56. return true
  57. }
  58. time.Sleep(5 * time.Millisecond)
  59. }
  60. return false
  61. }
  62. // pauseWorkflow returns a minimal workflow that pauses and then stops.
  63. func pauseWorkflow(version, resumeResultTarget string) *workflow.Workflow {
  64. return &workflow.Workflow{
  65. Version: version,
  66. Name: "Pause Test",
  67. Registry: v315Registry(),
  68. Steps: []workflow.Step{
  69. {
  70. ID: "Pause_Wait",
  71. ResumeResultTarget: resumeResultTarget,
  72. Next: "Stop_Done",
  73. },
  74. {ID: "Stop_Done"},
  75. },
  76. }
  77. }
  78. // extractToken reads the waitToken from the pause_start event payload.
  79. func extractToken(t *testing.T, events []workflow.RunEvent) string {
  80. t.Helper()
  81. ev := findEvent(events, workflow.RunEventPauseStart)
  82. if ev == nil {
  83. t.Fatal("no pause_start event found")
  84. }
  85. token, ok := ev.Payload["waitToken"].(string)
  86. if !ok || token == "" {
  87. t.Fatalf("pause_start event missing waitToken: %v", ev.Payload)
  88. }
  89. return token
  90. }
  91. // ── v3.15 version acceptance ──────────────────────────────────────────────────
  92. func TestV315_VersionAccepted(t *testing.T) {
  93. wf := pauseWorkflow("3.15", "$result")
  94. _, err := workflow.NewEngine(wf)
  95. if err != nil {
  96. t.Fatalf("expected version 3.15 to be accepted, got: %v", err)
  97. }
  98. }
  99. // ── Basic pause + resume ───────────────────────────────────────────────────────
  100. func TestV315_BasicPauseResume(t *testing.T) {
  101. wf := pauseWorkflow("3.15", "$result")
  102. eng := mustEngineV315(t, wf)
  103. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  104. if err != nil {
  105. t.Fatalf("Execute: %v", err)
  106. }
  107. // Wait for workflow to reach paused state.
  108. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  109. t.Fatalf("workflow did not reach paused state; status=%s", result.Context.Status)
  110. }
  111. // Collect events so far by reading without blocking the goroutine.
  112. // We'll drain fully after resume.
  113. var mu sync.Mutex
  114. var allEvents []workflow.RunEvent
  115. done := make(chan struct{})
  116. go func() {
  117. for ev := range result.RunEventStream {
  118. mu.Lock()
  119. allEvents = append(allEvents, ev)
  120. mu.Unlock()
  121. }
  122. close(done)
  123. }()
  124. // Extract token from events published so far. Give the pause_start event
  125. // a little time to be emitted before we sample allEvents.
  126. time.Sleep(20 * time.Millisecond)
  127. mu.Lock()
  128. snapshot := make([]workflow.RunEvent, len(allEvents))
  129. copy(snapshot, allEvents)
  130. mu.Unlock()
  131. token := extractToken(t, snapshot)
  132. // Resume the workflow with a payload.
  133. payload := map[string]interface{}{"approved": true, "comment": "LGTM"}
  134. resumeErr := eng.Resume(result.Context, workflow.ResumeRequest{
  135. Token: token,
  136. Payload: payload,
  137. RequestID: "req-001",
  138. })
  139. if resumeErr != nil {
  140. t.Fatalf("Resume: %v", resumeErr)
  141. }
  142. // Wait for workflow to complete.
  143. <-done
  144. // Assert final status.
  145. if result.Context.Status != workflow.StatusStopped {
  146. t.Errorf("expected StatusStopped, got %s", result.Context.Status)
  147. }
  148. // Assert $result variable was written.
  149. gotResult, ok := result.Context.Variables["$result"]
  150. if !ok {
  151. t.Fatal("$result variable not set after resume")
  152. }
  153. gotMap, ok := gotResult.(map[string]interface{})
  154. if !ok {
  155. t.Fatalf("$result is %T, expected map", gotResult)
  156. }
  157. if gotMap["approved"] != true {
  158. t.Errorf("$result.approved = %v, want true", gotMap["approved"])
  159. }
  160. // Assert event sequence: pause_start → pause_resumed → step_done.
  161. mu.Lock()
  162. events := allEvents
  163. mu.Unlock()
  164. if findEvent(events, workflow.RunEventPauseStart) == nil {
  165. t.Error("missing pause_start event")
  166. }
  167. if findEvent(events, workflow.RunEventPauseResumed) == nil {
  168. t.Error("missing pause_resumed event")
  169. }
  170. if findEvent(events, workflow.RunEventWorkflowDone) == nil {
  171. t.Error("missing workflow_done event")
  172. }
  173. }
  174. // ── Validation: children not allowed on Pause_* ───────────────────────────────
  175. func TestV315_ValidationChildrenRejected(t *testing.T) {
  176. wf := &workflow.Workflow{
  177. Version: "3.15",
  178. Name: "Bad Children",
  179. Registry: v315Registry(),
  180. Steps: []workflow.Step{
  181. {
  182. ID: "Pause_WithChildren",
  183. ResumeResultTarget: "$result",
  184. Children: []string{"Stop_Done"},
  185. Next: "Stop_Done",
  186. },
  187. {ID: "Stop_Done"},
  188. },
  189. }
  190. _, err := workflow.NewEngine(wf)
  191. if err == nil {
  192. t.Fatal("expected validation error for children on Pause_* step, got nil")
  193. }
  194. }
  195. // ── RunID validation in Resume ────────────────────────────────────────────────
  196. func TestV315_ResumeRunIDMismatch(t *testing.T) {
  197. wf := pauseWorkflow("3.15", "$result")
  198. eng := mustEngineV315(t, wf)
  199. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  200. if err != nil {
  201. t.Fatalf("Execute: %v", err)
  202. }
  203. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  204. t.Fatal("workflow did not reach paused state")
  205. }
  206. var mu sync.Mutex
  207. var allEvents []workflow.RunEvent
  208. done := make(chan struct{})
  209. go func() {
  210. for ev := range result.RunEventStream {
  211. mu.Lock()
  212. allEvents = append(allEvents, ev)
  213. mu.Unlock()
  214. }
  215. close(done)
  216. }()
  217. time.Sleep(20 * time.Millisecond)
  218. mu.Lock()
  219. snapshot := make([]workflow.RunEvent, len(allEvents))
  220. copy(snapshot, allEvents)
  221. mu.Unlock()
  222. token := extractToken(t, snapshot)
  223. // Wrong RunID must be rejected.
  224. err2 := eng.Resume(result.Context, workflow.ResumeRequest{
  225. RunID: "wrong-run-id",
  226. Token: token,
  227. })
  228. if err2 == nil {
  229. t.Fatal("expected error for mismatched RunID, got nil")
  230. }
  231. // Correct RunID must succeed.
  232. err3 := eng.Resume(result.Context, workflow.ResumeRequest{
  233. RunID: result.Context.WorkflowID,
  234. Token: token,
  235. })
  236. if err3 != nil {
  237. t.Fatalf("Resume with correct RunID failed: %v", err3)
  238. }
  239. <-done
  240. }
  241. // ── Pause with reason field ────────────────────────────────────────────────────
  242. func TestV315_PauseReasonField(t *testing.T) {
  243. wf := &workflow.Workflow{
  244. Version: "3.15",
  245. Name: "Pause Reason Test",
  246. Registry: v315Registry(),
  247. Steps: []workflow.Step{
  248. {
  249. ID: "Pause_WithReason",
  250. Reason: "请补充收货地址",
  251. ResumeResultTarget: "$result",
  252. Next: "Stop_Done",
  253. },
  254. {ID: "Stop_Done"},
  255. },
  256. }
  257. eng := mustEngineV315(t, wf)
  258. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  259. if err != nil {
  260. t.Fatalf("Execute: %v", err)
  261. }
  262. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  263. t.Fatal("workflow did not reach paused state")
  264. }
  265. // Collect events asynchronously.
  266. var mu sync.Mutex
  267. var allEvents []workflow.RunEvent
  268. done := make(chan struct{})
  269. go func() {
  270. for ev := range result.RunEventStream {
  271. mu.Lock()
  272. allEvents = append(allEvents, ev)
  273. mu.Unlock()
  274. }
  275. close(done)
  276. }()
  277. time.Sleep(20 * time.Millisecond)
  278. mu.Lock()
  279. snapshot := make([]workflow.RunEvent, len(allEvents))
  280. copy(snapshot, allEvents)
  281. mu.Unlock()
  282. token := extractToken(t, snapshot)
  283. // Verify pause_start payload contains reason.
  284. pauseEv := findEvent(snapshot, workflow.RunEventPauseStart)
  285. if pauseEv == nil {
  286. t.Fatal("no pause_start event")
  287. }
  288. if reason, ok := pauseEv.Payload["reason"].(string); !ok || reason != "请补充收货地址" {
  289. t.Errorf("pause_start.reason = %v, want '请补充收货地址'", pauseEv.Payload["reason"])
  290. }
  291. // Resume and wait.
  292. eng.Resume(result.Context, workflow.ResumeRequest{Token: token, Payload: "addr123"})
  293. <-done
  294. }
  295. // ── Pause timeout ──────────────────────────────────────────────────────────────
  296. func TestV315_PauseTimeout(t *testing.T) {
  297. wf := &workflow.Workflow{
  298. Version: "3.15",
  299. Name: "Pause Timeout Test",
  300. Registry: v315Registry(),
  301. Steps: []workflow.Step{
  302. {
  303. ID: "Pause_Short",
  304. ResumeResultTarget: "$result",
  305. Timeout: &workflow.PauseTimeout{
  306. Sec: 1, // 1 second timeout
  307. On: "Stop_TimedOut",
  308. },
  309. Next: "Stop_Done",
  310. },
  311. {ID: "Stop_TimedOut"},
  312. {ID: "Stop_Done"},
  313. },
  314. }
  315. eng := mustEngineV315(t, wf)
  316. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  317. if err != nil {
  318. t.Fatalf("Execute: %v", err)
  319. }
  320. // Drain events; timeout should fire within ~1s.
  321. events := collectEvents(result.RunEventStream)
  322. if result.Context.Status != workflow.StatusStopped {
  323. t.Errorf("expected StatusStopped after timeout, got %s", result.Context.Status)
  324. }
  325. // pause_start and pause_timeout must be present; pause_resumed must be absent.
  326. if findEvent(events, workflow.RunEventPauseStart) == nil {
  327. t.Error("missing pause_start event")
  328. }
  329. timeoutEv := findEvent(events, workflow.RunEventPauseTimeout)
  330. if timeoutEv == nil {
  331. t.Error("missing pause_timeout event")
  332. } else {
  333. if timeoutEv.Payload["timeoutAction"] != "Stop_TimedOut" {
  334. t.Errorf("timeoutAction = %v, want Stop_TimedOut", timeoutEv.Payload["timeoutAction"])
  335. }
  336. }
  337. if findEvent(events, workflow.RunEventPauseResumed) != nil {
  338. t.Error("unexpected pause_resumed event on timeout path")
  339. }
  340. }
  341. // ── Idempotent resume ─────────────────────────────────────────────────────────
  342. func TestV315_PauseIdempotentResume(t *testing.T) {
  343. wf := pauseWorkflow("3.15", "$result")
  344. eng := mustEngineV315(t, wf)
  345. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  346. if err != nil {
  347. t.Fatalf("Execute: %v", err)
  348. }
  349. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  350. t.Fatal("workflow did not reach paused state")
  351. }
  352. var mu sync.Mutex
  353. var allEvents []workflow.RunEvent
  354. done := make(chan struct{})
  355. go func() {
  356. for ev := range result.RunEventStream {
  357. mu.Lock()
  358. allEvents = append(allEvents, ev)
  359. mu.Unlock()
  360. }
  361. close(done)
  362. }()
  363. time.Sleep(20 * time.Millisecond)
  364. mu.Lock()
  365. snapshot := make([]workflow.RunEvent, len(allEvents))
  366. copy(snapshot, allEvents)
  367. mu.Unlock()
  368. token := extractToken(t, snapshot)
  369. // Call Resume twice with the same RequestID.
  370. req := workflow.ResumeRequest{Token: token, Payload: "data", RequestID: "idem-001"}
  371. if err := eng.Resume(result.Context, req); err != nil {
  372. t.Fatalf("first Resume: %v", err)
  373. }
  374. // Second call with same RequestID must be a silent no-op (nil error).
  375. if err := eng.Resume(result.Context, req); err != nil {
  376. t.Fatalf("idempotent Resume returned error: %v", err)
  377. }
  378. <-done
  379. // Only one pause_resumed event should appear.
  380. mu.Lock()
  381. events := allEvents
  382. mu.Unlock()
  383. count := 0
  384. for _, ev := range events {
  385. if ev.Type == workflow.RunEventPauseResumed {
  386. count++
  387. }
  388. }
  389. if count != 1 {
  390. t.Errorf("expected exactly 1 pause_resumed event, got %d", count)
  391. }
  392. }
  393. // ── Invalid token ─────────────────────────────────────────────────────────────
  394. func TestV315_PauseInvalidToken(t *testing.T) {
  395. wf := pauseWorkflow("3.15", "$result")
  396. eng := mustEngineV315(t, wf)
  397. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  398. if err != nil {
  399. t.Fatalf("Execute: %v", err)
  400. }
  401. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  402. t.Fatal("workflow did not reach paused state")
  403. }
  404. // Resume with wrong token — must return an error.
  405. resumeErr := eng.Resume(result.Context, workflow.ResumeRequest{
  406. Token: "wrong-token",
  407. Payload: nil,
  408. RequestID: "req-bad",
  409. })
  410. if resumeErr == nil {
  411. t.Fatal("expected error for invalid token, got nil")
  412. }
  413. // Workflow should still be paused.
  414. if result.Context.Status != workflow.StatusPaused {
  415. t.Errorf("expected StatusPaused after rejected resume, got %s", result.Context.Status)
  416. }
  417. // Drain the event stream by cancelling via context; collect remaining events.
  418. // (We need to unblock the goroutine to avoid a goroutine leak in the test.)
  419. // Re-execute with a context cancel to clean up.
  420. ctx2, cancel := context.WithCancel(context.Background())
  421. wf2 := pauseWorkflow("3.15", "$result")
  422. eng2 := mustEngineV315(t, wf2)
  423. result2, _ := eng2.Execute(ctx2, nil, v315Adapters())
  424. if !waitForStatus(result2.Context, workflow.StatusPaused, 2*time.Second) {
  425. t.Fatal("cleanup: workflow did not pause")
  426. }
  427. // Cancel to unblock.
  428. cancel()
  429. collectEvents(result2.RunEventStream)
  430. // For the original paused workflow, collect via a short-timeout cancel.
  431. ctxOrig, cancelOrig := context.WithCancel(context.Background())
  432. _ = ctxOrig
  433. cancelOrig()
  434. // We can't easily cancel the original; just verify pause_rejected event was emitted.
  435. // The channel still has events buffered; drain a few.
  436. drainLoop := true
  437. evBuf := make([]workflow.RunEvent, 0, 10)
  438. for drainLoop {
  439. select {
  440. case ev, ok := <-result.RunEventStream:
  441. if !ok {
  442. drainLoop = false
  443. } else {
  444. evBuf = append(evBuf, ev)
  445. }
  446. default:
  447. drainLoop = false
  448. }
  449. }
  450. if findEvent(evBuf, workflow.RunEventPauseRejected) == nil {
  451. // pause_rejected might have been emitted before we started draining;
  452. // check it was at least not missing from the total stream so far.
  453. // This is a best-effort check in this test.
  454. t.Log("note: pause_rejected event not found in recently drained events (may have been buffered earlier)")
  455. }
  456. }
  457. // ── Resume when not paused ────────────────────────────────────────────────────
  458. func TestV315_ResumeWhenNotPaused(t *testing.T) {
  459. wf := &workflow.Workflow{
  460. Version: "3.15",
  461. Name: "No Pause",
  462. Registry: v315Registry(),
  463. Steps: []workflow.Step{
  464. {ID: "Noop_Start", Next: "Stop_Done"},
  465. {ID: "Stop_Done"},
  466. },
  467. }
  468. eng := mustEngineV315(t, wf)
  469. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  470. if err != nil {
  471. t.Fatalf("Execute: %v", err)
  472. }
  473. collectEvents(result.RunEventStream)
  474. // Workflow has completed; Resume must return an error.
  475. resumeErr := eng.Resume(result.Context, workflow.ResumeRequest{Token: "any", Payload: nil})
  476. if resumeErr == nil {
  477. t.Fatal("expected error when resuming a non-paused workflow, got nil")
  478. }
  479. }
  480. // ── Validation: missing resumeResultTarget ────────────────────────────────────
  481. func TestV315_ValidationMissingResumeResultTarget(t *testing.T) {
  482. wf := &workflow.Workflow{
  483. Version: "3.15",
  484. Name: "Bad Pause",
  485. Registry: v315Registry(),
  486. Steps: []workflow.Step{
  487. {
  488. ID: "Pause_Bad",
  489. // ResumeResultTarget intentionally omitted
  490. Next: "Stop_Done",
  491. },
  492. {ID: "Stop_Done"},
  493. },
  494. }
  495. _, err := workflow.NewEngine(wf)
  496. if err == nil {
  497. t.Fatal("expected validation error for missing resumeResultTarget, got nil")
  498. }
  499. }
  500. // ── Validation: timeout.sec must be > 0 ──────────────────────────────────────
  501. func TestV315_ValidationTimeoutSecZero(t *testing.T) {
  502. wf := &workflow.Workflow{
  503. Version: "3.15",
  504. Name: "Bad Timeout",
  505. Registry: v315Registry(),
  506. Steps: []workflow.Step{
  507. {
  508. ID: "Pause_BadTimeout",
  509. ResumeResultTarget: "$result",
  510. Timeout: &workflow.PauseTimeout{
  511. Sec: 0, // invalid
  512. On: "Stop_Done",
  513. },
  514. Next: "Stop_Done",
  515. },
  516. {ID: "Stop_Done"},
  517. },
  518. }
  519. _, err := workflow.NewEngine(wf)
  520. if err == nil {
  521. t.Fatal("expected validation error for timeout.sec == 0, got nil")
  522. }
  523. }
  524. // ── Validation: timeout.on must be non-empty ──────────────────────────────────
  525. func TestV315_ValidationTimeoutOnEmpty(t *testing.T) {
  526. wf := &workflow.Workflow{
  527. Version: "3.15",
  528. Name: "Bad TimeoutOn",
  529. Registry: v315Registry(),
  530. Steps: []workflow.Step{
  531. {
  532. ID: "Pause_NoOn",
  533. ResumeResultTarget: "$result",
  534. Timeout: &workflow.PauseTimeout{
  535. Sec: 60,
  536. On: "", // invalid
  537. },
  538. Next: "Stop_Done",
  539. },
  540. {ID: "Stop_Done"},
  541. },
  542. }
  543. _, err := workflow.NewEngine(wf)
  544. if err == nil {
  545. t.Fatal("expected validation error for empty timeout.on, got nil")
  546. }
  547. }
  548. // ── Resume payload written to nested $vars path ───────────────────────────────
  549. func TestV315_PauseNestedResumeResultTarget(t *testing.T) {
  550. wf := &workflow.Workflow{
  551. Version: "3.15",
  552. Name: "Nested Target Test",
  553. Registry: workflow.Registry{
  554. Services: []string{},
  555. Components: []string{},
  556. Vars: []string{"$approval(ANY)"},
  557. },
  558. Steps: []workflow.Step{
  559. {
  560. ID: "Pause_Approval",
  561. ResumeResultTarget: "$approval",
  562. Next: "Stop_Done",
  563. },
  564. {ID: "Stop_Done"},
  565. },
  566. }
  567. eng := mustEngineV315(t, wf)
  568. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  569. if err != nil {
  570. t.Fatalf("Execute: %v", err)
  571. }
  572. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  573. t.Fatal("workflow did not reach paused state")
  574. }
  575. var mu sync.Mutex
  576. var allEvents []workflow.RunEvent
  577. done := make(chan struct{})
  578. go func() {
  579. for ev := range result.RunEventStream {
  580. mu.Lock()
  581. allEvents = append(allEvents, ev)
  582. mu.Unlock()
  583. }
  584. close(done)
  585. }()
  586. time.Sleep(20 * time.Millisecond)
  587. mu.Lock()
  588. snapshot := make([]workflow.RunEvent, len(allEvents))
  589. copy(snapshot, allEvents)
  590. mu.Unlock()
  591. token := extractToken(t, snapshot)
  592. approvalPayload := map[string]interface{}{"approved": true, "stage": "L1"}
  593. if err := eng.Resume(result.Context, workflow.ResumeRequest{
  594. Token: token,
  595. Payload: approvalPayload,
  596. RequestID: "req-l1",
  597. }); err != nil {
  598. t.Fatalf("Resume: %v", err)
  599. }
  600. <-done
  601. got, ok := result.Context.Variables["$approval"]
  602. if !ok {
  603. t.Fatal("$approval not set after resume")
  604. }
  605. gotMap, ok := got.(map[string]interface{})
  606. if !ok {
  607. t.Fatalf("$approval is %T, expected map", got)
  608. }
  609. if gotMap["stage"] != "L1" {
  610. t.Errorf("$approval.stage = %v, want L1", gotMap["stage"])
  611. }
  612. }
  613. // ── Context cancellation unblocks pause ───────────────────────────────────────
  614. func TestV315_PauseCancelledByContext(t *testing.T) {
  615. wf := pauseWorkflow("3.15", "$result")
  616. eng := mustEngineV315(t, wf)
  617. ctx, cancel := context.WithCancel(context.Background())
  618. result, err := eng.Execute(ctx, nil, v315Adapters())
  619. if err != nil {
  620. t.Fatalf("Execute: %v", err)
  621. }
  622. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  623. t.Fatal("workflow did not reach paused state")
  624. }
  625. // Cancel the context; the workflow goroutine should unblock and fail.
  626. cancel()
  627. events := collectEvents(result.RunEventStream)
  628. if result.Context.Status != workflow.StatusFailed {
  629. t.Errorf("expected StatusFailed after ctx cancel, got %s", result.Context.Status)
  630. }
  631. if findEvent(events, workflow.RunEventWorkflowFailed) == nil {
  632. t.Error("missing workflow_failed event after ctx cancel")
  633. }
  634. }
  635. // ── Second resume after first succeeds is rejected ────────────────────────────
  636. func TestV315_PauseDoubleResumeDifferentRequestID(t *testing.T) {
  637. wf := pauseWorkflow("3.15", "$result")
  638. eng := mustEngineV315(t, wf)
  639. result, err := eng.Execute(context.Background(), nil, v315Adapters())
  640. if err != nil {
  641. t.Fatalf("Execute: %v", err)
  642. }
  643. if !waitForStatus(result.Context, workflow.StatusPaused, 2*time.Second) {
  644. t.Fatal("workflow did not reach paused state")
  645. }
  646. var mu sync.Mutex
  647. var allEvents []workflow.RunEvent
  648. done := make(chan struct{})
  649. go func() {
  650. for ev := range result.RunEventStream {
  651. mu.Lock()
  652. allEvents = append(allEvents, ev)
  653. mu.Unlock()
  654. }
  655. close(done)
  656. }()
  657. time.Sleep(20 * time.Millisecond)
  658. mu.Lock()
  659. snapshot := make([]workflow.RunEvent, len(allEvents))
  660. copy(snapshot, allEvents)
  661. mu.Unlock()
  662. token := extractToken(t, snapshot)
  663. // First resume — must succeed.
  664. if err := eng.Resume(result.Context, workflow.ResumeRequest{
  665. Token: token, Payload: "first", RequestID: "req-A",
  666. }); err != nil {
  667. t.Fatalf("first Resume: %v", err)
  668. }
  669. <-done
  670. // Second resume (different RequestID) after workflow completed — must fail.
  671. err2 := eng.Resume(result.Context, workflow.ResumeRequest{
  672. Token: token, Payload: "second", RequestID: "req-B",
  673. })
  674. if err2 == nil {
  675. t.Fatal("expected error on second resume with different RequestID, got nil")
  676. }
  677. }