example_test.go 79 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994
  1. package workflow_test
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "testing"
  9. "workflow"
  10. )
  11. // Test helper functions
  12. // createTestAdapters creates a standard set of adapters for testing
  13. func createTestAdapters() *workflow.Adapters {
  14. return &workflow.Adapters{
  15. Service: workflow.NewDefaultServiceAdapter(),
  16. Component: workflow.NewDefaultComponentAdapter(),
  17. LLM: workflow.NewDefaultLLMAdapter(),
  18. File: workflow.NewDefaultFileAdapter(),
  19. }
  20. }
  21. // createTestAdaptersWithFile creates adapters with a specific file adapter
  22. func createTestAdaptersWithFile(fileAdapter workflow.FileAdapter) *workflow.Adapters {
  23. return &workflow.Adapters{
  24. Service: workflow.NewDefaultServiceAdapter(),
  25. Component: workflow.NewDefaultComponentAdapter(),
  26. LLM: workflow.NewDefaultLLMAdapter(),
  27. File: fileAdapter,
  28. }
  29. }
  30. // drainEvents consumes all run events from the stream
  31. func drainEvents(stream <-chan workflow.RunEvent) {
  32. for range stream {
  33. }
  34. }
  35. // logEvents logs all run events from the stream
  36. func logEvents(t *testing.T, stream <-chan workflow.RunEvent) {
  37. for event := range stream {
  38. stepID := ""
  39. if event.StepID != nil {
  40. stepID = *event.StepID
  41. }
  42. t.Logf("RunEvent: %s - %s", event.Type, stepID)
  43. }
  44. }
  45. // createFileOpsHandler creates a file operations component handler
  46. func createFileOpsHandler(fileAdapter workflow.FileAdapter) func(context.Context, map[string]interface{}) (map[string]interface{}, error) {
  47. return func(ctx context.Context, params map[string]interface{}) (map[string]interface{}, error) {
  48. operation, ok := params["operation"].(string)
  49. if !ok {
  50. return nil, fmt.Errorf("operation parameter is required")
  51. }
  52. path, ok := params["path"].(string)
  53. if !ok {
  54. return nil, fmt.Errorf("path parameter is required")
  55. }
  56. switch operation {
  57. case "read":
  58. content, err := fileAdapter.Read(ctx, path)
  59. if err != nil {
  60. return nil, err
  61. }
  62. return map[string]interface{}{
  63. "content": string(content),
  64. }, nil
  65. case "write":
  66. contentVal, ok := params["content"]
  67. if !ok {
  68. return nil, fmt.Errorf("content parameter is required for write operation")
  69. }
  70. // Optimized string conversion
  71. var content []byte
  72. if str, ok := contentVal.(string); ok {
  73. content = []byte(str)
  74. } else {
  75. content = []byte(fmt.Sprint(contentVal))
  76. }
  77. err := fileAdapter.Write(ctx, path, content, workflow.WriteModeOverwrite)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return map[string]interface{}{
  82. "success": true,
  83. }, nil
  84. default:
  85. return nil, fmt.Errorf("unsupported operation: %s", operation)
  86. }
  87. }
  88. }
  // logFinalVariables logs, via t.Logf, the values stored in vars under the
  // given keys, in the order the keys are passed. A key that is absent from
  // vars contributes a nil entry. Only the values are printed, not the keys.
  //
  // The function marks itself as a test helper so that the log line is
  // attributed to the calling test's source line rather than to this function.
  func logFinalVariables(t *testing.T, vars map[string]interface{}, keys ...string) {
  	t.Helper()
  	values := make([]interface{}, len(keys))
  	for i, key := range keys {
  		values[i] = vars[key]
  	}
  	t.Logf("Final variables: %v", values)
  }
  97. // ExampleWorkflow demonstrates a simple workflow with service calls
  98. func ExampleWorkflow() {
  99. // Define a workflow
  100. wf := &workflow.Workflow{
  101. Version: "3.6",
  102. Name: "Simple Task Workflow",
  103. Registry: workflow.Registry{
  104. Services: []string{
  105. "PlannerService(prd(STRING)) RETURN plan(OBJECT)",
  106. "ValidateService(plan(OBJECT)) RETURN isValid(BOOL), errors([STRING])",
  107. },
  108. Components: []string{"FileOps"},
  109. Vars: []string{
  110. "$prdText(STRING)",
  111. "$plan(OBJECT)",
  112. "$isValid(BOOL)",
  113. },
  114. Files: workflow.FilesRegistry{
  115. Inputs: []string{"Process/PRD.json"},
  116. Artifacts: []string{"Process/Artifacts/*"},
  117. },
  118. },
  119. Steps: []workflow.Step{
  120. {
  121. ID: "Service_PlannerService",
  122. In: workflow.StepInput{
  123. "prd": "=$prdText",
  124. },
  125. Out: workflow.StepOutput{
  126. "$plan": "=_result.plan",
  127. },
  128. Next: "Service_ValidateService",
  129. },
  130. {
  131. ID: "Service_ValidateService",
  132. In: workflow.StepInput{
  133. "plan": "=$plan",
  134. },
  135. Out: workflow.StepOutput{
  136. "$isValid": "=_result.isValid",
  137. },
  138. Next: "Branch_CheckValid",
  139. },
  140. {
  141. ID: "Branch_CheckValid",
  142. Cases: [][]string{
  143. {"$isValid == true", "Write_Success"},
  144. {"ELSE", "Write_Error"},
  145. },
  146. Next: "Stop_End",
  147. },
  148. {
  149. ID: "Write_Success",
  150. Target: "Process/Artifacts/result.txt",
  151. Value: "Validation passed",
  152. Next: "Stop_End",
  153. },
  154. {
  155. ID: "Write_Error",
  156. Target: "Process/Artifacts/error.txt",
  157. Value: "Validation failed",
  158. Next: "Stop_End",
  159. },
  160. {
  161. ID: "Stop_End",
  162. },
  163. },
  164. }
  165. // Create adapters
  166. serviceAdapter := workflow.NewDefaultServiceAdapter()
  167. serviceAdapter.RegisterHandler("PlannerService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
  168. return &workflow.ServiceResult{
  169. Data: map[string]interface{}{
  170. "plan": map[string]interface{}{
  171. "tasks": []string{"task1", "task2"},
  172. "owner": "agent",
  173. },
  174. },
  175. }, nil
  176. })
  177. serviceAdapter.RegisterHandler("ValidateService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
  178. return &workflow.ServiceResult{
  179. Data: map[string]interface{}{
  180. "isValid": true,
  181. "errors": []string{},
  182. },
  183. }, nil
  184. })
  185. adapters := &workflow.Adapters{
  186. Service: serviceAdapter,
  187. Component: workflow.NewDefaultComponentAdapter(),
  188. LLM: workflow.NewDefaultLLMAdapter(),
  189. File: workflow.NewDefaultFileAdapter(),
  190. }
  191. // Create engine
  192. engine, err := workflow.NewEngine(wf)
  193. if err != nil {
  194. fmt.Printf("Failed to create engine: %v\n", err)
  195. return
  196. }
  197. // Execute workflow
  198. initialVars := map[string]interface{}{
  199. "$prdText": "Build a user authentication system",
  200. }
  201. result, err := engine.Execute(context.Background(), initialVars, adapters)
  202. if err != nil {
  203. fmt.Printf("Failed to execute workflow: %v\n", err)
  204. return
  205. }
  206. // Consume events
  207. for range result.RunEventStream {
  208. }
  209. fmt.Println("Workflow completed")
  210. // Output: Workflow completed
  211. }
  212. // TestLoopWorkflow demonstrates a loop workflow
  213. func TestLoopWorkflow(t *testing.T) {
  214. wf := &workflow.Workflow{
  215. Version: "3.6",
  216. Name: "Loop Example",
  217. Registry: workflow.Registry{
  218. Services: []string{},
  219. Components: []string{},
  220. Vars: []string{
  221. "$items([OBJECT])",
  222. "$results([STRING])",
  223. },
  224. Files: workflow.FilesRegistry{
  225. Inputs: []string{},
  226. Artifacts: []string{"Process/Artifacts/*"},
  227. },
  228. },
  229. Steps: []workflow.Step{
  230. {
  231. ID: "Loop_ProcessItems",
  232. Mode: "serial",
  233. Source: "=$items", // Using new = prefix format
  234. Children: []string{"Set_ProcessOne"},
  235. Next: "Stop_End",
  236. },
  237. {
  238. ID: "Set_ProcessOne",
  239. Target: "$results[_index]",
  240. Value: "=_item.name",
  241. Next: "RETURN",
  242. },
  243. {
  244. ID: "Stop_End",
  245. },
  246. },
  247. }
  248. adapters := createTestAdapters()
  249. engine, err := workflow.NewEngine(wf)
  250. if err != nil {
  251. t.Fatalf("Failed to create engine: %v", err)
  252. }
  253. initialVars := map[string]interface{}{
  254. "$items": []interface{}{
  255. map[string]interface{}{"name": "item1"},
  256. map[string]interface{}{"name": "item2"},
  257. map[string]interface{}{"name": "item3"},
  258. },
  259. "$results": []interface{}{},
  260. }
  261. result, err := engine.Execute(context.Background(), initialVars, adapters)
  262. if err != nil {
  263. t.Fatalf("Failed to execute workflow: %v", err)
  264. }
  265. // Consume events
  266. logEvents(t, result.RunEventStream)
  267. // Display the set value
  268. t.Logf("$results value: %v", result.Context.Variables["$results"])
  269. t.Log("Loop workflow completed")
  270. }
  271. // TestJSONWorkflow demonstrates loading a workflow from JSON
  272. func TestJSONWorkflow(t *testing.T) {
  273. jsonData := `{
  274. "version": "3.6",
  275. "name": "JSON Example",
  276. "registry": {
  277. "services": ["TestService(input(STRING)) RETURN output(STRING)"],
  278. "components": [],
  279. "vars": ["$input(STRING)", "$output(STRING)"],
  280. "files": {
  281. "inputs": [],
  282. "artifacts": []
  283. }
  284. },
  285. "steps": [
  286. {
  287. "id": "Service_TestService",
  288. "in": {"input": "=$input"},
  289. "out": {"$output": "=_result.output"},
  290. "next": "Stop_End"
  291. },
  292. {
  293. "id": "Stop_End"
  294. }
  295. ]
  296. }`
  297. var wf workflow.Workflow
  298. if err := json.Unmarshal([]byte(jsonData), &wf); err != nil {
  299. t.Fatalf("Failed to unmarshal workflow: %v", err)
  300. }
  301. t.Logf("Loaded workflow: %s", wf.Name)
  302. // Validate
  303. if err := wf.Validate(); err != nil {
  304. t.Fatalf("Invalid workflow: %v", err)
  305. }
  306. t.Log("Workflow is valid")
  307. }
  308. // TestIfCondition tests conditional step execution with if property
  309. func TestIfCondition(t *testing.T) {
  310. wf := &workflow.Workflow{
  311. Version: "3.6",
  312. Name: "If Condition Test",
  313. Registry: workflow.Registry{
  314. Services: []string{},
  315. Components: []string{},
  316. Vars: []string{
  317. "$enableFeatureA(BOOL)",
  318. "$enableFeatureB(BOOL)",
  319. "$executedA(STRING)",
  320. "$executedB(STRING)",
  321. "$executedC(STRING)",
  322. "$amount(NUMBER)",
  323. },
  324. Files: workflow.FilesRegistry{
  325. Inputs: []string{},
  326. Artifacts: []string{},
  327. },
  328. },
  329. Steps: []workflow.Step{
  330. {
  331. ID: "Set_ExecuteA",
  332. If: "$enableFeatureA == true",
  333. Target: "$executedA",
  334. Value: "A was executed",
  335. Next: "Set_ExecuteB",
  336. },
  337. {
  338. ID: "Set_ExecuteB",
  339. If: "$enableFeatureB == true",
  340. Target: "$executedB",
  341. Value: "B was executed",
  342. Next: "Set_ExecuteC",
  343. },
  344. {
  345. ID: "Set_ExecuteC",
  346. If: "$amount > 100",
  347. Target: "$executedC",
  348. Value: "C was executed",
  349. Next: "Stop_End",
  350. },
  351. {
  352. ID: "Stop_End",
  353. },
  354. },
  355. }
  356. adapters := createTestAdapters()
  357. engine, err := workflow.NewEngine(wf)
  358. if err != nil {
  359. t.Fatalf("Failed to create engine: %v", err)
  360. }
  361. // Test case 1: enableFeatureA=true, enableFeatureB=false, amount=50
  362. t.Run("EnableA_DisableB_LowAmount", func(t *testing.T) {
  363. initialVars := map[string]interface{}{
  364. "$enableFeatureA": true,
  365. "$enableFeatureB": false,
  366. "$amount": 50,
  367. }
  368. result, err := engine.Execute(context.Background(), initialVars, adapters)
  369. if err != nil {
  370. t.Fatalf("Failed to execute workflow: %v", err)
  371. }
  372. // Consume events
  373. logEvents(t, result.RunEventStream)
  374. // Verify Set_ExecuteA was executed (if condition true)
  375. if result.Context.Variables["$executedA"] != "A was executed" {
  376. t.Errorf("Expected $executedA to be 'A was executed', got: %v", result.Context.Variables["$executedA"])
  377. }
  378. // Verify Set_ExecuteB was skipped (if condition false)
  379. if result.Context.Variables["$executedB"] != nil {
  380. t.Errorf("Expected $executedB to be nil (step skipped), got: %v", result.Context.Variables["$executedB"])
  381. }
  382. // Verify Set_ExecuteC was skipped (if condition false: amount <= 100)
  383. if result.Context.Variables["$executedC"] != nil {
  384. t.Errorf("Expected $executedC to be nil (step skipped), got: %v", result.Context.Variables["$executedC"])
  385. }
  386. t.Logf("Final variables: executedA=%v, executedB=%v, executedC=%v",
  387. result.Context.Variables["$executedA"],
  388. result.Context.Variables["$executedB"],
  389. result.Context.Variables["$executedC"])
  390. })
  391. // Test case 2: enableFeatureA=false, enableFeatureB=true, amount=150
  392. t.Run("DisableA_EnableB_HighAmount", func(t *testing.T) {
  393. initialVars := map[string]interface{}{
  394. "$enableFeatureA": false,
  395. "$enableFeatureB": true,
  396. "$amount": 150,
  397. }
  398. result, err := engine.Execute(context.Background(), initialVars, adapters)
  399. if err != nil {
  400. t.Fatalf("Failed to execute workflow: %v", err)
  401. }
  402. // Consume events
  403. logEvents(t, result.RunEventStream)
  404. // Verify Set_ExecuteA was skipped (if condition false)
  405. if result.Context.Variables["$executedA"] != nil {
  406. t.Errorf("Expected $executedA to be nil (step skipped), got: %v", result.Context.Variables["$executedA"])
  407. }
  408. // Verify Set_ExecuteB was executed (if condition true)
  409. if result.Context.Variables["$executedB"] != "B was executed" {
  410. t.Errorf("Expected $executedB to be 'B was executed', got: %v", result.Context.Variables["$executedB"])
  411. }
  412. // Verify Set_ExecuteC was executed (if condition true: amount > 100)
  413. if result.Context.Variables["$executedC"] != "C was executed" {
  414. t.Errorf("Expected $executedC to be 'C was executed', got: %v", result.Context.Variables["$executedC"])
  415. }
  416. })
  417. // Test case 3: All conditions true
  418. t.Run("AllConditionsTrue", func(t *testing.T) {
  419. initialVars := map[string]interface{}{
  420. "$enableFeatureA": true,
  421. "$enableFeatureB": true,
  422. "$amount": 200,
  423. }
  424. result, err := engine.Execute(context.Background(), initialVars, adapters)
  425. if err != nil {
  426. t.Fatalf("Failed to execute workflow: %v", err)
  427. }
  428. // Consume events
  429. logEvents(t, result.RunEventStream)
  430. // Verify all steps were executed
  431. if result.Context.Variables["$executedA"] != "A was executed" {
  432. t.Errorf("Expected $executedA to be 'A was executed', got: %v", result.Context.Variables["$executedA"])
  433. }
  434. if result.Context.Variables["$executedB"] != "B was executed" {
  435. t.Errorf("Expected $executedB to be 'B was executed', got: %v", result.Context.Variables["$executedB"])
  436. }
  437. if result.Context.Variables["$executedC"] != "C was executed" {
  438. t.Errorf("Expected $executedC to be 'C was executed', got: %v", result.Context.Variables["$executedC"])
  439. }
  440. })
  441. }
  442. // TestFileReadWrite tests file read and write operations using in-memory storage
  443. func TestFileReadWrite(t *testing.T) {
  444. t.Run("WriteReadAppend", func(t *testing.T) {
  445. // Create fresh adapters for this test
  446. fileAdapter := workflow.NewDefaultFileAdapter()
  447. // Register FileOps component handler using helper
  448. componentAdapter := workflow.NewDefaultComponentAdapter()
  449. fileOpsHandler := createFileOpsHandler(fileAdapter)
  450. componentAdapter.RegisterHandler("FileOps", fileOpsHandler)
  451. componentAdapter.RegisterHandler("FileOps2", fileOpsHandler)
  452. adapters := createTestAdaptersWithFile(fileAdapter)
  453. adapters.Component = componentAdapter
  454. wf := &workflow.Workflow{
  455. Version: "3.6",
  456. Name: "File Read/Write Test",
  457. Registry: workflow.Registry{
  458. Services: []string{},
  459. Components: []string{"FileOps"},
  460. Vars: []string{
  461. "$message(STRING)",
  462. "$fileContent(STRING)",
  463. "$appendedContent(STRING)",
  464. },
  465. Files: workflow.FilesRegistry{
  466. Inputs: []string{"Process/input.txt"},
  467. Artifacts: []string{"Process/Artifacts/*"},
  468. },
  469. },
  470. Steps: []workflow.Step{
  471. {
  472. ID: "Write_InitialFile",
  473. Target: "Process/Artifacts/test.txt",
  474. Value: "=$message",
  475. Next: "Component_FileOps",
  476. },
  477. {
  478. ID: "Component_FileOps",
  479. In: workflow.StepInput{
  480. "operation": "read",
  481. "path": "Process/Artifacts/test.txt",
  482. },
  483. Out: workflow.StepOutput{
  484. "$fileContent": "=_result.content",
  485. },
  486. Next: "Write_AppendFile",
  487. },
  488. {
  489. ID: "Write_AppendFile",
  490. Target: "Process/Artifacts/test.txt",
  491. Value: " - appended",
  492. Mode: "append",
  493. Next: "Component_FileOps2",
  494. },
  495. {
  496. ID: "Component_FileOps2",
  497. In: workflow.StepInput{
  498. "operation": "read",
  499. "path": "Process/Artifacts/test.txt",
  500. },
  501. Out: workflow.StepOutput{
  502. "$appendedContent": "=_result.content",
  503. },
  504. Next: "Write_NewFile",
  505. },
  506. {
  507. ID: "Write_NewFile",
  508. Target: "Process/Artifacts/new.txt",
  509. Value: "New file content",
  510. Mode: "failIfExists",
  511. Next: "Stop_End",
  512. },
  513. {
  514. ID: "Stop_End",
  515. },
  516. },
  517. }
  518. engine, err := workflow.NewEngine(wf)
  519. if err != nil {
  520. t.Fatalf("Failed to create engine: %v", err)
  521. }
  522. initialVars := map[string]interface{}{
  523. "$message": "Hello, World!",
  524. }
  525. result, err := engine.Execute(context.Background(), initialVars, adapters)
  526. if err != nil {
  527. t.Fatalf("Failed to execute workflow: %v", err)
  528. }
  529. // Consume events
  530. logEvents(t, result.RunEventStream)
  531. // Verify initial write and read
  532. fileContent := result.Context.Variables["$fileContent"]
  533. if fileContent != "Hello, World!" {
  534. t.Errorf("Expected $fileContent to be 'Hello, World!', got: %v", fileContent)
  535. }
  536. // Verify append operation
  537. appendedContent := result.Context.Variables["$appendedContent"]
  538. if appendedContent != "Hello, World! - appended" {
  539. t.Errorf("Expected $appendedContent to be 'Hello, World! - appended', got: %v", appendedContent)
  540. }
  541. // Verify files exist in adapter
  542. testFileContent := fileAdapter.GetFile("Process/Artifacts/test.txt")
  543. if string(testFileContent) != "Hello, World! - appended" {
  544. t.Errorf("Expected test.txt content to be 'Hello, World! - appended', got: %s", string(testFileContent))
  545. }
  546. newFileContent := fileAdapter.GetFile("Process/Artifacts/new.txt")
  547. if string(newFileContent) != "New file content" {
  548. t.Errorf("Expected new.txt content to be 'New file content', got: %s", string(newFileContent))
  549. }
  550. })
  551. t.Run("FailIfExists", func(t *testing.T) {
  552. // Create fresh adapters for this test
  553. fileAdapter := workflow.NewDefaultFileAdapter()
  554. adapters := createTestAdaptersWithFile(fileAdapter)
  555. // Create a new workflow that tries to write to an existing file with failIfExists mode
  556. wfFail := &workflow.Workflow{
  557. Version: "3.6",
  558. Name: "Fail If Exists Test",
  559. Registry: workflow.Registry{
  560. Services: []string{},
  561. Components: []string{},
  562. Vars: []string{},
  563. Files: workflow.FilesRegistry{
  564. Inputs: []string{},
  565. Artifacts: []string{"Process/Artifacts/*"},
  566. },
  567. },
  568. Steps: []workflow.Step{
  569. {
  570. ID: "Write_Existing",
  571. Target: "Process/Artifacts/existing.txt",
  572. Value: "Should fail",
  573. Mode: "failIfExists",
  574. Next: "Stop_End",
  575. },
  576. {
  577. ID: "Stop_End",
  578. },
  579. },
  580. }
  581. // Pre-populate the file
  582. fileAdapter.SetFile("Process/Artifacts/existing.txt", []byte("Already exists"))
  583. engineFail, err := workflow.NewEngine(wfFail)
  584. if err != nil {
  585. t.Fatalf("Failed to create engine: %v", err)
  586. }
  587. result, err := engineFail.Execute(context.Background(), map[string]interface{}{}, adapters)
  588. if err != nil {
  589. t.Fatalf("Execute returned error: %v", err)
  590. }
  591. // Consume events
  592. for event := range result.RunEventStream {
  593. if event.Type == workflow.RunEventStepError || event.Type == workflow.RunEventWorkflowFailed {
  594. t.Logf("Error event: %s - %v", event.Type, event.Payload)
  595. }
  596. }
  597. // Check that workflow failed
  598. if result.Context.Status != workflow.StatusFailed {
  599. t.Fatalf("Expected workflow status to be 'failed', got: %s", result.Context.Status)
  600. }
  601. // Verify original content is unchanged
  602. content := fileAdapter.GetFile("Process/Artifacts/existing.txt")
  603. if string(content) != "Already exists" {
  604. t.Errorf("Expected content to remain 'Already exists', got: %s", string(content))
  605. }
  606. })
  607. t.Run("OverwriteMode", func(t *testing.T) {
  608. // Create fresh adapters for this test
  609. fileAdapter := workflow.NewDefaultFileAdapter()
  610. adapters := createTestAdaptersWithFile(fileAdapter)
  611. // Create a new workflow that overwrites an existing file
  612. wfOverwrite := &workflow.Workflow{
  613. Version: "3.6",
  614. Name: "Overwrite Test",
  615. Registry: workflow.Registry{
  616. Services: []string{},
  617. Components: []string{},
  618. Vars: []string{},
  619. Files: workflow.FilesRegistry{
  620. Inputs: []string{},
  621. Artifacts: []string{"Process/Artifacts/*"},
  622. },
  623. },
  624. Steps: []workflow.Step{
  625. {
  626. ID: "Write_Overwrite",
  627. Target: "Process/Artifacts/overwrite.txt",
  628. Value: "New content",
  629. Mode: "overwrite",
  630. Next: "Stop_End",
  631. },
  632. {
  633. ID: "Stop_End",
  634. },
  635. },
  636. }
  637. // Pre-populate the file
  638. fileAdapter.SetFile("Process/Artifacts/overwrite.txt", []byte("Old content"))
  639. engineOverwrite, err := workflow.NewEngine(wfOverwrite)
  640. if err != nil {
  641. t.Fatalf("Failed to create engine: %v", err)
  642. }
  643. result, err := engineOverwrite.Execute(context.Background(), map[string]interface{}{}, adapters)
  644. if err != nil {
  645. t.Fatalf("Failed to execute workflow: %v", err)
  646. }
  647. // Consume events
  648. drainEvents(result.RunEventStream)
  649. // Verify content was overwritten
  650. content := fileAdapter.GetFile("Process/Artifacts/overwrite.txt")
  651. if string(content) != "New content" {
  652. t.Errorf("Expected content to be 'New content', got: %s", string(content))
  653. }
  654. })
  655. }
  656. // TestParallelLoopExecution tests parallel loop execution
  657. func TestParallelLoopExecution(t *testing.T) {
  658. t.Run("ParallelMode", func(t *testing.T) {
  659. wf := &workflow.Workflow{
  660. Version: "3.6",
  661. Name: "ParallelLoopTest",
  662. Registry: workflow.Registry{
  663. Files: workflow.FilesRegistry{
  664. Artifacts: []string{"Process/Artifacts/**"},
  665. },
  666. },
  667. Steps: []workflow.Step{
  668. {
  669. ID: "Loop_ProcessParallel",
  670. Mode: "parallel", // Parallel mode
  671. Source: "=$items",
  672. Children: []string{"Set_Result"},
  673. Next: "Stop_End",
  674. },
  675. {
  676. ID: "Set_Result",
  677. Target: "$results[_index]",
  678. Value: "=_item",
  679. Next: "RETURN",
  680. },
  681. {
  682. ID: "Stop_End",
  683. },
  684. },
  685. }
  686. // Create engine
  687. engine, err := workflow.NewEngine(wf)
  688. if err != nil {
  689. t.Fatalf("Failed to create engine: %v", err)
  690. }
  691. // Execute workflow with initial variables
  692. adapters := createTestAdapters()
  693. initialVars := map[string]interface{}{
  694. "$items": []interface{}{int64(1), int64(2), int64(3), int64(4), int64(5), int64(6), int64(7), int64(8), int64(9), int64(10)},
  695. "$results": []interface{}{},
  696. }
  697. result, err := engine.Execute(context.Background(), initialVars, adapters)
  698. if err != nil {
  699. t.Fatalf("Failed to execute workflow: %v", err)
  700. }
  701. // Log events to see what's happening
  702. logEvents(t, result.RunEventStream)
  703. // Verify results - all items should be doubled
  704. results, ok := result.Context.Variables["$results"].([]interface{})
  705. if !ok {
  706. t.Fatalf("Expected $results to be a slice, got: %T", result.Context.Variables["$results"])
  707. }
  708. if len(results) != 10 {
  709. t.Errorf("Expected 10 results, got %d", len(results))
  710. }
  711. // Verify each result matches the input
  712. for i := 0; i < 10; i++ {
  713. expected := int64(i + 1)
  714. actual, ok := results[i].(int64)
  715. if !ok {
  716. t.Errorf("Result[%d]: expected int64, got %T", i, results[i])
  717. continue
  718. }
  719. if actual != expected {
  720. t.Errorf("Result[%d]: expected %d, got %d", i, expected, actual)
  721. }
  722. }
  723. })
  724. t.Run("SerialVsParallelComparison", func(t *testing.T) {
  725. // Test that serial and parallel modes produce identical results
  726. testCases := []struct {
  727. mode string
  728. }{
  729. {"serial"},
  730. {"parallel"},
  731. }
  732. for _, tc := range testCases {
  733. t.Run(tc.mode, func(t *testing.T) {
  734. wf := &workflow.Workflow{
  735. Version: "3.6",
  736. Name: "ComparisonTest",
  737. Registry: workflow.Registry{
  738. Files: workflow.FilesRegistry{
  739. Artifacts: []string{"Process/Artifacts/**"},
  740. },
  741. },
  742. Steps: []workflow.Step{
  743. {
  744. ID: "Loop_Process",
  745. Mode: tc.mode,
  746. Source: "=$items",
  747. Children: []string{"Set_Squared"},
  748. Next: "Stop_End",
  749. },
  750. {
  751. ID: "Set_Squared",
  752. Target: "$squares[_index]",
  753. Value: "=_item",
  754. Next: "RETURN",
  755. },
  756. {
  757. ID: "Stop_End",
  758. },
  759. },
  760. }
  761. engine, err := workflow.NewEngine(wf)
  762. if err != nil {
  763. t.Fatalf("Failed to create engine: %v", err)
  764. }
  765. adapters := createTestAdapters()
  766. initialVars := map[string]interface{}{
  767. "$items": []interface{}{int64(5), int64(10), int64(15), int64(20)},
  768. "$squares": []interface{}{},
  769. }
  770. result, err := engine.Execute(context.Background(), initialVars, adapters)
  771. if err != nil {
  772. t.Fatalf("Failed to execute workflow: %v", err)
  773. }
  774. drainEvents(result.RunEventStream)
  775. // Verify results
  776. squares, ok := result.Context.Variables["$squares"].([]interface{})
  777. if !ok {
  778. t.Fatalf("Expected $squares to be a slice")
  779. }
  780. expected := []int64{5, 10, 15, 20}
  781. for i, exp := range expected {
  782. actual := squares[i].(int64)
  783. if actual != exp {
  784. t.Errorf("squares[%d]: expected %d, got %d", i, exp, actual)
  785. }
  786. }
  787. })
  788. }
  789. })
  790. }
  791. // TestParallelChildrenExecution tests parallel children execution
  792. func TestParallelChildrenExecution(t *testing.T) {
  793. t.Run("MultipleChildren", func(t *testing.T) {
  794. wf := &workflow.Workflow{
  795. Version: "3.6",
  796. Name: "ParallelChildrenTest",
  797. Registry: workflow.Registry{
  798. Files: workflow.FilesRegistry{
  799. Artifacts: []string{"Process/Artifacts/**"},
  800. },
  801. },
  802. Steps: []workflow.Step{
  803. {
  804. ID: "Noop_Parent",
  805. Children: []string{"Set_Result1", "Set_Result2", "Set_Result3", "Set_Result4"},
  806. Next: "Stop_End",
  807. },
  808. {
  809. ID: "Set_Result1",
  810. Target: "$result1",
  811. Value: "=$input",
  812. Next: "RETURN",
  813. },
  814. {
  815. ID: "Set_Result2",
  816. Target: "$result2",
  817. Value: "=$input",
  818. Next: "RETURN",
  819. },
  820. {
  821. ID: "Set_Result3",
  822. Target: "$result3",
  823. Value: "=$input",
  824. Next: "RETURN",
  825. },
  826. {
  827. ID: "Set_Result4",
  828. Target: "$result4",
  829. Value: "=$input",
  830. Next: "RETURN",
  831. },
  832. {
  833. ID: "Stop_End",
  834. },
  835. },
  836. }
  837. engine, err := workflow.NewEngine(wf)
  838. if err != nil {
  839. t.Fatalf("Failed to create engine: %v", err)
  840. }
  841. adapters := createTestAdapters()
  842. initialVars := map[string]interface{}{
  843. "$input": int64(100),
  844. }
  845. result, err := engine.Execute(context.Background(), initialVars, adapters)
  846. if err != nil {
  847. t.Fatalf("Failed to execute workflow: %v", err)
  848. }
  849. drainEvents(result.RunEventStream)
  850. // Verify all children executed and set their results
  851. vars := result.Context.Variables
  852. result1, ok := vars["$result1"].(int64)
  853. if !ok || result1 != 100 {
  854. t.Errorf("Expected $result1 to be 100, got: %v", vars["$result1"])
  855. }
  856. result2, ok := vars["$result2"].(int64)
  857. if !ok || result2 != 100 {
  858. t.Errorf("Expected $result2 to be 100, got: %v", vars["$result2"])
  859. }
  860. result3, ok := vars["$result3"].(int64)
  861. if !ok || result3 != 100 {
  862. t.Errorf("Expected $result3 to be 100, got: %v", vars["$result3"])
  863. }
  864. result4, ok := vars["$result4"].(int64)
  865. if !ok || result4 != 100 {
  866. t.Errorf("Expected $result4 to be 100, got: %v", vars["$result4"])
  867. }
  868. })
  869. t.Run("SingleChildOptimization", func(t *testing.T) {
  870. // Verify single child doesn't spawn unnecessary goroutines
  871. wf := &workflow.Workflow{
  872. Version: "3.6",
  873. Name: "SingleChildTest",
  874. Registry: workflow.Registry{
  875. Files: workflow.FilesRegistry{
  876. Artifacts: []string{"Process/Artifacts/**"},
  877. },
  878. },
  879. Steps: []workflow.Step{
  880. {
  881. ID: "Noop_Parent",
  882. Children: []string{"Set_Result"}, // Single child
  883. Next: "Stop_End",
  884. },
  885. {
  886. ID: "Set_Result",
  887. Target: "$result",
  888. Value: "=$value",
  889. Next: "RETURN",
  890. },
  891. {
  892. ID: "Stop_End",
  893. },
  894. },
  895. }
  896. engine, err := workflow.NewEngine(wf)
  897. if err != nil {
  898. t.Fatalf("Failed to create engine: %v", err)
  899. }
  900. adapters := createTestAdapters()
  901. initialVars := map[string]interface{}{
  902. "$value": int64(42),
  903. }
  904. result, err := engine.Execute(context.Background(), initialVars, adapters)
  905. if err != nil {
  906. t.Fatalf("Failed to execute workflow: %v", err)
  907. }
  908. drainEvents(result.RunEventStream)
  909. // Verify result
  910. resultVal, ok := result.Context.Variables["$result"].(int64)
  911. if !ok || resultVal != 42 {
  912. t.Errorf("Expected $result to be 42, got: %v", result.Context.Variables["$result"])
  913. }
  914. })
  915. }
  916. // TestParallelErrorHandling tests error handling in parallel execution
  917. func TestParallelErrorHandling(t *testing.T) {
  918. t.Run("ParallelLoopWithError", func(t *testing.T) {
  919. wf := &workflow.Workflow{
  920. Version: "3.6",
  921. Name: "ParallelErrorTest",
  922. Registry: workflow.Registry{
  923. Files: workflow.FilesRegistry{
  924. Artifacts: []string{"Process/Artifacts/**"},
  925. },
  926. },
  927. Steps: []workflow.Step{
  928. {
  929. ID: "Loop_Process",
  930. Mode: "parallel",
  931. Source: "=$items",
  932. Children: []string{"Set_InvalidTarget"},
  933. Next: "Stop_End",
  934. },
  935. {
  936. ID: "Set_InvalidTarget",
  937. Target: "invalidTarget", // This will cause an error (not starting with $)
  938. Value: "=_item * 2",
  939. Next: "RETURN",
  940. },
  941. {
  942. ID: "Stop_End",
  943. },
  944. },
  945. }
  946. engine, err := workflow.NewEngine(wf)
  947. if err != nil {
  948. t.Fatalf("Failed to create engine: %v", err)
  949. }
  950. adapters := createTestAdapters()
  951. initialVars := map[string]interface{}{
  952. "$items": []interface{}{int64(1), int64(2), int64(3), int64(4), int64(5)},
  953. }
  954. result, err := engine.Execute(context.Background(), initialVars, adapters)
  955. if err != nil {
  956. t.Fatalf("Failed to execute workflow: %v", err)
  957. }
  958. // Collect events to check for errors
  959. hasError := false
  960. for event := range result.RunEventStream {
  961. if event.Type == workflow.RunEventStepError || event.Type == workflow.RunEventWorkflowFailed {
  962. hasError = true
  963. }
  964. }
  965. if !hasError {
  966. t.Error("Expected error event in parallel execution, but none found")
  967. }
  968. })
  969. }
  970. // TestFileOutput tests file output via step output mapping
  971. func TestFileOutput(t *testing.T) {
  972. t.Run("OutputToFile", func(t *testing.T) {
  973. // Create fresh adapters for this test
  974. fileAdapter := workflow.NewDefaultFileAdapter()
  975. serviceAdapter := workflow.NewDefaultServiceAdapter()
  976. serviceAdapter.RegisterHandler("GenerateReport", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
  977. return &workflow.ServiceResult{
  978. Data: map[string]interface{}{
  979. "report": "Generated report content",
  980. "summary": "Brief summary",
  981. },
  982. }, nil
  983. })
  984. adapters := &workflow.Adapters{
  985. Service: serviceAdapter,
  986. Component: workflow.NewDefaultComponentAdapter(),
  987. LLM: workflow.NewDefaultLLMAdapter(),
  988. File: fileAdapter,
  989. }
  990. wf := &workflow.Workflow{
  991. Version: "3.6",
  992. Name: "File Output Test",
  993. Registry: workflow.Registry{
  994. Services: []string{
  995. "GenerateReport(input(STRING)) RETURN report(STRING), summary(STRING)",
  996. },
  997. Components: []string{},
  998. Vars: []string{
  999. "$input(STRING)",
  1000. "$reportContent(STRING)",
  1001. },
  1002. Files: workflow.FilesRegistry{
  1003. Inputs: []string{},
  1004. Artifacts: []string{"Process/Artifacts/*"},
  1005. },
  1006. },
  1007. Steps: []workflow.Step{
  1008. {
  1009. ID: "Service_GenerateReport",
  1010. In: workflow.StepInput{
  1011. "input": "=$input",
  1012. },
  1013. Out: workflow.StepOutput{
  1014. "$reportContent": "=_result.report",
  1015. "/Process/Artifacts/report.txt": "=_result.report",
  1016. "/Process/Artifacts/summary.txt": "=_result.summary",
  1017. },
  1018. Next: "Stop_End",
  1019. },
  1020. {
  1021. ID: "Stop_End",
  1022. },
  1023. },
  1024. }
  1025. engine, err := workflow.NewEngine(wf)
  1026. if err != nil {
  1027. t.Fatalf("Failed to create engine: %v", err)
  1028. }
  1029. initialVars := map[string]interface{}{
  1030. "$input": "test input",
  1031. }
  1032. result, err := engine.Execute(context.Background(), initialVars, adapters)
  1033. if err != nil {
  1034. t.Fatalf("Failed to execute workflow: %v", err)
  1035. }
  1036. // Consume events and check for file write events
  1037. fileWriteCount := 0
  1038. for event := range result.RunEventStream {
  1039. t.Logf("Event: %s - %v", event.Type, event.StepID)
  1040. if event.Type == workflow.RunEventFileDone {
  1041. fileWriteCount++
  1042. t.Logf("File done event: path=%v", event.Payload["path"])
  1043. }
  1044. }
  1045. // Verify variable was set
  1046. if result.Context.Variables["$reportContent"] != "Generated report content" {
  1047. t.Errorf("Expected $reportContent to be 'Generated report content', got: %v", result.Context.Variables["$reportContent"])
  1048. }
  1049. // Verify files were written
  1050. reportContent := fileAdapter.GetFile("/Process/Artifacts/report.txt")
  1051. if string(reportContent) != "Generated report content" {
  1052. t.Errorf("Expected report.txt to contain 'Generated report content', got: %s", string(reportContent))
  1053. }
  1054. summaryContent := fileAdapter.GetFile("/Process/Artifacts/summary.txt")
  1055. if string(summaryContent) != "Brief summary" {
  1056. t.Errorf("Expected summary.txt to contain 'Brief summary', got: %s", string(summaryContent))
  1057. }
  1058. // Verify we got file write events
  1059. if fileWriteCount != 2 {
  1060. t.Errorf("Expected 2 file write events, got: %d", fileWriteCount)
  1061. }
  1062. })
  1063. t.Run("OutputToFileWithPathInterpolation", func(t *testing.T) {
  1064. // Test file output with variable interpolation in path
  1065. fileAdapter := workflow.NewDefaultFileAdapter()
  1066. serviceAdapter := workflow.NewDefaultServiceAdapter()
  1067. serviceAdapter.RegisterHandler("ProcessItem", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
  1068. name := params["name"].(string)
  1069. return &workflow.ServiceResult{
  1070. Data: map[string]interface{}{
  1071. "content": "Processed: " + name,
  1072. "path": name + ".txt",
  1073. },
  1074. }, nil
  1075. })
  1076. adapters := &workflow.Adapters{
  1077. Service: serviceAdapter,
  1078. Component: workflow.NewDefaultComponentAdapter(),
  1079. LLM: workflow.NewDefaultLLMAdapter(),
  1080. File: fileAdapter,
  1081. }
  1082. wf := &workflow.Workflow{
  1083. Version: "3.6",
  1084. Name: "File Output Path Interpolation Test",
  1085. Registry: workflow.Registry{
  1086. Services: []string{
  1087. "ProcessItem(name(STRING)) RETURN content(STRING), path(STRING)",
  1088. },
  1089. Components: []string{},
  1090. Vars: []string{
  1091. "$items([OBJECT])",
  1092. "$results([STRING])",
  1093. },
  1094. Files: workflow.FilesRegistry{
  1095. Inputs: []string{},
  1096. Artifacts: []string{"output/*"},
  1097. },
  1098. },
  1099. Steps: []workflow.Step{
  1100. {
  1101. ID: "Loop_ProcessItems",
  1102. Mode: "serial",
  1103. Source: "=$items",
  1104. Children: []string{"Service_ProcessItem"},
  1105. Next: "Stop_End",
  1106. },
  1107. {
  1108. ID: "Service_ProcessItem",
  1109. In: workflow.StepInput{
  1110. "name": "=_item.name",
  1111. },
  1112. Out: workflow.StepOutput{
  1113. "$results[_index]": "=_result.content",
  1114. "/output/{_result.path}": "=_result.content",
  1115. },
  1116. Next: "RETURN",
  1117. },
  1118. {
  1119. ID: "Stop_End",
  1120. },
  1121. },
  1122. }
  1123. engine, err := workflow.NewEngine(wf)
  1124. if err != nil {
  1125. t.Fatalf("Failed to create engine: %v", err)
  1126. }
  1127. initialVars := map[string]interface{}{
  1128. "$items": []interface{}{
  1129. map[string]interface{}{"name": "alpha"},
  1130. map[string]interface{}{"name": "beta"},
  1131. },
  1132. "$results": []interface{}{},
  1133. }
  1134. result, err := engine.Execute(context.Background(), initialVars, adapters)
  1135. if err != nil {
  1136. t.Fatalf("Failed to execute workflow: %v", err)
  1137. }
  1138. // Consume events
  1139. logEvents(t, result.RunEventStream)
  1140. // Verify files were written with interpolated paths
  1141. alphaContent := fileAdapter.GetFile("/output/alpha.txt")
  1142. if string(alphaContent) != "Processed: alpha" {
  1143. t.Errorf("Expected alpha.txt to contain 'Processed: alpha', got: %s", string(alphaContent))
  1144. }
  1145. betaContent := fileAdapter.GetFile("/output/beta.txt")
  1146. if string(betaContent) != "Processed: beta" {
  1147. t.Errorf("Expected beta.txt to contain 'Processed: beta', got: %s", string(betaContent))
  1148. }
  1149. // Verify results array
  1150. results, ok := result.Context.Variables["$results"].([]interface{})
  1151. if !ok {
  1152. t.Fatalf("Expected $results to be a slice, got: %T", result.Context.Variables["$results"])
  1153. }
  1154. if len(results) != 2 {
  1155. t.Fatalf("Expected 2 results, got %d", len(results))
  1156. }
  1157. if results[0] != "Processed: alpha" {
  1158. t.Errorf("Expected results[0] to be 'Processed: alpha', got: %v", results[0])
  1159. }
  1160. if results[1] != "Processed: beta" {
  1161. t.Errorf("Expected results[1] to be 'Processed: beta', got: %v", results[1])
  1162. }
  1163. })
  1164. }
  1165. // TestParallelConcurrency tests concurrent access patterns
  1166. func TestParallelConcurrency(t *testing.T) {
  1167. t.Run("ConcurrentVariableWrites", func(t *testing.T) {
  1168. // Test that concurrent writes to different array indices work correctly
  1169. wf := &workflow.Workflow{
  1170. Version: "3.6",
  1171. Name: "ConcurrencyTest",
  1172. Registry: workflow.Registry{
  1173. Files: workflow.FilesRegistry{
  1174. Artifacts: []string{"Process/Artifacts/**"},
  1175. },
  1176. },
  1177. Steps: []workflow.Step{
  1178. {
  1179. ID: "Loop_Process",
  1180. Mode: "parallel",
  1181. Source: "=$items",
  1182. Children: []string{"Set_Value"},
  1183. Next: "Stop_End",
  1184. },
  1185. {
  1186. ID: "Set_Value",
  1187. Target: "$outputs[_index]",
  1188. Value: "=_item",
  1189. Next: "RETURN",
  1190. },
  1191. {
  1192. ID: "Stop_End",
  1193. },
  1194. },
  1195. }
  1196. engine, err := workflow.NewEngine(wf)
  1197. if err != nil {
  1198. t.Fatalf("Failed to create engine: %v", err)
  1199. }
  1200. adapters := createTestAdapters()
  1201. items := make([]interface{}, 20)
  1202. for i := 0; i < 20; i++ {
  1203. items[i] = int64(i)
  1204. }
  1205. initialVars := map[string]interface{}{
  1206. "$items": items,
  1207. "$outputs": []interface{}{},
  1208. }
  1209. result, err := engine.Execute(context.Background(), initialVars, adapters)
  1210. if err != nil {
  1211. t.Fatalf("Failed to execute workflow: %v", err)
  1212. }
  1213. logEvents(t, result.RunEventStream)
  1214. // Verify all outputs are correct
  1215. outputs, ok := result.Context.Variables["$outputs"].([]interface{})
  1216. if !ok {
  1217. t.Fatalf("Expected $outputs to be a slice")
  1218. }
  1219. if len(outputs) != 20 {
  1220. t.Fatalf("Expected 20 outputs, got %d", len(outputs))
  1221. }
  1222. // Verify each output
  1223. for i := 0; i < 20; i++ {
  1224. expected := int64(i)
  1225. actual, ok := outputs[i].(int64)
  1226. if !ok {
  1227. t.Errorf("Output[%d]: expected int64, got %T", i, outputs[i])
  1228. continue
  1229. }
  1230. if actual != expected {
  1231. t.Errorf("Output[%d]: expected %d, got %d", i, expected, actual)
  1232. }
  1233. }
  1234. })
  1235. }
  1236. func TestAPICallWorkflow(t *testing.T) {
  1237. wf := &workflow.Workflow{
  1238. Version: "3.6",
  1239. Name: "API Test Workflow",
  1240. Registry: workflow.Registry{
  1241. Services: []string{},
  1242. APIs: []workflow.APIDefinition{
  1243. {
  1244. ID: "TestAPI",
  1245. Method: "POST",
  1246. URL: "https://api.example.com/v1/test",
  1247. Auth: "", // No auth for simple test
  1248. Headers: map[string]string{
  1249. "Content-Type": "application/json",
  1250. },
  1251. Desc: "Test API endpoint",
  1252. },
  1253. },
  1254. Components: []string{},
  1255. Vars: []string{"$result(OBJECT)", "$status(STRING)"},
  1256. Files: workflow.FilesRegistry{
  1257. Inputs: []string{},
  1258. Artifacts: []string{},
  1259. },
  1260. },
  1261. Steps: []workflow.Step{
  1262. {
  1263. ID: "API_TestAPI",
  1264. In: workflow.StepInput{
  1265. "body": map[string]interface{}{
  1266. "message": "Hello API",
  1267. },
  1268. },
  1269. Out: workflow.StepOutput{
  1270. "$result": "=_result",
  1271. "$status": "=_result.status",
  1272. },
  1273. Next: "Stop_End",
  1274. },
  1275. {
  1276. ID: "Stop_End",
  1277. },
  1278. },
  1279. }
  1280. // Create mock API adapter
  1281. apiAdapter := NewMockAPIAdapter()
  1282. apiAdapter.SetResponse(map[string]interface{}{
  1283. "status": "success",
  1284. "data": "Response data",
  1285. })
  1286. // Execute workflow
  1287. engine, err := workflow.NewEngine(wf)
  1288. if err != nil {
  1289. t.Fatalf("Failed to create engine: %v", err)
  1290. }
  1291. ctx := context.Background()
  1292. adapters := &workflow.Adapters{
  1293. Service: workflow.NewDefaultServiceAdapter(),
  1294. API: apiAdapter,
  1295. Component: workflow.NewDefaultComponentAdapter(),
  1296. LLM: workflow.NewDefaultLLMAdapter(),
  1297. File: workflow.NewDefaultFileAdapter(),
  1298. }
  1299. result, err := engine.Execute(ctx, map[string]interface{}{}, adapters)
  1300. if err != nil {
  1301. t.Fatalf("Failed to execute workflow: %v", err)
  1302. }
  1303. // Wait for completion
  1304. for event := range result.RunEventStream {
  1305. t.Logf("Event: %s - %v - Payload: %+v", event.Type, event.StepID, event.Payload)
  1306. if event.Type == workflow.RunEventStepError {
  1307. t.Logf("Step Error Payload: %v", event.Payload["error"])
  1308. }
  1309. if event.Type == workflow.RunEventWorkflowDone || event.Type == workflow.RunEventWorkflowFailed {
  1310. break
  1311. }
  1312. }
  1313. // Verify execution
  1314. status, ok := result.Context.Variables["$status"]
  1315. if !ok {
  1316. t.Fatalf("Expected $status to be set. Variables: %+v", result.Context.Variables)
  1317. }
  1318. if status != "success" {
  1319. t.Errorf("Expected status to be 'success', got %v", status)
  1320. }
  1321. }
  1322. // MockAPIAdapter is a test adapter for API calls
  1323. type MockAPIAdapter struct {
  1324. response map[string]interface{}
  1325. mu sync.RWMutex
  1326. }
  1327. func NewMockAPIAdapter() *MockAPIAdapter {
  1328. return &MockAPIAdapter{
  1329. response: make(map[string]interface{}),
  1330. }
  1331. }
  1332. func (m *MockAPIAdapter) SetResponse(resp map[string]interface{}) {
  1333. m.mu.Lock()
  1334. defer m.mu.Unlock()
  1335. m.response = resp
  1336. }
  1337. func (m *MockAPIAdapter) Call(ctx context.Context, apiDef *workflow.APIDefinition, params map[string]interface{}) (map[string]interface{}, error) {
  1338. m.mu.RLock()
  1339. defer m.mu.RUnlock()
  1340. return m.response, nil
  1341. }
  1342. // TestNilValidation tests nil parameter validation
  1343. func TestNilValidation(t *testing.T) {
  1344. t.Run("NilWorkflow", func(t *testing.T) {
  1345. _, err := workflow.NewEngine(nil)
  1346. if err == nil {
  1347. t.Fatal("Expected error for nil workflow")
  1348. }
  1349. if !strings.Contains(err.Error(), "cannot be nil") {
  1350. t.Errorf("Expected 'cannot be nil' error, got: %v", err)
  1351. }
  1352. })
  1353. t.Run("NilContext", func(t *testing.T) {
  1354. wf := &workflow.Workflow{
  1355. Version: "3.6",
  1356. Name: "Test",
  1357. Registry: workflow.Registry{
  1358. Services: []string{},
  1359. APIs: []workflow.APIDefinition{},
  1360. Components: []string{},
  1361. Vars: []string{},
  1362. Files: workflow.FilesRegistry{
  1363. Inputs: []string{},
  1364. Artifacts: []string{},
  1365. },
  1366. },
  1367. Steps: []workflow.Step{
  1368. {ID: "Stop_End"},
  1369. },
  1370. }
  1371. engine, err := workflow.NewEngine(wf)
  1372. if err != nil {
  1373. t.Fatalf("Failed to create engine: %v", err)
  1374. }
  1375. adapters := &workflow.Adapters{
  1376. Service: workflow.NewDefaultServiceAdapter(),
  1377. API: workflow.NewDefaultAPIAdapter(),
  1378. Component: workflow.NewDefaultComponentAdapter(),
  1379. LLM: workflow.NewDefaultLLMAdapter(),
  1380. File: workflow.NewDefaultFileAdapter(),
  1381. }
  1382. //nolint:staticcheck // intentionally passing nil context to test validation
  1383. _, err = engine.Execute(nil, nil, adapters)
  1384. if err == nil {
  1385. t.Fatal("Expected error for nil context")
  1386. }
  1387. if !strings.Contains(err.Error(), "context cannot be nil") {
  1388. t.Errorf("Expected 'context cannot be nil' error, got: %v", err)
  1389. }
  1390. })
  1391. t.Run("NilAdapters", func(t *testing.T) {
  1392. wf := &workflow.Workflow{
  1393. Version: "3.6",
  1394. Name: "Test",
  1395. Registry: workflow.Registry{
  1396. Services: []string{},
  1397. APIs: []workflow.APIDefinition{},
  1398. Components: []string{},
  1399. Vars: []string{},
  1400. Files: workflow.FilesRegistry{
  1401. Inputs: []string{},
  1402. Artifacts: []string{},
  1403. },
  1404. },
  1405. Steps: []workflow.Step{
  1406. {ID: "Stop_End"},
  1407. },
  1408. }
  1409. engine, err := workflow.NewEngine(wf)
  1410. if err != nil {
  1411. t.Fatalf("Failed to create engine: %v", err)
  1412. }
  1413. _, err = engine.Execute(context.Background(), nil, nil)
  1414. if err == nil {
  1415. t.Fatal("Expected error for nil adapters")
  1416. }
  1417. if !strings.Contains(err.Error(), "adapters cannot be nil") {
  1418. t.Errorf("Expected 'adapters cannot be nil' error, got: %v", err)
  1419. }
  1420. })
  1421. }
  1422. // TestArrayTypeConversion tests string to array type conversion for variables
  1423. func TestArrayTypeConversion(t *testing.T) {
  1424. t.Run("ConvertStringToObjectArray", func(t *testing.T) {
  1425. wf := &workflow.Workflow{
  1426. Version: "3.6",
  1427. Name: "Array Type Conversion Test",
  1428. Registry: workflow.Registry{
  1429. Services: []string{},
  1430. Components: []string{},
  1431. Vars: []string{
  1432. "$items([OBJECT])",
  1433. "$names([STRING])",
  1434. "$count(NUMBER)",
  1435. },
  1436. Files: workflow.FilesRegistry{
  1437. Inputs: []string{},
  1438. Artifacts: []string{},
  1439. },
  1440. },
  1441. Steps: []workflow.Step{
  1442. {
  1443. ID: "Set_Items",
  1444. Target: "$items",
  1445. Value: `[{"name":"item1","value":10},{"name":"item2","value":20}]`,
  1446. Next: "Set_Names",
  1447. },
  1448. {
  1449. ID: "Set_Names",
  1450. Target: "$names",
  1451. Value: `["alice","bob","charlie"]`,
  1452. Next: "Set_Count",
  1453. },
  1454. {
  1455. ID: "Set_Count",
  1456. Target: "$count",
  1457. Value: "3",
  1458. Next: "Stop_End",
  1459. },
  1460. {
  1461. ID: "Stop_End",
  1462. },
  1463. },
  1464. }
  1465. adapters := createTestAdapters()
  1466. engine, err := workflow.NewEngine(wf)
  1467. if err != nil {
  1468. t.Fatalf("Failed to create engine: %v", err)
  1469. }
  1470. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  1471. if err != nil {
  1472. t.Fatalf("Failed to execute workflow: %v", err)
  1473. }
  1474. // Consume events
  1475. logEvents(t, result.RunEventStream)
  1476. // Verify $items was converted from string to []interface{} with maps
  1477. items, ok := result.Context.Variables["$items"].([]interface{})
  1478. if !ok {
  1479. t.Fatalf("Expected $items to be []interface{}, got: %T", result.Context.Variables["$items"])
  1480. }
  1481. if len(items) != 2 {
  1482. t.Errorf("Expected 2 items, got %d", len(items))
  1483. }
  1484. // Check first item
  1485. item1, ok := items[0].(map[string]interface{})
  1486. if !ok {
  1487. t.Errorf("Expected items[0] to be map[string]interface{}, got: %T", items[0])
  1488. } else {
  1489. if item1["name"] != "item1" {
  1490. t.Errorf("Expected items[0].name to be 'item1', got: %v", item1["name"])
  1491. }
  1492. // JSON unmarshals numbers as float64
  1493. if item1["value"] != float64(10) {
  1494. t.Errorf("Expected items[0].value to be 10, got: %v", item1["value"])
  1495. }
  1496. }
  1497. // Verify $names was converted from string to []interface{} with strings
  1498. names, ok := result.Context.Variables["$names"].([]interface{})
  1499. if !ok {
  1500. t.Fatalf("Expected $names to be []interface{}, got: %T", result.Context.Variables["$names"])
  1501. }
  1502. if len(names) != 3 {
  1503. t.Errorf("Expected 3 names, got %d", len(names))
  1504. }
  1505. expectedNames := []string{"alice", "bob", "charlie"}
  1506. for i, expected := range expectedNames {
  1507. if names[i] != expected {
  1508. t.Errorf("Expected names[%d] to be '%s', got: %v", i, expected, names[i])
  1509. }
  1510. }
  1511. // Verify $count remains as string (no array type conversion)
  1512. count := result.Context.Variables["$count"]
  1513. if count != "3" {
  1514. t.Errorf("Expected $count to be string '3', got: %v (type: %T)", count, count)
  1515. }
  1516. })
  1517. t.Run("InvalidJSONStringForArray", func(t *testing.T) {
  1518. wf := &workflow.Workflow{
  1519. Version: "3.6",
  1520. Name: "Invalid Array JSON Test",
  1521. Registry: workflow.Registry{
  1522. Services: []string{},
  1523. Components: []string{},
  1524. Vars: []string{
  1525. "$items([OBJECT])",
  1526. },
  1527. Files: workflow.FilesRegistry{
  1528. Inputs: []string{},
  1529. Artifacts: []string{},
  1530. },
  1531. },
  1532. Steps: []workflow.Step{
  1533. {
  1534. ID: "Set_InvalidItems",
  1535. Target: "$items",
  1536. Value: `invalid json`,
  1537. Next: "Stop_End",
  1538. },
  1539. {
  1540. ID: "Stop_End",
  1541. },
  1542. },
  1543. }
  1544. adapters := createTestAdapters()
  1545. engine, err := workflow.NewEngine(wf)
  1546. if err != nil {
  1547. t.Fatalf("Failed to create engine: %v", err)
  1548. }
  1549. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  1550. if err != nil {
  1551. t.Fatalf("Execute returned error: %v", err)
  1552. }
  1553. // Consume events and check for error
  1554. hasError := false
  1555. for event := range result.RunEventStream {
  1556. t.Logf("Event: %s - %v - %v", event.Type, event.StepID, event.Payload)
  1557. if event.Type == workflow.RunEventStepError || event.Type == workflow.RunEventWorkflowFailed {
  1558. hasError = true
  1559. }
  1560. }
  1561. // Workflow should have failed due to invalid JSON
  1562. if !hasError {
  1563. t.Error("Expected workflow to fail with invalid JSON, but it succeeded")
  1564. }
  1565. if result.Context.Status != workflow.StatusFailed {
  1566. t.Errorf("Expected workflow status to be 'failed', got: %s", result.Context.Status)
  1567. }
  1568. })
  1569. t.Run("ArrayTypeWithNonStringValue", func(t *testing.T) {
  1570. // When value is already an array, it should not require conversion
  1571. wf := &workflow.Workflow{
  1572. Version: "3.6",
  1573. Name: "Array Already Array Test",
  1574. Registry: workflow.Registry{
  1575. Services: []string{},
  1576. Components: []string{},
  1577. Vars: []string{
  1578. "$items([OBJECT])",
  1579. },
  1580. Files: workflow.FilesRegistry{
  1581. Inputs: []string{},
  1582. Artifacts: []string{},
  1583. },
  1584. },
  1585. Steps: []workflow.Step{
  1586. {
  1587. ID: "Set_Items",
  1588. Target: "$items",
  1589. Value: "=$presetItems",
  1590. Next: "Stop_End",
  1591. },
  1592. {
  1593. ID: "Stop_End",
  1594. },
  1595. },
  1596. }
  1597. adapters := createTestAdapters()
  1598. engine, err := workflow.NewEngine(wf)
  1599. if err != nil {
  1600. t.Fatalf("Failed to create engine: %v", err)
  1601. }
  1602. initialVars := map[string]interface{}{
  1603. "$presetItems": []interface{}{
  1604. map[string]interface{}{"id": 1},
  1605. map[string]interface{}{"id": 2},
  1606. },
  1607. }
  1608. result, err := engine.Execute(context.Background(), initialVars, adapters)
  1609. if err != nil {
  1610. t.Fatalf("Failed to execute workflow: %v", err)
  1611. }
  1612. // Consume events
  1613. logEvents(t, result.RunEventStream)
  1614. // Value should remain as-is (already an array)
  1615. items, ok := result.Context.Variables["$items"].([]interface{})
  1616. if !ok {
  1617. t.Fatalf("Expected $items to be []interface{}, got: %T", result.Context.Variables["$items"])
  1618. }
  1619. if len(items) != 2 {
  1620. t.Errorf("Expected 2 items, got %d", len(items))
  1621. }
  1622. })
  1623. }
  1624. // TestFileOutputJSONMarshal tests that non-string values are JSON-marshaled when writing to files
  1625. func TestFileOutputJSONMarshal(t *testing.T) {
  1626. t.Run("WriteObjectToFile", func(t *testing.T) {
  1627. fileAdapter := workflow.NewDefaultFileAdapter()
  1628. serviceAdapter := workflow.NewDefaultServiceAdapter()
  1629. serviceAdapter.RegisterHandler("GenerateData", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
  1630. return &workflow.ServiceResult{
  1631. Data: map[string]interface{}{
  1632. "user": map[string]interface{}{
  1633. "name": "Alice",
  1634. "age": 30,
  1635. "email": "alice@example.com",
  1636. },
  1637. "items": []interface{}{
  1638. map[string]interface{}{"id": 1, "name": "item1"},
  1639. map[string]interface{}{"id": 2, "name": "item2"},
  1640. },
  1641. "count": 2,
  1642. },
  1643. }, nil
  1644. })
  1645. adapters := &workflow.Adapters{
  1646. Service: serviceAdapter,
  1647. Component: workflow.NewDefaultComponentAdapter(),
  1648. LLM: workflow.NewDefaultLLMAdapter(),
  1649. File: fileAdapter,
  1650. }
  1651. wf := &workflow.Workflow{
  1652. Version: "3.6",
  1653. Name: "File Output JSON Marshal Test",
  1654. Registry: workflow.Registry{
  1655. Services: []string{
  1656. "GenerateData() RETURN user(OBJECT), items([OBJECT]), count(NUMBER)",
  1657. },
  1658. Components: []string{},
  1659. Vars: []string{},
  1660. Files: workflow.FilesRegistry{
  1661. Inputs: []string{},
  1662. Artifacts: []string{"output/*"},
  1663. },
  1664. },
  1665. Steps: []workflow.Step{
  1666. {
  1667. ID: "Service_GenerateData",
  1668. Out: workflow.StepOutput{
  1669. "/output/user.json": "=_result.user",
  1670. "/output/items.json": "=_result.items",
  1671. "/output/count.json": "=_result.count",
  1672. },
  1673. Next: "Stop_End",
  1674. },
  1675. {
  1676. ID: "Stop_End",
  1677. },
  1678. },
  1679. }
  1680. engine, err := workflow.NewEngine(wf)
  1681. if err != nil {
  1682. t.Fatalf("Failed to create engine: %v", err)
  1683. }
  1684. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  1685. if err != nil {
  1686. t.Fatalf("Failed to execute workflow: %v", err)
  1687. }
  1688. // Consume events
  1689. logEvents(t, result.RunEventStream)
  1690. // Verify user object was written as JSON
  1691. userContent := fileAdapter.GetFile("/output/user.json")
  1692. var userObj map[string]interface{}
  1693. if err := json.Unmarshal(userContent, &userObj); err != nil {
  1694. t.Fatalf("Failed to parse user.json as JSON: %v. Content: %s", err, string(userContent))
  1695. }
  1696. if userObj["name"] != "Alice" {
  1697. t.Errorf("Expected user.name to be 'Alice', got: %v", userObj["name"])
  1698. }
  1699. if userObj["age"] != float64(30) {
  1700. t.Errorf("Expected user.age to be 30, got: %v", userObj["age"])
  1701. }
  1702. // Verify items array was written as JSON
  1703. itemsContent := fileAdapter.GetFile("/output/items.json")
  1704. var itemsArr []interface{}
  1705. if err := json.Unmarshal(itemsContent, &itemsArr); err != nil {
  1706. t.Fatalf("Failed to parse items.json as JSON: %v. Content: %s", err, string(itemsContent))
  1707. }
  1708. if len(itemsArr) != 2 {
  1709. t.Errorf("Expected 2 items, got: %d", len(itemsArr))
  1710. }
  1711. item1 := itemsArr[0].(map[string]interface{})
  1712. if item1["name"] != "item1" {
  1713. t.Errorf("Expected items[0].name to be 'item1', got: %v", item1["name"])
  1714. }
  1715. // Verify number was written as JSON
  1716. countContent := fileAdapter.GetFile("/output/count.json")
  1717. var countVal float64
  1718. if err := json.Unmarshal(countContent, &countVal); err != nil {
  1719. t.Fatalf("Failed to parse count.json as JSON: %v. Content: %s", err, string(countContent))
  1720. }
  1721. if countVal != 2 {
  1722. t.Errorf("Expected count to be 2, got: %v", countVal)
  1723. }
  1724. })
  1725. t.Run("WriteStepValueToFile", func(t *testing.T) {
  1726. // Test Write_* step with non-string values
  1727. fileAdapter := workflow.NewDefaultFileAdapter()
  1728. adapters := createTestAdaptersWithFile(fileAdapter)
  1729. wf := &workflow.Workflow{
  1730. Version: "3.6",
  1731. Name: "Write Step JSON Marshal Test",
  1732. Registry: workflow.Registry{
  1733. Services: []string{},
  1734. Components: []string{},
  1735. Vars: []string{
  1736. "$data(OBJECT)",
  1737. },
  1738. Files: workflow.FilesRegistry{
  1739. Inputs: []string{},
  1740. Artifacts: []string{"output/*"},
  1741. },
  1742. },
  1743. Steps: []workflow.Step{
  1744. {
  1745. ID: "Set_Data",
  1746. Target: "$data",
  1747. Value: `{"status":"success","code":200}`,
  1748. Next: "Write_JsonFile",
  1749. },
  1750. {
  1751. ID: "Write_JsonFile",
  1752. Target: "output/result.json",
  1753. Value: "=$data",
  1754. Next: "Stop_End",
  1755. },
  1756. {
  1757. ID: "Stop_End",
  1758. },
  1759. },
  1760. }
  1761. engine, err := workflow.NewEngine(wf)
  1762. if err != nil {
  1763. t.Fatalf("Failed to create engine: %v", err)
  1764. }
  1765. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  1766. if err != nil {
  1767. t.Fatalf("Failed to execute workflow: %v", err)
  1768. }
  1769. // Consume events and check for errors
  1770. var errorMsg string
  1771. for event := range result.RunEventStream {
  1772. t.Logf("Event: %s - %v - %v", event.Type, event.StepID, event.Payload)
  1773. if event.Type == workflow.RunEventStepError || event.Type == workflow.RunEventWorkflowFailed {
  1774. errorMsg = fmt.Sprintf("%v", event.Payload["error"])
  1775. }
  1776. }
  1777. if errorMsg != "" {
  1778. t.Fatalf("Workflow failed with error: %s", errorMsg)
  1779. }
  1780. // Verify object was written as JSON (not Go format)
  1781. content := fileAdapter.GetFile("output/result.json")
  1782. var dataObj map[string]interface{}
  1783. if err := json.Unmarshal(content, &dataObj); err != nil {
  1784. t.Fatalf("Failed to parse result.json as JSON: %v. Content: %s", err, string(content))
  1785. }
  1786. if dataObj["status"] != "success" {
  1787. t.Errorf("Expected status to be 'success', got: %v", dataObj["status"])
  1788. }
  1789. // JSON unmarshals numbers as float64
  1790. if dataObj["code"] != float64(200) {
  1791. t.Errorf("Expected code to be 200, got: %v", dataObj["code"])
  1792. }
  1793. })
  1794. }
  1795. // TestNestedExpressionEvaluation tests that expressions are evaluated in nested structures
  1796. func TestNestedExpressionEvaluation(t *testing.T) {
  1797. t.Run("ExpressionsInMessagesArray", func(t *testing.T) {
  1798. var capturedParams map[string]interface{}
  1799. llmAdapter := workflow.NewDefaultLLMAdapter()
  1800. llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  1801. capturedParams = params
  1802. return map[string]interface{}{
  1803. "content": "Response about the fruit",
  1804. }, nil
  1805. })
  1806. wf := &workflow.Workflow{
  1807. Version: "3.6",
  1808. Name: "Nested Expression Test",
  1809. Registry: workflow.Registry{
  1810. Services: []string{},
  1811. Components: []string{},
  1812. Vars: []string{
  1813. "$items([OBJECT])",
  1814. "$result(STRING)",
  1815. },
  1816. Files: workflow.FilesRegistry{
  1817. Inputs: []string{},
  1818. Artifacts: []string{},
  1819. },
  1820. },
  1821. Steps: []workflow.Step{
  1822. {
  1823. ID: "Set_Items",
  1824. Target: "$items",
  1825. Value: `[{"name":"apple","color":"red"},{"name":"banana","color":"yellow"}]`,
  1826. Next: "Loop_ProcessItems",
  1827. },
  1828. {
  1829. ID: "Loop_ProcessItems",
  1830. Mode: "serial",
  1831. Source: "=$items",
  1832. Children: []string{"LLM_Describe"},
  1833. Next: "Stop_End",
  1834. },
  1835. {
  1836. ID: "LLM_Describe",
  1837. In: workflow.StepInput{
  1838. "model": "gpt-4",
  1839. "stream": false,
  1840. "messages": []interface{}{
  1841. map[string]interface{}{
  1842. "role": "user",
  1843. "content": "Describe this fruit in one sentence:",
  1844. },
  1845. map[string]interface{}{
  1846. "role": "user",
  1847. "content": "=_item.name",
  1848. },
  1849. },
  1850. },
  1851. Out: workflow.StepOutput{
  1852. "$result": "=_result.content",
  1853. },
  1854. Next: "RETURN",
  1855. },
  1856. {
  1857. ID: "Stop_End",
  1858. },
  1859. },
  1860. }
  1861. engine, err := workflow.NewEngine(wf)
  1862. if err != nil {
  1863. t.Fatalf("Failed to create engine: %v", err)
  1864. }
  1865. adapters := &workflow.Adapters{
  1866. Service: workflow.NewDefaultServiceAdapter(),
  1867. Component: workflow.NewDefaultComponentAdapter(),
  1868. LLM: llmAdapter,
  1869. File: workflow.NewDefaultFileAdapter(),
  1870. }
  1871. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  1872. if err != nil {
  1873. t.Fatalf("Failed to execute workflow: %v", err)
  1874. }
  1875. // Consume events
  1876. logEvents(t, result.RunEventStream)
  1877. // Verify that the expression in the messages array was evaluated
  1878. messages, ok := capturedParams["messages"].([]interface{})
  1879. if !ok {
  1880. t.Fatalf("Expected messages to be []interface{}, got: %T", capturedParams["messages"])
  1881. }
  1882. if len(messages) != 2 {
  1883. t.Fatalf("Expected 2 messages, got: %d", len(messages))
  1884. }
  1885. // Check second message - should have "banana" (last iteration)
  1886. secondMsg, ok := messages[1].(map[string]interface{})
  1887. if !ok {
  1888. t.Fatalf("Expected messages[1] to be map, got: %T", messages[1])
  1889. }
  1890. content, ok := secondMsg["content"].(string)
  1891. if !ok {
  1892. t.Fatalf("Expected content to be string, got: %T", secondMsg["content"])
  1893. }
  1894. // The last item processed should be "banana"
  1895. if content != "banana" {
  1896. t.Errorf("Expected message content to be 'banana' (from _item.name), got: %v", content)
  1897. }
  1898. })
  1899. t.Run("ExpressionsInNestedObjects", func(t *testing.T) {
  1900. var capturedParams map[string]interface{}
  1901. serviceAdapter := workflow.NewDefaultServiceAdapter()
  1902. serviceAdapter.RegisterHandler("TestService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) {
  1903. capturedParams = params
  1904. return &workflow.ServiceResult{
  1905. Data: map[string]interface{}{
  1906. "success": true,
  1907. },
  1908. }, nil
  1909. })
  1910. wf := &workflow.Workflow{
  1911. Version: "3.6",
  1912. Name: "Nested Object Expression Test",
  1913. Registry: workflow.Registry{
  1914. Services: []string{
  1915. "TestService(config(OBJECT)) RETURN success(BOOL)",
  1916. },
  1917. Components: []string{},
  1918. Vars: []string{
  1919. "$userId(STRING)",
  1920. "$userName(STRING)",
  1921. },
  1922. Files: workflow.FilesRegistry{
  1923. Inputs: []string{},
  1924. Artifacts: []string{},
  1925. },
  1926. },
  1927. Steps: []workflow.Step{
  1928. {
  1929. ID: "Set_UserId",
  1930. Target: "$userId",
  1931. Value: "user123",
  1932. Next: "Set_UserName",
  1933. },
  1934. {
  1935. ID: "Set_UserName",
  1936. Target: "$userName",
  1937. Value: "Alice",
  1938. Next: "Service_TestService",
  1939. },
  1940. {
  1941. ID: "Service_TestService",
  1942. In: workflow.StepInput{
  1943. "config": map[string]interface{}{
  1944. "user": map[string]interface{}{
  1945. "id": "=$userId",
  1946. "name": "=$userName",
  1947. },
  1948. "settings": map[string]interface{}{
  1949. "theme": "dark",
  1950. "language": "en",
  1951. },
  1952. },
  1953. },
  1954. Next: "Stop_End",
  1955. },
  1956. {
  1957. ID: "Stop_End",
  1958. },
  1959. },
  1960. }
  1961. engine, err := workflow.NewEngine(wf)
  1962. if err != nil {
  1963. t.Fatalf("Failed to create engine: %v", err)
  1964. }
  1965. adapters := &workflow.Adapters{
  1966. Service: serviceAdapter,
  1967. Component: workflow.NewDefaultComponentAdapter(),
  1968. LLM: workflow.NewDefaultLLMAdapter(),
  1969. File: workflow.NewDefaultFileAdapter(),
  1970. }
  1971. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  1972. if err != nil {
  1973. t.Fatalf("Failed to execute workflow: %v", err)
  1974. }
  1975. // Consume events
  1976. logEvents(t, result.RunEventStream)
  1977. // Verify that nested expressions were evaluated
  1978. config, ok := capturedParams["config"].(map[string]interface{})
  1979. if !ok {
  1980. t.Fatalf("Expected config to be map, got: %T", capturedParams["config"])
  1981. }
  1982. user, ok := config["user"].(map[string]interface{})
  1983. if !ok {
  1984. t.Fatalf("Expected config.user to be map, got: %T", config["user"])
  1985. }
  1986. // Verify expressions were evaluated
  1987. if user["id"] != "user123" {
  1988. t.Errorf("Expected user.id to be 'user123', got: %v", user["id"])
  1989. }
  1990. if user["name"] != "Alice" {
  1991. t.Errorf("Expected user.name to be 'Alice', got: %v", user["name"])
  1992. }
  1993. // Verify non-expressions are preserved
  1994. settings, ok := config["settings"].(map[string]interface{})
  1995. if !ok {
  1996. t.Fatalf("Expected config.settings to be map, got: %T", config["settings"])
  1997. }
  1998. if settings["theme"] != "dark" {
  1999. t.Errorf("Expected settings.theme to be 'dark', got: %v", settings["theme"])
  2000. }
  2001. })
  2002. }
  2003. // TestLLMMessagesWithExpressions tests the exact use case from the user's example
  2004. func TestLLMMessagesWithExpressions(t *testing.T) {
  2005. var capturedParams map[string]interface{}
  2006. llmAdapter := workflow.NewDefaultLLMAdapter()
  2007. llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  2008. capturedParams = params
  2009. // Verify the messages structure
  2010. messages, ok := params["messages"].([]interface{})
  2011. if !ok || len(messages) != 2 {
  2012. return nil, fmt.Errorf("unexpected messages format")
  2013. }
  2014. // Get the fruit name from the second message
  2015. msg2 := messages[1].(map[string]interface{})
  2016. fruitName := msg2["content"].(string)
  2017. return map[string]interface{}{
  2018. "content": fmt.Sprintf("The %s is a delicious fruit.", fruitName),
  2019. }, nil
  2020. })
  2021. wf := &workflow.Workflow{
  2022. Version: "3.6",
  2023. Name: "LLM Messages Expression Test",
  2024. Registry: workflow.Registry{
  2025. Services: []string{},
  2026. Components: []string{},
  2027. Vars: []string{
  2028. "$fruits([OBJECT])",
  2029. "$descriptions([STRING])",
  2030. },
  2031. Files: workflow.FilesRegistry{
  2032. Inputs: []string{},
  2033. Artifacts: []string{},
  2034. },
  2035. },
  2036. Steps: []workflow.Step{
  2037. {
  2038. ID: "Set_Fruits",
  2039. Target: "$fruits",
  2040. Value: `[{"name":"apple"},{"name":"banana"},{"name":"orange"}]`,
  2041. Next: "Loop_Describe",
  2042. },
  2043. {
  2044. ID: "Loop_Describe",
  2045. Mode: "serial",
  2046. Source: "=$fruits",
  2047. Children: []string{"LLM_DescribeFruit"},
  2048. Next: "Stop_End",
  2049. },
  2050. {
  2051. ID: "LLM_DescribeFruit",
  2052. In: workflow.StepInput{
  2053. "model": "gpt-4",
  2054. "stream": true,
  2055. "messages": []interface{}{
  2056. map[string]interface{}{
  2057. "role": "user",
  2058. "content": "Describe this fruit in one sentence:",
  2059. },
  2060. map[string]interface{}{
  2061. "role": "user",
  2062. "content": "=_item.name",
  2063. },
  2064. },
  2065. },
  2066. Out: workflow.StepOutput{
  2067. "$descriptions[_index]": "=_result.content",
  2068. },
  2069. Next: "RETURN",
  2070. },
  2071. {
  2072. ID: "Stop_End",
  2073. },
  2074. },
  2075. }
  2076. engine, err := workflow.NewEngine(wf)
  2077. if err != nil {
  2078. t.Fatalf("Failed to create engine: %v", err)
  2079. }
  2080. adapters := &workflow.Adapters{
  2081. Service: workflow.NewDefaultServiceAdapter(),
  2082. Component: workflow.NewDefaultComponentAdapter(),
  2083. LLM: llmAdapter,
  2084. File: workflow.NewDefaultFileAdapter(),
  2085. }
  2086. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  2087. if err != nil {
  2088. t.Fatalf("Failed to execute workflow: %v", err)
  2089. }
  2090. // Consume events
  2091. logEvents(t, result.RunEventStream)
  2092. // Verify messages array had expressions evaluated
  2093. messages, ok := capturedParams["messages"].([]interface{})
  2094. if !ok {
  2095. t.Fatalf("Expected messages to be []interface{}, got: %T", capturedParams["messages"])
  2096. }
  2097. if len(messages) != 2 {
  2098. t.Fatalf("Expected 2 messages, got: %d", len(messages))
  2099. }
  2100. // Verify first message (static content)
  2101. msg1 := messages[0].(map[string]interface{})
  2102. if msg1["content"] != "Describe this fruit in one sentence:" {
  2103. t.Errorf("Expected first message content to be 'Describe this fruit in one sentence:', got: %v", msg1["content"])
  2104. }
  2105. // Verify second message (evaluated expression)
  2106. msg2 := messages[1].(map[string]interface{})
  2107. content := msg2["content"].(string)
  2108. // Should be "orange" (last iteration in the loop)
  2109. if content != "orange" {
  2110. t.Errorf("Expected second message content to be 'orange' (from =_item.name), got: %v", content)
  2111. }
  2112. // Verify descriptions were collected
  2113. descriptions, ok := result.Context.Variables["$descriptions"].([]interface{})
  2114. if !ok {
  2115. t.Fatalf("Expected $descriptions to be []interface{}, got: %T", result.Context.Variables["$descriptions"])
  2116. }
  2117. if len(descriptions) != 3 {
  2118. t.Errorf("Expected 3 descriptions, got: %d", len(descriptions))
  2119. }
  2120. // Verify each description
  2121. expectedDescriptions := []string{
  2122. "The apple is a delicious fruit.",
  2123. "The banana is a delicious fruit.",
  2124. "The orange is a delicious fruit.",
  2125. }
  2126. for i, expected := range expectedDescriptions {
  2127. if descriptions[i] != expected {
  2128. t.Errorf("Expected descriptions[%d] to be '%s', got: %v", i, expected, descriptions[i])
  2129. }
  2130. }
  2131. }
  2132. // TestLoopSourceExpression tests that Loop source supports both old and new expression formats
  2133. func TestLoopSourceExpression(t *testing.T) {
  2134. t.Run("NewFormatWithEqualsPrefix", func(t *testing.T) {
  2135. wf := &workflow.Workflow{
  2136. Version: "3.6",
  2137. Name: "Loop with = prefix",
  2138. Registry: workflow.Registry{
  2139. Services: []string{},
  2140. Components: []string{},
  2141. Vars: []string{
  2142. "$numbers([NUMBER])",
  2143. "$doubled([NUMBER])",
  2144. },
  2145. Files: workflow.FilesRegistry{
  2146. Inputs: []string{},
  2147. Artifacts: []string{},
  2148. },
  2149. },
  2150. Steps: []workflow.Step{
  2151. {
  2152. ID: "Loop_Double",
  2153. Mode: "serial",
  2154. Source: "=$numbers", // New format with = prefix
  2155. Children: []string{"Set_Doubled"},
  2156. Next: "Stop_End",
  2157. },
  2158. {
  2159. ID: "Set_Doubled",
  2160. Target: "$doubled[_index]",
  2161. Value: "=_item * 2",
  2162. Next: "RETURN",
  2163. },
  2164. {
  2165. ID: "Stop_End",
  2166. },
  2167. },
  2168. }
  2169. adapters := createTestAdapters()
  2170. engine, err := workflow.NewEngine(wf)
  2171. if err != nil {
  2172. t.Fatalf("Failed to create engine: %v", err)
  2173. }
  2174. initialVars := map[string]interface{}{
  2175. "$numbers": []interface{}{int64(1), int64(2), int64(3)},
  2176. "$doubled": []interface{}{},
  2177. }
  2178. result, err := engine.Execute(context.Background(), initialVars, adapters)
  2179. if err != nil {
  2180. t.Fatalf("Failed to execute workflow: %v", err)
  2181. }
  2182. logEvents(t, result.RunEventStream)
  2183. doubled, ok := result.Context.Variables["$doubled"].([]interface{})
  2184. if !ok {
  2185. t.Fatalf("Expected $doubled to be []interface{}")
  2186. }
  2187. expected := []int64{2, 4, 6}
  2188. for i, exp := range expected {
  2189. actual := doubled[i].(float64) // Arithmetic results are float64
  2190. if actual != float64(exp) {
  2191. t.Errorf("doubled[%d]: expected %d, got %v", i, exp, actual)
  2192. }
  2193. }
  2194. })
  2195. t.Run("OldFormatBackwardCompatibility", func(t *testing.T) {
  2196. // Test that old format ($items without =) still works
  2197. wf := &workflow.Workflow{
  2198. Version: "3.6",
  2199. Name: "Loop backward compatibility",
  2200. Registry: workflow.Registry{
  2201. Services: []string{},
  2202. Components: []string{},
  2203. Vars: []string{
  2204. "$items([STRING])",
  2205. "$output([STRING])",
  2206. },
  2207. Files: workflow.FilesRegistry{
  2208. Inputs: []string{},
  2209. Artifacts: []string{},
  2210. },
  2211. },
  2212. Steps: []workflow.Step{
  2213. {
  2214. ID: "Loop_Process",
  2215. Mode: "serial",
  2216. Source: "$items", // Old format without = prefix (for backward compatibility testing)
  2217. Children: []string{"Set_Output"},
  2218. Next: "Stop_End",
  2219. },
  2220. {
  2221. ID: "Set_Output",
  2222. Target: "$output[_index]",
  2223. Value: "=_item",
  2224. Next: "RETURN",
  2225. },
  2226. {
  2227. ID: "Stop_End",
  2228. },
  2229. },
  2230. }
  2231. adapters := createTestAdapters()
  2232. engine, err := workflow.NewEngine(wf)
  2233. if err != nil {
  2234. t.Fatalf("Failed to create engine: %v", err)
  2235. }
  2236. initialVars := map[string]interface{}{
  2237. "$items": []interface{}{"a", "b", "c"},
  2238. "$output": []interface{}{},
  2239. }
  2240. result, err := engine.Execute(context.Background(), initialVars, adapters)
  2241. if err != nil {
  2242. t.Fatalf("Failed to execute workflow: %v", err)
  2243. }
  2244. logEvents(t, result.RunEventStream)
  2245. output, ok := result.Context.Variables["$output"].([]interface{})
  2246. if !ok {
  2247. t.Fatalf("Expected $output to be []interface{}")
  2248. }
  2249. expected := []string{"a", "b", "c"}
  2250. for i, exp := range expected {
  2251. if output[i] != exp {
  2252. t.Errorf("output[%d]: expected %s, got %v", i, exp, output[i])
  2253. }
  2254. }
  2255. })
  2256. }
  2257. // TestDocInjection tests that docs are resolved and injected into LLM system prompts
  2258. func TestDocInjection(t *testing.T) {
  2259. t.Run("DocsAppendedToSystemMessage", func(t *testing.T) {
  2260. var capturedParams map[string]interface{}
  2261. llmAdapter := workflow.NewDefaultLLMAdapter()
  2262. llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  2263. capturedParams = params
  2264. return map[string]interface{}{
  2265. "content": "LLM response",
  2266. }, nil
  2267. })
  2268. docAdapter := workflow.NewDefaultDocAdapter()
  2269. docAdapter.SetDoc("11", "VL syntax rules content here")
  2270. docAdapter.SetDoc("20", "Component generation spec content here")
  2271. wf := &workflow.Workflow{
  2272. Version: "3.6",
  2273. Name: "Doc Injection Test",
  2274. Registry: workflow.Registry{
  2275. Services: []string{},
  2276. Components: []string{},
  2277. Vars: []string{"$output(STRING)"},
  2278. Files: workflow.FilesRegistry{Inputs: []string{}, Artifacts: []string{}},
  2279. Docs: map[string]string{
  2280. "11": "VL syntax rules",
  2281. "20": "Component generation spec",
  2282. },
  2283. },
  2284. Steps: []workflow.Step{
  2285. {
  2286. ID: "LLM_Generate",
  2287. In: workflow.StepInput{
  2288. "model": "claude-3-opus",
  2289. "docs": []interface{}{"11", "20"},
  2290. "messages": []interface{}{
  2291. map[string]interface{}{"role": "system", "content": "Follow the rules."},
  2292. map[string]interface{}{"role": "user", "content": "Generate something"},
  2293. },
  2294. },
  2295. Out: workflow.StepOutput{"$output": "=_result.content"},
  2296. Next: "Stop_End",
  2297. },
  2298. {ID: "Stop_End"},
  2299. },
  2300. }
  2301. engine, err := workflow.NewEngine(wf)
  2302. if err != nil {
  2303. t.Fatalf("Failed to create engine: %v", err)
  2304. }
  2305. adapters := &workflow.Adapters{
  2306. Service: workflow.NewDefaultServiceAdapter(),
  2307. Component: workflow.NewDefaultComponentAdapter(),
  2308. LLM: llmAdapter,
  2309. File: workflow.NewDefaultFileAdapter(),
  2310. Doc: docAdapter,
  2311. }
  2312. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  2313. if err != nil {
  2314. t.Fatalf("Failed to execute workflow: %v", err)
  2315. }
  2316. logEvents(t, result.RunEventStream)
  2317. // Verify docs were injected into system message
  2318. messages, ok := capturedParams["messages"].([]interface{})
  2319. if !ok {
  2320. t.Fatal("Expected messages in captured params")
  2321. }
  2322. sysMsg, ok := messages[0].(map[string]interface{})
  2323. if !ok {
  2324. t.Fatal("Expected first message to be a map")
  2325. }
  2326. content, _ := sysMsg["content"].(string)
  2327. if !strings.Contains(content, "Follow the rules.") {
  2328. t.Error("Expected original system content to be preserved")
  2329. }
  2330. if !strings.Contains(content, "VL syntax rules content here") {
  2331. t.Error("Expected doc 11 content to be injected")
  2332. }
  2333. if !strings.Contains(content, "Component generation spec content here") {
  2334. t.Error("Expected doc 20 content to be injected")
  2335. }
  2336. if !strings.Contains(content, "[Doc 11: VL syntax rules]") {
  2337. t.Error("Expected doc header with ID and description")
  2338. }
  2339. // Verify docs param was removed (not passed to LLM)
  2340. if _, hasDocs := capturedParams["docs"]; hasDocs {
  2341. t.Error("Expected docs param to be removed before passing to LLM adapter")
  2342. }
  2343. // Verify output
  2344. if result.Context.Variables["$output"] != "LLM response" {
  2345. t.Errorf("Expected $output='LLM response', got %v", result.Context.Variables["$output"])
  2346. }
  2347. })
  2348. t.Run("DocsWithNoSystemMessage", func(t *testing.T) {
  2349. var capturedParams map[string]interface{}
  2350. llmAdapter := workflow.NewDefaultLLMAdapter()
  2351. llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  2352. capturedParams = params
  2353. return map[string]interface{}{"content": "ok"}, nil
  2354. })
  2355. docAdapter := workflow.NewDefaultDocAdapter()
  2356. docAdapter.SetDoc("5", "Some doc content")
  2357. wf := &workflow.Workflow{
  2358. Version: "3.6",
  2359. Name: "Doc No System Msg Test",
  2360. Registry: workflow.Registry{
  2361. Services: []string{},
  2362. Components: []string{},
  2363. Vars: []string{},
  2364. Files: workflow.FilesRegistry{Inputs: []string{}, Artifacts: []string{}},
  2365. Docs: map[string]string{"5": "Some reference doc"},
  2366. },
  2367. Steps: []workflow.Step{
  2368. {
  2369. ID: "LLM_Generate",
  2370. In: workflow.StepInput{
  2371. "model": "gpt-4",
  2372. "docs": []interface{}{"5"},
  2373. "messages": []interface{}{
  2374. map[string]interface{}{"role": "user", "content": "Hello"},
  2375. },
  2376. },
  2377. Next: "Stop_End",
  2378. },
  2379. {ID: "Stop_End"},
  2380. },
  2381. }
  2382. engine, err := workflow.NewEngine(wf)
  2383. if err != nil {
  2384. t.Fatalf("Failed to create engine: %v", err)
  2385. }
  2386. adapters := &workflow.Adapters{
  2387. Service: workflow.NewDefaultServiceAdapter(),
  2388. Component: workflow.NewDefaultComponentAdapter(),
  2389. LLM: llmAdapter,
  2390. File: workflow.NewDefaultFileAdapter(),
  2391. Doc: docAdapter,
  2392. }
  2393. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  2394. if err != nil {
  2395. t.Fatalf("Failed to execute: %v", err)
  2396. }
  2397. logEvents(t, result.RunEventStream)
  2398. // A system message should have been prepended
  2399. messages := capturedParams["messages"].([]interface{})
  2400. if len(messages) != 2 {
  2401. t.Fatalf("Expected 2 messages (prepended system + user), got %d", len(messages))
  2402. }
  2403. sysMsg := messages[0].(map[string]interface{})
  2404. if sysMsg["role"] != "system" {
  2405. t.Errorf("Expected first message role='system', got %v", sysMsg["role"])
  2406. }
  2407. content := sysMsg["content"].(string)
  2408. if !strings.Contains(content, "Some doc content") {
  2409. t.Errorf("Expected doc content in prepended system message, got: %s", content)
  2410. }
  2411. })
  2412. t.Run("NoDocs", func(t *testing.T) {
  2413. var capturedParams map[string]interface{}
  2414. llmAdapter := workflow.NewDefaultLLMAdapter()
  2415. llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
  2416. capturedParams = params
  2417. return map[string]interface{}{"content": "ok"}, nil
  2418. })
  2419. wf := &workflow.Workflow{
  2420. Version: "3.6",
  2421. Name: "No Docs Test",
  2422. Registry: workflow.Registry{
  2423. Services: []string{},
  2424. Components: []string{},
  2425. Vars: []string{},
  2426. Files: workflow.FilesRegistry{Inputs: []string{}, Artifacts: []string{}},
  2427. },
  2428. Steps: []workflow.Step{
  2429. {
  2430. ID: "LLM_Generate",
  2431. In: workflow.StepInput{
  2432. "model": "gpt-4",
  2433. "messages": []interface{}{
  2434. map[string]interface{}{"role": "system", "content": "Be helpful."},
  2435. map[string]interface{}{"role": "user", "content": "Hi"},
  2436. },
  2437. },
  2438. Next: "Stop_End",
  2439. },
  2440. {ID: "Stop_End"},
  2441. },
  2442. }
  2443. engine, err := workflow.NewEngine(wf)
  2444. if err != nil {
  2445. t.Fatalf("Failed to create engine: %v", err)
  2446. }
  2447. adapters := &workflow.Adapters{
  2448. Service: workflow.NewDefaultServiceAdapter(),
  2449. Component: workflow.NewDefaultComponentAdapter(),
  2450. LLM: llmAdapter,
  2451. File: workflow.NewDefaultFileAdapter(),
  2452. }
  2453. result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
  2454. if err != nil {
  2455. t.Fatalf("Failed to execute: %v", err)
  2456. }
  2457. logEvents(t, result.RunEventStream)
  2458. // System message should be untouched
  2459. messages := capturedParams["messages"].([]interface{})
  2460. sysMsg := messages[0].(map[string]interface{})
  2461. if sysMsg["content"] != "Be helpful." {
  2462. t.Errorf("Expected system message unchanged, got: %v", sysMsg["content"])
  2463. }
  2464. })
  2465. }
  2466. func TestEqualsPrefixBranchAndIf(t *testing.T) {
  2467. t.Run("BranchCaseWithEqualsPrefix", func(t *testing.T) {
  2468. wf := &workflow.Workflow{
  2469. Version: "3.6",
  2470. Name: "TestBranchEqualsPrefix",
  2471. Registry: workflow.Registry{
  2472. Vars: []string{"$status(STRING)", "$result(STRING)"},
  2473. },
  2474. Steps: []workflow.Step{
  2475. {
  2476. ID: "Set_Status",
  2477. Target: "$status",
  2478. Value: "active",
  2479. Next: "Branch_Check",
  2480. },
  2481. {
  2482. ID: "Branch_Check",
  2483. Cases: [][]string{
  2484. {"=$status == \"active\"", "Set_Active"},
  2485. {"ELSE", "Set_Other"},
  2486. },
  2487. Next: "Stop_End",
  2488. },
  2489. {
  2490. ID: "Set_Active",
  2491. Target: "$result",
  2492. Value: "was_active",
  2493. Next: "Stop_End",
  2494. },
  2495. {
  2496. ID: "Set_Other",
  2497. Target: "$result",
  2498. Value: "was_other",
  2499. Next: "Stop_End",
  2500. },
  2501. {ID: "Stop_End"},
  2502. },
  2503. }
  2504. engine, err := workflow.NewEngine(wf)
  2505. if err != nil {
  2506. t.Fatalf("failed to create engine: %v", err)
  2507. }
  2508. res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters())
  2509. if err != nil {
  2510. t.Fatalf("failed to execute: %v", err)
  2511. }
  2512. drainEvents(res.RunEventStream)
  2513. if res.Context.Variables["$result"] != "was_active" {
  2514. t.Errorf("expected 'was_active', got %v", res.Context.Variables["$result"])
  2515. }
  2516. })
  2517. t.Run("BranchCaseBackwardCompatWithoutEqualsPrefix", func(t *testing.T) {
  2518. // Ensure old-style cases (no = prefix) still work
  2519. wf := &workflow.Workflow{
  2520. Version: "3.6",
  2521. Name: "TestBranchBackwardCompat",
  2522. Registry: workflow.Registry{
  2523. Vars: []string{"$score(INT)", "$result(STRING)"},
  2524. },
  2525. Steps: []workflow.Step{
  2526. {
  2527. ID: "Set_Score",
  2528. Target: "$score",
  2529. Value: "=90",
  2530. Next: "Branch_Check",
  2531. },
  2532. {
  2533. ID: "Branch_Check",
  2534. Cases: [][]string{
  2535. {"$score >= 90", "Set_High"},
  2536. {"ELSE", "Set_Low"},
  2537. },
  2538. Next: "Stop_End",
  2539. },
  2540. {
  2541. ID: "Set_High",
  2542. Target: "$result",
  2543. Value: "high",
  2544. Next: "Stop_End",
  2545. },
  2546. {
  2547. ID: "Set_Low",
  2548. Target: "$result",
  2549. Value: "low",
  2550. Next: "Stop_End",
  2551. },
  2552. {ID: "Stop_End"},
  2553. },
  2554. }
  2555. engine, err := workflow.NewEngine(wf)
  2556. if err != nil {
  2557. t.Fatalf("failed to create engine: %v", err)
  2558. }
  2559. res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters())
  2560. if err != nil {
  2561. t.Fatalf("failed to execute: %v", err)
  2562. }
  2563. drainEvents(res.RunEventStream)
  2564. if res.Context.Variables["$result"] != "high" {
  2565. t.Errorf("expected 'high', got %v", res.Context.Variables["$result"])
  2566. }
  2567. })
  2568. t.Run("StepIfWithEqualsPrefix", func(t *testing.T) {
  2569. wf := &workflow.Workflow{
  2570. Version: "3.6",
  2571. Name: "TestStepIfEqualsPrefix",
  2572. Registry: workflow.Registry{
  2573. Vars: []string{"$flag(STRING)", "$result(STRING)"},
  2574. },
  2575. Steps: []workflow.Step{
  2576. {
  2577. ID: "Set_Flag",
  2578. Target: "$flag",
  2579. Value: "yes",
  2580. Next: "Set_Conditional",
  2581. },
  2582. {
  2583. ID: "Set_Conditional",
  2584. If: "=$flag == \"yes\"",
  2585. Target: "$result",
  2586. Value: "ran",
  2587. Next: "Stop_End",
  2588. },
  2589. {ID: "Stop_End"},
  2590. },
  2591. }
  2592. engine, err := workflow.NewEngine(wf)
  2593. if err != nil {
  2594. t.Fatalf("failed to create engine: %v", err)
  2595. }
  2596. res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters())
  2597. if err != nil {
  2598. t.Fatalf("failed to execute: %v", err)
  2599. }
  2600. drainEvents(res.RunEventStream)
  2601. if res.Context.Variables["$result"] != "ran" {
  2602. t.Errorf("expected 'ran', got %v", res.Context.Variables["$result"])
  2603. }
  2604. })
  2605. t.Run("StepIfWithEqualsPrefixSkipsStep", func(t *testing.T) {
  2606. wf := &workflow.Workflow{
  2607. Version: "3.6",
  2608. Name: "TestStepIfEqualsPrefixSkip",
  2609. Registry: workflow.Registry{
  2610. Vars: []string{"$flag(STRING)", "$result(STRING)"},
  2611. },
  2612. Steps: []workflow.Step{
  2613. {
  2614. ID: "Set_Flag",
  2615. Target: "$flag",
  2616. Value: "no",
  2617. Next: "Set_Conditional",
  2618. },
  2619. {
  2620. ID: "Set_Conditional",
  2621. If: "=$flag == \"yes\"",
  2622. Target: "$result",
  2623. Value: "ran",
  2624. Next: "Stop_End",
  2625. },
  2626. {ID: "Stop_End"},
  2627. },
  2628. }
  2629. engine, err := workflow.NewEngine(wf)
  2630. if err != nil {
  2631. t.Fatalf("failed to create engine: %v", err)
  2632. }
  2633. res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters())
  2634. if err != nil {
  2635. t.Fatalf("failed to execute: %v", err)
  2636. }
  2637. drainEvents(res.RunEventStream)
  2638. if res.Context.Variables["$result"] != nil {
  2639. t.Errorf("expected $result to be unset, got %v", res.Context.Variables["$result"])
  2640. }
  2641. })
  2642. t.Run("StepIfBackwardCompatWithoutEqualsPrefix", func(t *testing.T) {
  2643. wf := &workflow.Workflow{
  2644. Version: "3.6",
  2645. Name: "TestStepIfBackwardCompat",
  2646. Registry: workflow.Registry{
  2647. Vars: []string{"$count(INT)", "$result(STRING)"},
  2648. },
  2649. Steps: []workflow.Step{
  2650. {
  2651. ID: "Set_Count",
  2652. Target: "$count",
  2653. Value: "=5",
  2654. Next: "Set_Conditional",
  2655. },
  2656. {
  2657. ID: "Set_Conditional",
  2658. If: "$count > 3",
  2659. Target: "$result",
  2660. Value: "above_threshold",
  2661. Next: "Stop_End",
  2662. },
  2663. {ID: "Stop_End"},
  2664. },
  2665. }
  2666. engine, err := workflow.NewEngine(wf)
  2667. if err != nil {
  2668. t.Fatalf("failed to create engine: %v", err)
  2669. }
  2670. res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters())
  2671. if err != nil {
  2672. t.Fatalf("failed to execute: %v", err)
  2673. }
  2674. drainEvents(res.RunEventStream)
  2675. if res.Context.Variables["$result"] != "above_threshold" {
  2676. t.Errorf("expected 'above_threshold', got %v", res.Context.Variables["$result"])
  2677. }
  2678. })
  2679. }