v312_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. package workflow_test
  2. import (
  3. "context"
  4. "testing"
  5. "workflow"
  6. )
  7. // makeV312Workflow returns a minimal v3.10 workflow with a single LLM step for 3.12 feature tests.
  8. // When inStream is true, in.stream is set to enable streaming.
  9. func makeV312Workflow(inStream bool) *workflow.Workflow {
  10. in := workflow.StepInput{
  11. "messages": []interface{}{
  12. map[string]interface{}{"role": "user", "content": "=$question"},
  13. },
  14. }
  15. if inStream {
  16. in["stream"] = true
  17. }
  18. return &workflow.Workflow{
  19. Version: "3.10",
  20. Name: "V312 Test",
  21. Registry: workflow.Registry{
  22. Services: []string{},
  23. Components: []string{},
  24. Params: []string{"question(STRING)"},
  25. Vars: []string{"$answer(STRING)"},
  26. Files: workflow.FilesRegistry{},
  27. },
  28. Steps: []workflow.Step{
  29. {
  30. ID: "LLM_Answer",
  31. In: in,
  32. Out: workflow.StepOutput{"$answer": "=_result"},
  33. Next: "Stop_End",
  34. },
  35. {ID: "Stop_End"},
  36. },
  37. }
  38. }
  39. // ---------------------------------------------------------------------------
  40. // in.stream tests (spec 3.12 §1 — in.stream remains the streaming control)
  41. // ---------------------------------------------------------------------------
  42. // TestStreamNotSetByDefault verifies that when in.stream is absent,
  43. // no llm_token RunEvents are emitted but llm_done IS emitted.
  44. func TestStreamNotSetByDefault(t *testing.T) {
  45. wf := makeV312Workflow(false) // no in.stream
  46. adapters := createTestAdapters()
  47. llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
  48. llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  49. // params["stream"] should not be set
  50. if v, ok := params["stream"].(bool); ok && v {
  51. t.Error("params[\"stream\"] should not be true when in.stream is absent")
  52. }
  53. return map[string]interface{}{
  54. "content": "hello",
  55. "model": "gpt-4",
  56. "finish_reason": "stop",
  57. "usage": map[string]interface{}{
  58. "prompt_tokens": 5,
  59. "completion_tokens": 3,
  60. "total_tokens": 8,
  61. },
  62. }, nil
  63. })
  64. engine, err := workflow.NewEngine(wf)
  65. if err != nil {
  66. t.Fatalf("NewEngine: %v", err)
  67. }
  68. result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "hi"}, adapters)
  69. if err != nil {
  70. t.Fatalf("Execute: %v", err)
  71. }
  72. var runEvents []workflow.RunEvent
  73. for ev := range result.RunEventStream {
  74. runEvents = append(runEvents, ev)
  75. }
  76. // No llm_token events expected
  77. for _, ev := range runEvents {
  78. if ev.Type == workflow.RunEventLLMToken {
  79. t.Error("unexpected llm_token RunEvent when in.stream is absent")
  80. }
  81. }
  82. // llm_done MUST be present
  83. hasDone := false
  84. for _, ev := range runEvents {
  85. if ev.Type == workflow.RunEventLLMDone {
  86. hasDone = true
  87. if ev.StepID == nil || *ev.StepID != "LLM_Answer" {
  88. t.Errorf("llm_done step_id: got %v, want 'LLM_Answer'", ev.StepID)
  89. }
  90. if ev.Payload["latency_ms"] == nil {
  91. t.Error("llm_done payload missing latency_ms")
  92. }
  93. if ev.Payload["finish_reason"] != "stop" {
  94. t.Errorf("llm_done finish_reason: got %v, want 'stop'", ev.Payload["finish_reason"])
  95. }
  96. if ev.Payload["model"] != "gpt-4" {
  97. t.Errorf("llm_done model: got %v, want 'gpt-4'", ev.Payload["model"])
  98. }
  99. usage, ok := ev.Payload["usage"].(map[string]interface{})
  100. if !ok {
  101. t.Error("llm_done payload missing usage")
  102. } else if usage["total_tokens"] != 8 {
  103. t.Errorf("llm_done usage.total_tokens: got %v, want 8", usage["total_tokens"])
  104. }
  105. }
  106. }
  107. if !hasDone {
  108. t.Error("expected llm_done RunEvent, none found")
  109. }
  110. }
  111. // TestStreamInStreamTrue verifies that in.stream:true forwards llm_token RunEvents.
  112. func TestStreamInStreamTrue(t *testing.T) {
  113. wf := makeV312Workflow(true) // in.stream: true
  114. adapters := createTestAdapters()
  115. llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
  116. llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  117. // adapter should receive stream:true
  118. if v, ok := params["stream"].(bool); !ok || !v {
  119. t.Error("params[\"stream\"] should be true when in.stream is true")
  120. }
  121. chunks := []string{"He", "ll", "o!"}
  122. for _, c := range chunks {
  123. stream <- c
  124. }
  125. return map[string]interface{}{
  126. "content": "Hello!",
  127. "model": "gpt-4",
  128. "finish_reason": "stop",
  129. "usage": map[string]interface{}{
  130. "prompt_tokens": 5,
  131. "completion_tokens": 2,
  132. "total_tokens": 7,
  133. },
  134. }, nil
  135. })
  136. engine, err := workflow.NewEngine(wf)
  137. if err != nil {
  138. t.Fatalf("NewEngine: %v", err)
  139. }
  140. result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "hi"}, adapters)
  141. if err != nil {
  142. t.Fatalf("Execute: %v", err)
  143. }
  144. var runEvents []workflow.RunEvent
  145. for ev := range result.RunEventStream {
  146. runEvents = append(runEvents, ev)
  147. }
  148. // Count llm_token events and verify deltas
  149. var tokens []string
  150. for _, ev := range runEvents {
  151. if ev.Type == workflow.RunEventLLMToken {
  152. if ev.StepID == nil || *ev.StepID != "LLM_Answer" {
  153. t.Errorf("llm_token step_id: got %v, want 'LLM_Answer'", ev.StepID)
  154. }
  155. if delta, ok := ev.Payload["delta"].(string); ok {
  156. tokens = append(tokens, delta)
  157. }
  158. }
  159. }
  160. if len(tokens) != 3 {
  161. t.Errorf("expected 3 llm_token events, got %d", len(tokens))
  162. }
  163. // llm_done must come after all llm_token events
  164. lastTokenIdx, doneIdx := -1, -1
  165. for i, ev := range runEvents {
  166. if ev.Type == workflow.RunEventLLMToken {
  167. lastTokenIdx = i
  168. }
  169. if ev.Type == workflow.RunEventLLMDone {
  170. doneIdx = i
  171. }
  172. }
  173. if doneIdx == -1 {
  174. t.Fatal("expected llm_done RunEvent, none found")
  175. }
  176. if lastTokenIdx != -1 && doneIdx <= lastTokenIdx {
  177. t.Error("llm_done must come after all llm_token events")
  178. }
  179. }
  180. // ---------------------------------------------------------------------------
  181. // run_events structure tests (spec 3.12 Chapter 13)
  182. // ---------------------------------------------------------------------------
  183. // TestRunEventsWorkflowLifecycle verifies that workflow_start and workflow_done
  184. // are emitted with correct payloads and ordering.
  185. func TestRunEventsWorkflowLifecycle(t *testing.T) {
  186. wf := makeV312Workflow(false)
  187. adapters := createTestAdapters()
  188. llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
  189. llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  190. return map[string]interface{}{"content": "ok", "model": "gpt-4", "finish_reason": "stop"}, nil
  191. })
  192. engine, err := workflow.NewEngine(wf)
  193. if err != nil {
  194. t.Fatalf("NewEngine: %v", err)
  195. }
  196. result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "test"}, adapters)
  197. if err != nil {
  198. t.Fatalf("Execute: %v", err)
  199. }
  200. var runEvents []workflow.RunEvent
  201. for ev := range result.RunEventStream {
  202. runEvents = append(runEvents, ev)
  203. }
  204. if len(runEvents) == 0 {
  205. t.Fatal("expected RunEvents, got none")
  206. }
  207. // First event must be workflow_start
  208. if runEvents[0].Type != workflow.RunEventWorkflowStart {
  209. t.Errorf("first RunEvent type: got %q, want %q", runEvents[0].Type, workflow.RunEventWorkflowStart)
  210. }
  211. if runEvents[0].StepID != nil {
  212. t.Error("workflow_start step_id must be null")
  213. }
  214. if _, ok := runEvents[0].Payload["params"]; !ok {
  215. t.Error("workflow_start payload missing 'params'")
  216. }
  217. // Last event must be workflow_done
  218. last := runEvents[len(runEvents)-1]
  219. if last.Type != workflow.RunEventWorkflowDone {
  220. t.Errorf("last RunEvent type: got %q, want %q", last.Type, workflow.RunEventWorkflowDone)
  221. }
  222. if last.StepID != nil {
  223. t.Error("workflow_done step_id must be null")
  224. }
  225. if last.Payload["stop_id"] != "Stop_End" {
  226. t.Errorf("workflow_done stop_id: got %v, want 'Stop_End'", last.Payload["stop_id"])
  227. }
  228. if last.Payload["duration_ms"] == nil {
  229. t.Error("workflow_done payload missing duration_ms")
  230. }
  231. // seq must be monotonically increasing from 1
  232. for i, ev := range runEvents {
  233. if ev.Seq != uint64(i+1) {
  234. t.Errorf("RunEvent[%d].seq: got %d, want %d", i, ev.Seq, i+1)
  235. }
  236. }
  237. // run_id must be consistent across all events
  238. runID := runEvents[0].RunID
  239. for i, ev := range runEvents {
  240. if ev.RunID != runID {
  241. t.Errorf("RunEvent[%d].run_id mismatch: got %q, want %q", i, ev.RunID, runID)
  242. }
  243. }
  244. // ts must be non-empty
  245. for _, ev := range runEvents {
  246. if ev.Ts == "" {
  247. t.Errorf("RunEvent %q has empty ts", ev.Type)
  248. }
  249. }
  250. }
  251. // TestRunEventsStepLifecycle verifies step_start → llm_done → step_done ordering.
  252. func TestRunEventsStepLifecycle(t *testing.T) {
  253. wf := makeV312Workflow(false)
  254. adapters := createTestAdapters()
  255. llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
  256. llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  257. return map[string]interface{}{
  258. "content": "answer",
  259. "model": "gpt-4",
  260. "finish_reason": "stop",
  261. "usage": map[string]interface{}{"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5},
  262. }, nil
  263. })
  264. engine, err := workflow.NewEngine(wf)
  265. if err != nil {
  266. t.Fatalf("NewEngine: %v", err)
  267. }
  268. result, err := engine.Execute(context.Background(), map[string]interface{}{"question": "q"}, adapters)
  269. if err != nil {
  270. t.Fatalf("Execute: %v", err)
  271. }
  272. var runEvents []workflow.RunEvent
  273. for ev := range result.RunEventStream {
  274. runEvents = append(runEvents, ev)
  275. }
  276. // Find indices of relevant events for LLM_Answer
  277. idxStart, idxLLMDone, idxDone := -1, -1, -1
  278. for i, ev := range runEvents {
  279. if ev.StepID != nil && *ev.StepID == "LLM_Answer" {
  280. switch ev.Type {
  281. case workflow.RunEventStepStart:
  282. idxStart = i
  283. case workflow.RunEventLLMDone:
  284. idxLLMDone = i
  285. case workflow.RunEventStepDone:
  286. idxDone = i
  287. }
  288. }
  289. }
  290. if idxStart == -1 {
  291. t.Fatal("missing step_start for LLM_Answer")
  292. }
  293. if idxLLMDone == -1 {
  294. t.Fatal("missing llm_done for LLM_Answer")
  295. }
  296. if idxDone == -1 {
  297. t.Fatal("missing step_done for LLM_Answer")
  298. }
  299. // Order: step_start < llm_done < step_done
  300. if !(idxStart < idxLLMDone && idxLLMDone < idxDone) {
  301. t.Errorf("event order wrong: step_start=%d, llm_done=%d, step_done=%d (want start<llmdone<done)",
  302. idxStart, idxLLMDone, idxDone)
  303. }
  304. // step_start payload
  305. if runEvents[idxStart].Payload["step_type"] != "LLM_*" {
  306. t.Errorf("step_start step_type: got %v, want 'LLM_*'", runEvents[idxStart].Payload["step_type"])
  307. }
  308. // step_done payload must include duration_ms and step_type
  309. donePayload := runEvents[idxDone].Payload
  310. if donePayload["duration_ms"] == nil {
  311. t.Error("step_done payload missing duration_ms")
  312. }
  313. if donePayload["step_type"] != "LLM_*" {
  314. t.Errorf("step_done step_type: got %v, want 'LLM_*'", donePayload["step_type"])
  315. }
  316. }
  317. // TestRunEventsStepSkipped verifies that step_skipped is emitted when if=false.
  318. func TestRunEventsStepSkipped(t *testing.T) {
  319. wf := &workflow.Workflow{
  320. Version: "3.10",
  321. Name: "Skipped Test",
  322. Registry: workflow.Registry{
  323. Services: []string{},
  324. Components: []string{},
  325. Vars: []string{"$flag(BOOLEAN)", "$result(STRING)"},
  326. Files: workflow.FilesRegistry{},
  327. },
  328. Steps: []workflow.Step{
  329. {
  330. ID: "LLM_Skipped",
  331. // $flag is initialized to boolean false via initialVars
  332. If: "=$flag",
  333. In: workflow.StepInput{
  334. "messages": []interface{}{map[string]interface{}{"role": "user", "content": "hi"}},
  335. },
  336. Out: workflow.StepOutput{"$result": "=_result"},
  337. Next: "Stop_End",
  338. },
  339. {ID: "Stop_End"},
  340. },
  341. }
  342. adapters := createTestAdapters()
  343. engine, err := workflow.NewEngine(wf)
  344. if err != nil {
  345. t.Fatalf("NewEngine: %v", err)
  346. }
  347. // Pass $flag as boolean false so the if condition fails
  348. result, err := engine.Execute(context.Background(), map[string]interface{}{"$flag": false}, adapters)
  349. if err != nil {
  350. t.Fatalf("Execute: %v", err)
  351. }
  352. var runEvents []workflow.RunEvent
  353. for ev := range result.RunEventStream {
  354. runEvents = append(runEvents, ev)
  355. }
  356. // Find step_skipped for LLM_Skipped
  357. found := false
  358. for _, ev := range runEvents {
  359. if ev.Type == workflow.RunEventStepSkipped && ev.StepID != nil && *ev.StepID == "LLM_Skipped" {
  360. found = true
  361. if ev.Payload["reason"] != "if_false" {
  362. t.Errorf("step_skipped reason: got %v, want 'if_false'", ev.Payload["reason"])
  363. }
  364. if ev.Payload["step_type"] != "LLM_*" {
  365. t.Errorf("step_skipped step_type: got %v, want 'LLM_*'", ev.Payload["step_type"])
  366. }
  367. }
  368. }
  369. if !found {
  370. t.Error("expected step_skipped RunEvent for LLM_Skipped, not found")
  371. }
  372. // No step_start or step_done should exist for LLM_Skipped
  373. for _, ev := range runEvents {
  374. if ev.StepID != nil && *ev.StepID == "LLM_Skipped" {
  375. if ev.Type == workflow.RunEventStepStart || ev.Type == workflow.RunEventStepDone {
  376. t.Errorf("unexpected %q event for skipped step LLM_Skipped", ev.Type)
  377. }
  378. }
  379. }
  380. }
  381. // TestRunEventsWorkflowFailed verifies that workflow_failed is emitted with error info.
  382. func TestRunEventsWorkflowFailed(t *testing.T) {
  383. wf := &workflow.Workflow{
  384. Version: "3.10",
  385. Name: "Failed Test",
  386. Registry: workflow.Registry{
  387. Services: []string{},
  388. Components: []string{},
  389. Vars: []string{"$answer(STRING)"},
  390. Files: workflow.FilesRegistry{},
  391. },
  392. Steps: []workflow.Step{
  393. {
  394. ID: "LLM_Fail",
  395. In: workflow.StepInput{
  396. "messages": []interface{}{map[string]interface{}{"role": "user", "content": "hi"}},
  397. },
  398. Out: workflow.StepOutput{"$answer": "=_result"},
  399. Next: "Stop_End",
  400. },
  401. {ID: "Stop_End"},
  402. },
  403. }
  404. adapters := createTestAdapters()
  405. llm := adapters.LLM.(*workflow.DefaultLLMAdapter)
  406. llm.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  407. return nil, &workflow.LLMError{
  408. Type: "rate_limit",
  409. Code: "RATE_LIMIT",
  410. Message: "Rate limit exceeded",
  411. Retryable: true,
  412. StatusCode: 429,
  413. }
  414. })
  415. engine, err := workflow.NewEngine(wf)
  416. if err != nil {
  417. t.Fatalf("NewEngine: %v", err)
  418. }
  419. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  420. if err != nil {
  421. t.Fatalf("Execute: %v", err)
  422. }
  423. var runEvents []workflow.RunEvent
  424. for ev := range result.RunEventStream {
  425. runEvents = append(runEvents, ev)
  426. }
  427. found := false
  428. for _, ev := range runEvents {
  429. if ev.Type == workflow.RunEventWorkflowFailed {
  430. found = true
  431. if ev.StepID != nil {
  432. t.Error("workflow_failed step_id must be null")
  433. }
  434. if ev.Payload["failed_step_id"] != "LLM_Fail" {
  435. t.Errorf("workflow_failed failed_step_id: got %v, want 'LLM_Fail'", ev.Payload["failed_step_id"])
  436. }
  437. errMap, ok := ev.Payload["error"].(map[string]interface{})
  438. if !ok {
  439. t.Error("workflow_failed payload missing error object")
  440. } else if errMap["type"] != "rate_limit" {
  441. t.Errorf("workflow_failed error.type: got %v, want 'rate_limit'", errMap["type"])
  442. }
  443. if ev.Payload["duration_ms"] == nil {
  444. t.Error("workflow_failed payload missing duration_ms")
  445. }
  446. }
  447. }
  448. if !found {
  449. t.Error("expected workflow_failed RunEvent, not found")
  450. }
  451. }