package workflow_test import ( "context" "encoding/json" "fmt" "strings" "sync" "testing" "workflow" ) // Test helper functions // createTestAdapters creates a standard set of adapters for testing func createTestAdapters() *workflow.Adapters { return &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: workflow.NewDefaultFileAdapter(), } } // createTestAdaptersWithFile creates adapters with a specific file adapter func createTestAdaptersWithFile(fileAdapter workflow.FileAdapter) *workflow.Adapters { return &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: fileAdapter, } } // drainEvents consumes all run events from the stream func drainEvents(stream <-chan workflow.RunEvent) { for range stream { } } // logEvents logs all run events from the stream func logEvents(t *testing.T, stream <-chan workflow.RunEvent) { for event := range stream { stepID := "" if event.StepID != nil { stepID = *event.StepID } t.Logf("RunEvent: %s - %s", event.Type, stepID) } } // createFileOpsHandler creates a file operations component handler func createFileOpsHandler(fileAdapter workflow.FileAdapter) func(context.Context, map[string]interface{}) (map[string]interface{}, error) { return func(ctx context.Context, params map[string]interface{}) (map[string]interface{}, error) { operation, ok := params["operation"].(string) if !ok { return nil, fmt.Errorf("operation parameter is required") } path, ok := params["path"].(string) if !ok { return nil, fmt.Errorf("path parameter is required") } switch operation { case "read": content, err := fileAdapter.Read(ctx, path) if err != nil { return nil, err } return map[string]interface{}{ "content": string(content), }, nil case "write": contentVal, ok := params["content"] if !ok { return nil, fmt.Errorf("content parameter is required for write operation") } // Optimized string conversion var content []byte if str, ok := contentVal.(string); ok { content = []byte(str) } else { content = []byte(fmt.Sprint(contentVal)) } err := fileAdapter.Write(ctx, path, content, workflow.WriteModeOverwrite) if err != nil { return nil, err } return map[string]interface{}{ "success": true, }, nil default: return nil, fmt.Errorf("unsupported operation: %s", operation) } } } // logFinalVariables logs the final variable state for debugging func logFinalVariables(t *testing.T, vars map[string]interface{}, keys ...string) { values := make([]interface{}, len(keys)) for i, key := range keys { values[i] = vars[key] } t.Logf("Final variables: %v", values) } // ExampleWorkflow demonstrates a simple workflow with service calls func ExampleWorkflow() { // Define a workflow wf := &workflow.Workflow{ Version: "3.6", Name: "Simple Task Workflow", Registry: workflow.Registry{ Services: []string{ "PlannerService(prd(STRING)) RETURN plan(OBJECT)", "ValidateService(plan(OBJECT)) RETURN isValid(BOOL), errors([STRING])", }, Components: []string{"FileOps"}, Vars: []string{ "$prdText(STRING)", "$plan(OBJECT)", "$isValid(BOOL)", }, Files: workflow.FilesRegistry{ Inputs: []string{"Process/PRD.json"}, Artifacts: []string{"Process/Artifacts/*"}, }, }, Steps: []workflow.Step{ { ID: "Service_PlannerService", In: workflow.StepInput{ "prd": "=$prdText", }, Out: workflow.StepOutput{ "$plan": "=_result.plan", }, Next: "Service_ValidateService", }, { ID: "Service_ValidateService", In: workflow.StepInput{ "plan": "=$plan", }, Out: workflow.StepOutput{ "$isValid": "=_result.isValid", }, Next: "Branch_CheckValid", }, { ID: "Branch_CheckValid", Cases: [][]string{ {"$isValid == true", "Write_Success"}, {"ELSE", "Write_Error"}, }, Next: "Stop_End", }, { ID: "Write_Success", Target: "Process/Artifacts/result.txt", Value: "Validation passed", Next: "Stop_End", }, { ID: "Write_Error", Target: "Process/Artifacts/error.txt", Value: "Validation failed", Next: "Stop_End", }, { ID: "Stop_End", }, }, } // Create adapters serviceAdapter := workflow.NewDefaultServiceAdapter() serviceAdapter.RegisterHandler("PlannerService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) { return &workflow.ServiceResult{ Data: map[string]interface{}{ "plan": map[string]interface{}{ "tasks": []string{"task1", "task2"}, "owner": "agent", }, }, }, nil }) serviceAdapter.RegisterHandler("ValidateService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) { return &workflow.ServiceResult{ Data: map[string]interface{}{ "isValid": true, "errors": []string{}, }, }, nil }) adapters := &workflow.Adapters{ Service: serviceAdapter, Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: workflow.NewDefaultFileAdapter(), } // Create engine engine, err := workflow.NewEngine(wf) if err != nil { fmt.Printf("Failed to create engine: %v\n", err) return } // Execute workflow initialVars := map[string]interface{}{ "$prdText": "Build a user authentication system", } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { fmt.Printf("Failed to execute workflow: %v\n", err) return } // Consume events for range result.RunEventStream { } fmt.Println("Workflow completed") // Output: Workflow completed } // TestLoopWorkflow demonstrates a loop workflow func TestLoopWorkflow(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "Loop Example", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$items([OBJECT])", "$results([STRING])", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{"Process/Artifacts/*"}, }, }, Steps: []workflow.Step{ { ID: "Loop_ProcessItems", Mode: "serial", Source: "=$items", // Using new = prefix format Children: []string{"Set_ProcessOne"}, Next: "Stop_End", }, { ID: "Set_ProcessOne", Target: "$results[_index]", Value: "=_item.name", Next: "RETURN", }, { ID: "Stop_End", }, }, } adapters := createTestAdapters() engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } initialVars := map[string]interface{}{ "$items": []interface{}{ map[string]interface{}{"name": "item1"}, map[string]interface{}{"name": "item2"}, map[string]interface{}{"name": "item3"}, }, "$results": []interface{}{}, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Display the set value t.Logf("$results value: %v", result.Context.Variables["$results"]) t.Log("Loop workflow completed") } // TestJSONWorkflow demonstrates loading a workflow from JSON func TestJSONWorkflow(t *testing.T) { jsonData := `{ "version": "3.6", "name": "JSON Example", "registry": { "services": ["TestService(input(STRING)) RETURN output(STRING)"], "components": [], "vars": ["$input(STRING)", "$output(STRING)"], "files": { "inputs": [], "artifacts": [] } }, "steps": [ { "id": "Service_TestService", "in": {"input": "=$input"}, "out": {"$output": "=_result.output"}, "next": "Stop_End" }, { "id": "Stop_End" } ] }` var wf workflow.Workflow if err := json.Unmarshal([]byte(jsonData), &wf); err != nil { t.Fatalf("Failed to unmarshal workflow: %v", err) } t.Logf("Loaded workflow: %s", wf.Name) // Validate if err := wf.Validate(); err != nil { t.Fatalf("Invalid workflow: %v", err) } t.Log("Workflow is valid") } // TestIfCondition tests conditional step execution with if property func TestIfCondition(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "If Condition Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$enableFeatureA(BOOL)", "$enableFeatureB(BOOL)", "$executedA(STRING)", "$executedB(STRING)", "$executedC(STRING)", "$amount(NUMBER)", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Set_ExecuteA", If: "$enableFeatureA == true", Target: "$executedA", Value: "A was executed", Next: "Set_ExecuteB", }, { ID: "Set_ExecuteB", If: "$enableFeatureB == true", Target: "$executedB", Value: "B was executed", Next: "Set_ExecuteC", }, { ID: "Set_ExecuteC", If: "$amount > 100", Target: "$executedC", Value: "C was executed", Next: "Stop_End", }, { ID: "Stop_End", }, }, } adapters := createTestAdapters() engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } // Test case 1: enableFeatureA=true, enableFeatureB=false, amount=50 t.Run("EnableA_DisableB_LowAmount", func(t *testing.T) { initialVars := map[string]interface{}{ "$enableFeatureA": true, "$enableFeatureB": false, "$amount": 50, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify Set_ExecuteA was executed (if condition true) if result.Context.Variables["$executedA"] != "A was executed" { t.Errorf("Expected $executedA to be 'A was executed', got: %v", result.Context.Variables["$executedA"]) } // Verify Set_ExecuteB was skipped (if condition false) if result.Context.Variables["$executedB"] != nil { t.Errorf("Expected $executedB to be nil (step skipped), got: %v", result.Context.Variables["$executedB"]) } // Verify Set_ExecuteC was skipped (if condition false: amount <= 100) if result.Context.Variables["$executedC"] != nil { t.Errorf("Expected $executedC to be nil (step skipped), got: %v", result.Context.Variables["$executedC"]) } t.Logf("Final variables: executedA=%v, executedB=%v, executedC=%v", result.Context.Variables["$executedA"], result.Context.Variables["$executedB"], result.Context.Variables["$executedC"]) }) // Test case 2: enableFeatureA=false, enableFeatureB=true, amount=150 t.Run("DisableA_EnableB_HighAmount", func(t *testing.T) { initialVars := map[string]interface{}{ "$enableFeatureA": false, "$enableFeatureB": true, "$amount": 150, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify Set_ExecuteA was skipped (if condition false) if result.Context.Variables["$executedA"] != nil { t.Errorf("Expected $executedA to be nil (step skipped), got: %v", result.Context.Variables["$executedA"]) } // Verify Set_ExecuteB was executed (if condition true) if result.Context.Variables["$executedB"] != "B was executed" { t.Errorf("Expected $executedB to be 'B was executed', got: %v", result.Context.Variables["$executedB"]) } // Verify Set_ExecuteC was executed (if condition true: amount > 100) if result.Context.Variables["$executedC"] != "C was executed" { t.Errorf("Expected $executedC to be 'C was executed', got: %v", result.Context.Variables["$executedC"]) } }) // Test case 3: All conditions true t.Run("AllConditionsTrue", func(t *testing.T) { initialVars := map[string]interface{}{ "$enableFeatureA": true, "$enableFeatureB": true, "$amount": 200, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify all steps were executed if result.Context.Variables["$executedA"] != "A was executed" { t.Errorf("Expected $executedA to be 'A was executed', got: %v", result.Context.Variables["$executedA"]) } if result.Context.Variables["$executedB"] != "B was executed" { t.Errorf("Expected $executedB to be 'B was executed', got: %v", result.Context.Variables["$executedB"]) } if result.Context.Variables["$executedC"] != "C was executed" { t.Errorf("Expected $executedC to be 'C was executed', got: %v", result.Context.Variables["$executedC"]) } }) } // TestFileReadWrite tests file read and write operations using in-memory storage func TestFileReadWrite(t *testing.T) { t.Run("WriteReadAppend", func(t *testing.T) { // Create fresh adapters for this test fileAdapter := workflow.NewDefaultFileAdapter() // Register FileOps component handler using helper componentAdapter := workflow.NewDefaultComponentAdapter() fileOpsHandler := createFileOpsHandler(fileAdapter) componentAdapter.RegisterHandler("FileOps", fileOpsHandler) componentAdapter.RegisterHandler("FileOps2", fileOpsHandler) adapters := createTestAdaptersWithFile(fileAdapter) adapters.Component = componentAdapter wf := &workflow.Workflow{ Version: "3.6", Name: "File Read/Write Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{"FileOps"}, Vars: []string{ "$message(STRING)", "$fileContent(STRING)", "$appendedContent(STRING)", }, Files: workflow.FilesRegistry{ Inputs: []string{"Process/input.txt"}, Artifacts: []string{"Process/Artifacts/*"}, }, }, Steps: []workflow.Step{ { ID: "Write_InitialFile", Target: "Process/Artifacts/test.txt", Value: "=$message", Next: "Component_FileOps", }, { ID: "Component_FileOps", In: workflow.StepInput{ "operation": "read", "path": "Process/Artifacts/test.txt", }, Out: workflow.StepOutput{ "$fileContent": "=_result.content", }, Next: "Write_AppendFile", }, { ID: "Write_AppendFile", Target: "Process/Artifacts/test.txt", Value: " - appended", Mode: "append", Next: "Component_FileOps2", }, { ID: "Component_FileOps2", In: workflow.StepInput{ "operation": "read", "path": "Process/Artifacts/test.txt", }, Out: workflow.StepOutput{ "$appendedContent": "=_result.content", }, Next: "Write_NewFile", }, { ID: "Write_NewFile", Target: "Process/Artifacts/new.txt", Value: "New file content", Mode: "failIfExists", Next: "Stop_End", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } initialVars := map[string]interface{}{ "$message": "Hello, World!", } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify initial write and read fileContent := result.Context.Variables["$fileContent"] if fileContent != "Hello, World!" { t.Errorf("Expected $fileContent to be 'Hello, World!', got: %v", fileContent) } // Verify append operation appendedContent := result.Context.Variables["$appendedContent"] if appendedContent != "Hello, World! - appended" { t.Errorf("Expected $appendedContent to be 'Hello, World! - appended', got: %v", appendedContent) } // Verify files exist in adapter testFileContent := fileAdapter.GetFile("Process/Artifacts/test.txt") if string(testFileContent) != "Hello, World! - appended" { t.Errorf("Expected test.txt content to be 'Hello, World! - appended', got: %s", string(testFileContent)) } newFileContent := fileAdapter.GetFile("Process/Artifacts/new.txt") if string(newFileContent) != "New file content" { t.Errorf("Expected new.txt content to be 'New file content', got: %s", string(newFileContent)) } }) t.Run("FailIfExists", func(t *testing.T) { // Create fresh adapters for this test fileAdapter := workflow.NewDefaultFileAdapter() adapters := createTestAdaptersWithFile(fileAdapter) // Create a new workflow that tries to write to an existing file with failIfExists mode wfFail := &workflow.Workflow{ Version: "3.6", Name: "Fail If Exists Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{}, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{"Process/Artifacts/*"}, }, }, Steps: []workflow.Step{ { ID: "Write_Existing", Target: "Process/Artifacts/existing.txt", Value: "Should fail", Mode: "failIfExists", Next: "Stop_End", }, { ID: "Stop_End", }, }, } // Pre-populate the file fileAdapter.SetFile("Process/Artifacts/existing.txt", []byte("Already exists")) engineFail, err := workflow.NewEngine(wfFail) if err != nil { t.Fatalf("Failed to create engine: %v", err) } result, err := engineFail.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Execute returned error: %v", err) } // Consume events for event := range result.RunEventStream { if event.Type == workflow.RunEventStepError || event.Type == workflow.RunEventWorkflowFailed { t.Logf("Error event: %s - %v", event.Type, event.Payload) } } // Check that workflow failed if result.Context.Status != workflow.StatusFailed { t.Fatalf("Expected workflow status to be 'failed', got: %s", result.Context.Status) } // Verify original content is unchanged content := fileAdapter.GetFile("Process/Artifacts/existing.txt") if string(content) != "Already exists" { t.Errorf("Expected content to remain 'Already exists', got: %s", string(content)) } }) t.Run("OverwriteMode", func(t *testing.T) { // Create fresh adapters for this test fileAdapter := workflow.NewDefaultFileAdapter() adapters := createTestAdaptersWithFile(fileAdapter) // Create a new workflow that overwrites an existing file wfOverwrite := &workflow.Workflow{ Version: "3.6", Name: "Overwrite Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{}, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{"Process/Artifacts/*"}, }, }, Steps: []workflow.Step{ { ID: "Write_Overwrite", Target: "Process/Artifacts/overwrite.txt", Value: "New content", Mode: "overwrite", Next: "Stop_End", }, { ID: "Stop_End", }, }, } // Pre-populate the file fileAdapter.SetFile("Process/Artifacts/overwrite.txt", []byte("Old content")) engineOverwrite, err := workflow.NewEngine(wfOverwrite) if err != nil { t.Fatalf("Failed to create engine: %v", err) } result, err := engineOverwrite.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events drainEvents(result.RunEventStream) // Verify content was overwritten content := fileAdapter.GetFile("Process/Artifacts/overwrite.txt") if string(content) != "New content" { t.Errorf("Expected content to be 'New content', got: %s", string(content)) } }) } // TestParallelLoopExecution tests parallel loop execution func TestParallelLoopExecution(t *testing.T) { t.Run("ParallelMode", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "ParallelLoopTest", Registry: workflow.Registry{ Files: workflow.FilesRegistry{ Artifacts: []string{"Process/Artifacts/**"}, }, }, Steps: []workflow.Step{ { ID: "Loop_ProcessParallel", Mode: "parallel", // Parallel mode Source: "=$items", Children: []string{"Set_Result"}, Next: "Stop_End", }, { ID: "Set_Result", Target: "$results[_index]", Value: "=_item", Next: "RETURN", }, { ID: "Stop_End", }, }, } // Create engine engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } // Execute workflow with initial variables adapters := createTestAdapters() initialVars := map[string]interface{}{ "$items": []interface{}{int64(1), int64(2), int64(3), int64(4), int64(5), int64(6), int64(7), int64(8), int64(9), int64(10)}, "$results": []interface{}{}, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Log events to see what's happening logEvents(t, result.RunEventStream) // Verify results - all items should be doubled results, ok := result.Context.Variables["$results"].([]interface{}) if !ok { t.Fatalf("Expected $results to be a slice, got: %T", result.Context.Variables["$results"]) } if len(results) != 10 { t.Errorf("Expected 10 results, got %d", len(results)) } // Verify each result matches the input for i := 0; i < 10; i++ { expected := int64(i + 1) actual, ok := results[i].(int64) if !ok { t.Errorf("Result[%d]: expected int64, got %T", i, results[i]) continue } if actual != expected { t.Errorf("Result[%d]: expected %d, got %d", i, expected, actual) } } }) t.Run("SerialVsParallelComparison", func(t *testing.T) { // Test that serial and parallel modes produce identical results testCases := []struct { mode string }{ {"serial"}, {"parallel"}, } for _, tc := range testCases { t.Run(tc.mode, func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "ComparisonTest", Registry: workflow.Registry{ Files: workflow.FilesRegistry{ Artifacts: []string{"Process/Artifacts/**"}, }, }, Steps: []workflow.Step{ { ID: "Loop_Process", Mode: tc.mode, Source: "=$items", Children: []string{"Set_Squared"}, Next: "Stop_End", }, { ID: "Set_Squared", Target: "$squares[_index]", Value: "=_item", Next: "RETURN", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := createTestAdapters() initialVars := map[string]interface{}{ "$items": []interface{}{int64(5), int64(10), int64(15), int64(20)}, "$squares": []interface{}{}, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } drainEvents(result.RunEventStream) // Verify results squares, ok := result.Context.Variables["$squares"].([]interface{}) if !ok { t.Fatalf("Expected $squares to be a slice") } expected := []int64{5, 10, 15, 20} for i, exp := range expected { actual := squares[i].(int64) if actual != exp { t.Errorf("squares[%d]: expected %d, got %d", i, exp, actual) } } }) } }) } // TestParallelChildrenExecution tests parallel children execution func TestParallelChildrenExecution(t *testing.T) { t.Run("MultipleChildren", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "ParallelChildrenTest", Registry: workflow.Registry{ Files: workflow.FilesRegistry{ Artifacts: []string{"Process/Artifacts/**"}, }, }, Steps: []workflow.Step{ { ID: "Noop_Parent", Children: []string{"Set_Result1", "Set_Result2", "Set_Result3", "Set_Result4"}, Next: "Stop_End", }, { ID: "Set_Result1", Target: "$result1", Value: "=$input", Next: "RETURN", }, { ID: "Set_Result2", Target: "$result2", Value: "=$input", Next: "RETURN", }, { ID: "Set_Result3", Target: "$result3", Value: "=$input", Next: "RETURN", }, { ID: "Set_Result4", Target: "$result4", Value: "=$input", Next: "RETURN", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := createTestAdapters() initialVars := map[string]interface{}{ "$input": int64(100), } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } drainEvents(result.RunEventStream) // Verify all children executed and set their results vars := result.Context.Variables result1, ok := vars["$result1"].(int64) if !ok || result1 != 100 { t.Errorf("Expected $result1 to be 100, got: %v", vars["$result1"]) } result2, ok := vars["$result2"].(int64) if !ok || result2 != 100 { t.Errorf("Expected $result2 to be 100, got: %v", vars["$result2"]) } result3, ok := vars["$result3"].(int64) if !ok || result3 != 100 { t.Errorf("Expected $result3 to be 100, got: %v", vars["$result3"]) } result4, ok := vars["$result4"].(int64) if !ok || result4 != 100 { t.Errorf("Expected $result4 to be 100, got: %v", vars["$result4"]) } }) t.Run("SingleChildOptimization", func(t *testing.T) { // Verify single child doesn't spawn unnecessary goroutines wf := &workflow.Workflow{ Version: "3.6", Name: "SingleChildTest", Registry: workflow.Registry{ Files: workflow.FilesRegistry{ Artifacts: []string{"Process/Artifacts/**"}, }, }, Steps: []workflow.Step{ { ID: "Noop_Parent", Children: []string{"Set_Result"}, // Single child Next: "Stop_End", }, { ID: "Set_Result", Target: "$result", Value: "=$value", Next: "RETURN", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := createTestAdapters() initialVars := map[string]interface{}{ "$value": int64(42), } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } drainEvents(result.RunEventStream) // Verify result resultVal, ok := result.Context.Variables["$result"].(int64) if !ok || resultVal != 42 { t.Errorf("Expected $result to be 42, got: %v", result.Context.Variables["$result"]) } }) } // TestParallelErrorHandling tests error handling in parallel execution func TestParallelErrorHandling(t *testing.T) { t.Run("ParallelLoopWithError", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "ParallelErrorTest", Registry: workflow.Registry{ Files: workflow.FilesRegistry{ Artifacts: []string{"Process/Artifacts/**"}, }, }, Steps: []workflow.Step{ { ID: "Loop_Process", Mode: "parallel", Source: "=$items", Children: []string{"Set_InvalidTarget"}, Next: "Stop_End", }, { ID: "Set_InvalidTarget", Target: "invalidTarget", // This will cause an error (not starting with $) Value: "=_item * 2", Next: "RETURN", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := createTestAdapters() initialVars := map[string]interface{}{ "$items": []interface{}{int64(1), int64(2), int64(3), int64(4), int64(5)}, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Collect events to check for errors hasError := false for event := range result.RunEventStream { if event.Type == workflow.RunEventStepError || event.Type == workflow.RunEventWorkflowFailed { hasError = true } } if !hasError { t.Error("Expected error event in parallel execution, but none found") } }) } // TestFileOutput tests file output via step output mapping func TestFileOutput(t *testing.T) { t.Run("OutputToFile", func(t *testing.T) { // Create fresh adapters for this test fileAdapter := workflow.NewDefaultFileAdapter() serviceAdapter := workflow.NewDefaultServiceAdapter() serviceAdapter.RegisterHandler("GenerateReport", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) { return &workflow.ServiceResult{ Data: map[string]interface{}{ "report": "Generated report content", "summary": "Brief summary", }, }, nil }) adapters := &workflow.Adapters{ Service: serviceAdapter, Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: fileAdapter, } wf := &workflow.Workflow{ Version: "3.6", Name: "File Output Test", Registry: workflow.Registry{ Services: []string{ "GenerateReport(input(STRING)) RETURN report(STRING), summary(STRING)", }, Components: []string{}, Vars: []string{ "$input(STRING)", "$reportContent(STRING)", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{"Process/Artifacts/*"}, }, }, Steps: []workflow.Step{ { ID: "Service_GenerateReport", In: workflow.StepInput{ "input": "=$input", }, Out: workflow.StepOutput{ "$reportContent": "=_result.report", "/Process/Artifacts/report.txt": "=_result.report", "/Process/Artifacts/summary.txt": "=_result.summary", }, Next: "Stop_End", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } initialVars := map[string]interface{}{ "$input": "test input", } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events and check for file write events fileWriteCount := 0 for event := range result.RunEventStream { t.Logf("Event: %s - %v", event.Type, event.StepID) if event.Type == workflow.RunEventFileDone { fileWriteCount++ t.Logf("File done event: path=%v", event.Payload["path"]) } } // Verify variable was set if result.Context.Variables["$reportContent"] != "Generated report content" { t.Errorf("Expected $reportContent to be 'Generated report content', got: %v", result.Context.Variables["$reportContent"]) } // Verify files were written reportContent := fileAdapter.GetFile("/Process/Artifacts/report.txt") if string(reportContent) != "Generated report content" { t.Errorf("Expected report.txt to contain 'Generated report content', got: %s", string(reportContent)) } summaryContent := fileAdapter.GetFile("/Process/Artifacts/summary.txt") if string(summaryContent) != "Brief summary" { t.Errorf("Expected summary.txt to contain 'Brief summary', got: %s", string(summaryContent)) } // Verify we got file write events if fileWriteCount != 2 { t.Errorf("Expected 2 file write events, got: %d", fileWriteCount) } }) t.Run("OutputToFileWithPathInterpolation", func(t *testing.T) { // Test file output with variable interpolation in path fileAdapter := workflow.NewDefaultFileAdapter() serviceAdapter := workflow.NewDefaultServiceAdapter() serviceAdapter.RegisterHandler("ProcessItem", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) { name := params["name"].(string) return &workflow.ServiceResult{ Data: map[string]interface{}{ "content": "Processed: " + name, "path": name + ".txt", }, }, nil }) adapters := &workflow.Adapters{ Service: serviceAdapter, Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: fileAdapter, } wf := &workflow.Workflow{ Version: "3.6", Name: "File Output Path Interpolation Test", Registry: workflow.Registry{ Services: []string{ "ProcessItem(name(STRING)) RETURN content(STRING), path(STRING)", }, Components: []string{}, Vars: []string{ "$items([OBJECT])", "$results([STRING])", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{"output/*"}, }, }, Steps: []workflow.Step{ { ID: "Loop_ProcessItems", Mode: "serial", Source: "=$items", Children: []string{"Service_ProcessItem"}, Next: "Stop_End", }, { ID: "Service_ProcessItem", In: workflow.StepInput{ "name": "=_item.name", }, Out: workflow.StepOutput{ "$results[_index]": "=_result.content", "/output/{_result.path}": "=_result.content", }, Next: "RETURN", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } initialVars := map[string]interface{}{ "$items": []interface{}{ map[string]interface{}{"name": "alpha"}, map[string]interface{}{"name": "beta"}, }, "$results": []interface{}{}, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify files were written with interpolated paths alphaContent := fileAdapter.GetFile("/output/alpha.txt") if string(alphaContent) != "Processed: alpha" { t.Errorf("Expected alpha.txt to contain 'Processed: alpha', got: %s", string(alphaContent)) } betaContent := fileAdapter.GetFile("/output/beta.txt") if string(betaContent) != "Processed: beta" { t.Errorf("Expected beta.txt to contain 'Processed: beta', got: %s", string(betaContent)) } // Verify results array results, ok := result.Context.Variables["$results"].([]interface{}) if !ok { t.Fatalf("Expected $results to be a slice, got: %T", result.Context.Variables["$results"]) } if len(results) != 2 { t.Fatalf("Expected 2 results, got %d", len(results)) } if results[0] != "Processed: alpha" { t.Errorf("Expected results[0] to be 'Processed: alpha', got: %v", results[0]) } if results[1] != "Processed: beta" { t.Errorf("Expected results[1] to be 'Processed: beta', got: %v", results[1]) } }) } // TestParallelConcurrency tests concurrent access patterns func TestParallelConcurrency(t *testing.T) { t.Run("ConcurrentVariableWrites", func(t *testing.T) { // Test that concurrent writes to different array indices work correctly wf := &workflow.Workflow{ Version: "3.6", Name: "ConcurrencyTest", Registry: workflow.Registry{ Files: workflow.FilesRegistry{ Artifacts: []string{"Process/Artifacts/**"}, }, }, Steps: []workflow.Step{ { ID: "Loop_Process", Mode: "parallel", Source: "=$items", Children: []string{"Set_Value"}, Next: "Stop_End", }, { ID: "Set_Value", Target: "$outputs[_index]", Value: "=_item", Next: "RETURN", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := createTestAdapters() items := make([]interface{}, 20) for i := 0; i < 20; i++ { items[i] = int64(i) } initialVars := map[string]interface{}{ "$items": items, "$outputs": []interface{}{}, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } logEvents(t, result.RunEventStream) // Verify all outputs are correct outputs, ok := result.Context.Variables["$outputs"].([]interface{}) if !ok { t.Fatalf("Expected $outputs to be a slice") } if len(outputs) != 20 { t.Fatalf("Expected 20 outputs, got %d", len(outputs)) } // Verify each output for i := 0; i < 20; i++ { expected := int64(i) actual, ok := outputs[i].(int64) if !ok { t.Errorf("Output[%d]: expected int64, got %T", i, outputs[i]) continue } if actual != expected { t.Errorf("Output[%d]: expected %d, got %d", i, expected, actual) } } }) } func TestAPICallWorkflow(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "API Test Workflow", Registry: workflow.Registry{ Services: []string{}, APIs: []workflow.APIDefinition{ { ID: "TestAPI", Method: "POST", URL: "https://api.example.com/v1/test", Auth: "", // No auth for simple test Headers: map[string]string{ "Content-Type": "application/json", }, Desc: "Test API endpoint", }, }, Components: []string{}, Vars: []string{"$result(OBJECT)", "$status(STRING)"}, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "API_TestAPI", In: workflow.StepInput{ "body": map[string]interface{}{ "message": "Hello API", }, }, Out: workflow.StepOutput{ "$result": "=_result", "$status": "=_result.status", }, Next: "Stop_End", }, { ID: "Stop_End", }, }, } // Create mock API adapter apiAdapter := NewMockAPIAdapter() apiAdapter.SetResponse(map[string]interface{}{ "status": "success", "data": "Response data", }) // Execute workflow engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } ctx := context.Background() adapters := &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), API: apiAdapter, Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: workflow.NewDefaultFileAdapter(), } result, err := engine.Execute(ctx, map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Wait for completion for event := range result.RunEventStream { t.Logf("Event: %s - %v - Payload: %+v", event.Type, event.StepID, event.Payload) if event.Type == workflow.RunEventStepError { t.Logf("Step Error Payload: %v", event.Payload["error"]) } if event.Type == workflow.RunEventWorkflowDone || event.Type == workflow.RunEventWorkflowFailed { break } } // Verify execution status, ok := result.Context.Variables["$status"] if !ok { t.Fatalf("Expected $status to be set. Variables: %+v", result.Context.Variables) } if status != "success" { t.Errorf("Expected status to be 'success', got %v", status) } } // MockAPIAdapter is a test adapter for API calls type MockAPIAdapter struct { response map[string]interface{} mu sync.RWMutex } func NewMockAPIAdapter() *MockAPIAdapter { return &MockAPIAdapter{ response: make(map[string]interface{}), } } func (m *MockAPIAdapter) SetResponse(resp map[string]interface{}) { m.mu.Lock() defer m.mu.Unlock() m.response = resp } func (m *MockAPIAdapter) Call(ctx context.Context, apiDef *workflow.APIDefinition, params map[string]interface{}) (map[string]interface{}, error) { m.mu.RLock() defer m.mu.RUnlock() return m.response, nil } // TestNilValidation tests nil parameter validation func TestNilValidation(t *testing.T) { t.Run("NilWorkflow", func(t *testing.T) { _, err := workflow.NewEngine(nil) if err == nil { t.Fatal("Expected error for nil workflow") } if !strings.Contains(err.Error(), "cannot be nil") { t.Errorf("Expected 'cannot be nil' error, got: %v", err) } }) t.Run("NilContext", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "Test", Registry: workflow.Registry{ Services: []string{}, APIs: []workflow.APIDefinition{}, Components: []string{}, Vars: []string{}, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), API: workflow.NewDefaultAPIAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: workflow.NewDefaultFileAdapter(), } //nolint:staticcheck // intentionally passing nil context to test validation _, err = engine.Execute(nil, nil, adapters) if err == nil { t.Fatal("Expected error for nil context") } if !strings.Contains(err.Error(), "context cannot be nil") { t.Errorf("Expected 'context cannot be nil' error, got: %v", err) } }) t.Run("NilAdapters", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "Test", Registry: workflow.Registry{ Services: []string{}, APIs: []workflow.APIDefinition{}, Components: []string{}, Vars: []string{}, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } _, err = engine.Execute(context.Background(), nil, nil) if err == nil { t.Fatal("Expected error for nil adapters") } if !strings.Contains(err.Error(), "adapters cannot be nil") { t.Errorf("Expected 'adapters cannot be nil' error, got: %v", err) } }) } // TestArrayTypeConversion tests string to array type conversion for variables func TestArrayTypeConversion(t *testing.T) { t.Run("ConvertStringToObjectArray", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "Array Type Conversion Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$items([OBJECT])", "$names([STRING])", "$count(NUMBER)", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Set_Items", Target: "$items", Value: `[{"name":"item1","value":10},{"name":"item2","value":20}]`, Next: "Set_Names", }, { ID: "Set_Names", Target: "$names", Value: `["alice","bob","charlie"]`, Next: "Set_Count", }, { ID: "Set_Count", Target: "$count", Value: "3", Next: "Stop_End", }, { ID: "Stop_End", }, }, } adapters := createTestAdapters() engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify $items was converted from string to []interface{} with maps items, ok := result.Context.Variables["$items"].([]interface{}) if !ok { t.Fatalf("Expected $items to be []interface{}, got: %T", result.Context.Variables["$items"]) } if len(items) != 2 { t.Errorf("Expected 2 items, got %d", len(items)) } // Check first item item1, ok := items[0].(map[string]interface{}) if !ok { t.Errorf("Expected items[0] to be map[string]interface{}, got: %T", items[0]) } else { if item1["name"] != "item1" { t.Errorf("Expected items[0].name to be 'item1', got: %v", item1["name"]) } // JSON unmarshals numbers as float64 if item1["value"] != float64(10) { t.Errorf("Expected items[0].value to be 10, got: %v", item1["value"]) } } // Verify $names was converted from string to []interface{} with strings names, ok := result.Context.Variables["$names"].([]interface{}) if !ok { t.Fatalf("Expected $names to be []interface{}, got: %T", result.Context.Variables["$names"]) } if len(names) != 3 { t.Errorf("Expected 3 names, got %d", len(names)) } expectedNames := []string{"alice", "bob", "charlie"} for i, expected := range expectedNames { if names[i] != expected { t.Errorf("Expected names[%d] to be '%s', got: %v", i, expected, names[i]) } } // Verify $count remains as string (no array type conversion) count := result.Context.Variables["$count"] if count != "3" { t.Errorf("Expected $count to be string '3', got: %v (type: %T)", count, count) } }) t.Run("InvalidJSONStringForArray", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "Invalid Array JSON Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$items([OBJECT])", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Set_InvalidItems", Target: "$items", Value: `invalid json`, Next: "Stop_End", }, { ID: "Stop_End", }, }, } adapters := createTestAdapters() engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Execute returned error: %v", err) } // Consume events and check for error hasError := false for event := range result.RunEventStream { t.Logf("Event: %s - %v - %v", event.Type, event.StepID, event.Payload) if event.Type == workflow.RunEventStepError || event.Type == workflow.RunEventWorkflowFailed { hasError = true } } // Workflow should have failed due to invalid JSON if !hasError { t.Error("Expected workflow to fail with invalid JSON, but it succeeded") } if result.Context.Status != workflow.StatusFailed { t.Errorf("Expected workflow status to be 'failed', got: %s", result.Context.Status) } }) t.Run("ArrayTypeWithNonStringValue", func(t *testing.T) { // When value is already an array, it should not require conversion wf := &workflow.Workflow{ Version: "3.6", Name: "Array Already Array Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$items([OBJECT])", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Set_Items", Target: "$items", Value: "=$presetItems", Next: "Stop_End", }, { ID: "Stop_End", }, }, } adapters := createTestAdapters() engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } initialVars := map[string]interface{}{ "$presetItems": []interface{}{ map[string]interface{}{"id": 1}, map[string]interface{}{"id": 2}, }, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Value should remain as-is (already an array) items, ok := result.Context.Variables["$items"].([]interface{}) if !ok { t.Fatalf("Expected $items to be []interface{}, got: %T", result.Context.Variables["$items"]) } if len(items) != 2 { t.Errorf("Expected 2 items, got %d", len(items)) } }) } // TestFileOutputJSONMarshal tests that non-string values are JSON-marshaled when writing to files func TestFileOutputJSONMarshal(t *testing.T) { t.Run("WriteObjectToFile", func(t *testing.T) { fileAdapter := workflow.NewDefaultFileAdapter() serviceAdapter := workflow.NewDefaultServiceAdapter() serviceAdapter.RegisterHandler("GenerateData", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) { return &workflow.ServiceResult{ Data: map[string]interface{}{ "user": map[string]interface{}{ "name": "Alice", "age": 30, "email": "alice@example.com", }, "items": []interface{}{ map[string]interface{}{"id": 1, "name": "item1"}, map[string]interface{}{"id": 2, "name": "item2"}, }, "count": 2, }, }, nil }) adapters := &workflow.Adapters{ Service: serviceAdapter, Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: fileAdapter, } wf := &workflow.Workflow{ Version: "3.6", Name: "File Output JSON Marshal Test", Registry: workflow.Registry{ Services: []string{ "GenerateData() RETURN user(OBJECT), items([OBJECT]), count(NUMBER)", }, Components: []string{}, Vars: []string{}, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{"output/*"}, }, }, Steps: []workflow.Step{ { ID: "Service_GenerateData", Out: workflow.StepOutput{ "/output/user.json": "=_result.user", "/output/items.json": "=_result.items", "/output/count.json": "=_result.count", }, Next: "Stop_End", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify user object was written as JSON userContent := fileAdapter.GetFile("/output/user.json") var userObj map[string]interface{} if err := json.Unmarshal(userContent, &userObj); err != nil { t.Fatalf("Failed to parse user.json as JSON: %v. Content: %s", err, string(userContent)) } if userObj["name"] != "Alice" { t.Errorf("Expected user.name to be 'Alice', got: %v", userObj["name"]) } if userObj["age"] != float64(30) { t.Errorf("Expected user.age to be 30, got: %v", userObj["age"]) } // Verify items array was written as JSON itemsContent := fileAdapter.GetFile("/output/items.json") var itemsArr []interface{} if err := json.Unmarshal(itemsContent, &itemsArr); err != nil { t.Fatalf("Failed to parse items.json as JSON: %v. Content: %s", err, string(itemsContent)) } if len(itemsArr) != 2 { t.Errorf("Expected 2 items, got: %d", len(itemsArr)) } item1 := itemsArr[0].(map[string]interface{}) if item1["name"] != "item1" { t.Errorf("Expected items[0].name to be 'item1', got: %v", item1["name"]) } // Verify number was written as JSON countContent := fileAdapter.GetFile("/output/count.json") var countVal float64 if err := json.Unmarshal(countContent, &countVal); err != nil { t.Fatalf("Failed to parse count.json as JSON: %v. Content: %s", err, string(countContent)) } if countVal != 2 { t.Errorf("Expected count to be 2, got: %v", countVal) } }) t.Run("WriteStepValueToFile", func(t *testing.T) { // Test Write_* step with non-string values fileAdapter := workflow.NewDefaultFileAdapter() adapters := createTestAdaptersWithFile(fileAdapter) wf := &workflow.Workflow{ Version: "3.6", Name: "Write Step JSON Marshal Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$data(OBJECT)", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{"output/*"}, }, }, Steps: []workflow.Step{ { ID: "Set_Data", Target: "$data", Value: `{"status":"success","code":200}`, Next: "Write_JsonFile", }, { ID: "Write_JsonFile", Target: "output/result.json", Value: "=$data", Next: "Stop_End", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events and check for errors var errorMsg string for event := range result.RunEventStream { t.Logf("Event: %s - %v - %v", event.Type, event.StepID, event.Payload) if event.Type == workflow.RunEventStepError || event.Type == workflow.RunEventWorkflowFailed { errorMsg = fmt.Sprintf("%v", event.Payload["error"]) } } if errorMsg != "" { t.Fatalf("Workflow failed with error: %s", errorMsg) } // Verify object was written as JSON (not Go format) content := fileAdapter.GetFile("output/result.json") var dataObj map[string]interface{} if err := json.Unmarshal(content, &dataObj); err != nil { t.Fatalf("Failed to parse result.json as JSON: %v. Content: %s", err, string(content)) } if dataObj["status"] != "success" { t.Errorf("Expected status to be 'success', got: %v", dataObj["status"]) } // JSON unmarshals numbers as float64 if dataObj["code"] != float64(200) { t.Errorf("Expected code to be 200, got: %v", dataObj["code"]) } }) } // TestNestedExpressionEvaluation tests that expressions are evaluated in nested structures func TestNestedExpressionEvaluation(t *testing.T) { t.Run("ExpressionsInMessagesArray", func(t *testing.T) { var capturedParams map[string]interface{} llmAdapter := workflow.NewDefaultLLMAdapter() llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { capturedParams = params return map[string]interface{}{ "content": "Response about the fruit", }, nil }) wf := &workflow.Workflow{ Version: "3.6", Name: "Nested Expression Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$items([OBJECT])", "$result(STRING)", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Set_Items", Target: "$items", Value: `[{"name":"apple","color":"red"},{"name":"banana","color":"yellow"}]`, Next: "Loop_ProcessItems", }, { ID: "Loop_ProcessItems", Mode: "serial", Source: "=$items", Children: []string{"LLM_Describe"}, Next: "Stop_End", }, { ID: "LLM_Describe", In: workflow.StepInput{ "model": "gpt-4", "stream": false, "messages": []interface{}{ map[string]interface{}{ "role": "user", "content": "Describe this fruit in one sentence:", }, map[string]interface{}{ "role": "user", "content": "=_item.name", }, }, }, Out: workflow.StepOutput{ "$result": "=_result.content", }, Next: "RETURN", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: llmAdapter, File: workflow.NewDefaultFileAdapter(), } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify that the expression in the messages array was evaluated messages, ok := capturedParams["messages"].([]interface{}) if !ok { t.Fatalf("Expected messages to be []interface{}, got: %T", capturedParams["messages"]) } if len(messages) != 2 { t.Fatalf("Expected 2 messages, got: %d", len(messages)) } // Check second message - should have "banana" (last iteration) secondMsg, ok := messages[1].(map[string]interface{}) if !ok { t.Fatalf("Expected messages[1] to be map, got: %T", messages[1]) } content, ok := secondMsg["content"].(string) if !ok { t.Fatalf("Expected content to be string, got: %T", secondMsg["content"]) } // The last item processed should be "banana" if content != "banana" { t.Errorf("Expected message content to be 'banana' (from _item.name), got: %v", content) } }) t.Run("ExpressionsInNestedObjects", func(t *testing.T) { var capturedParams map[string]interface{} serviceAdapter := workflow.NewDefaultServiceAdapter() serviceAdapter.RegisterHandler("TestService", func(ctx context.Context, params map[string]interface{}) (*workflow.ServiceResult, error) { capturedParams = params return &workflow.ServiceResult{ Data: map[string]interface{}{ "success": true, }, }, nil }) wf := &workflow.Workflow{ Version: "3.6", Name: "Nested Object Expression Test", Registry: workflow.Registry{ Services: []string{ "TestService(config(OBJECT)) RETURN success(BOOL)", }, Components: []string{}, Vars: []string{ "$userId(STRING)", "$userName(STRING)", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Set_UserId", Target: "$userId", Value: "user123", Next: "Set_UserName", }, { ID: "Set_UserName", Target: "$userName", Value: "Alice", Next: "Service_TestService", }, { ID: "Service_TestService", In: workflow.StepInput{ "config": map[string]interface{}{ "user": map[string]interface{}{ "id": "=$userId", "name": "=$userName", }, "settings": map[string]interface{}{ "theme": "dark", "language": "en", }, }, }, Next: "Stop_End", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := &workflow.Adapters{ Service: serviceAdapter, Component: workflow.NewDefaultComponentAdapter(), LLM: workflow.NewDefaultLLMAdapter(), File: workflow.NewDefaultFileAdapter(), } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify that nested expressions were evaluated config, ok := capturedParams["config"].(map[string]interface{}) if !ok { t.Fatalf("Expected config to be map, got: %T", capturedParams["config"]) } user, ok := config["user"].(map[string]interface{}) if !ok { t.Fatalf("Expected config.user to be map, got: %T", config["user"]) } // Verify expressions were evaluated if user["id"] != "user123" { t.Errorf("Expected user.id to be 'user123', got: %v", user["id"]) } if user["name"] != "Alice" { t.Errorf("Expected user.name to be 'Alice', got: %v", user["name"]) } // Verify non-expressions are preserved settings, ok := config["settings"].(map[string]interface{}) if !ok { t.Fatalf("Expected config.settings to be map, got: %T", config["settings"]) } if settings["theme"] != "dark" { t.Errorf("Expected settings.theme to be 'dark', got: %v", settings["theme"]) } }) } // TestLLMMessagesWithExpressions tests the exact use case from the user's example func TestLLMMessagesWithExpressions(t *testing.T) { var capturedParams map[string]interface{} llmAdapter := workflow.NewDefaultLLMAdapter() llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { capturedParams = params // Verify the messages structure messages, ok := params["messages"].([]interface{}) if !ok || len(messages) != 2 { return nil, fmt.Errorf("unexpected messages format") } // Get the fruit name from the second message msg2 := messages[1].(map[string]interface{}) fruitName := msg2["content"].(string) return map[string]interface{}{ "content": fmt.Sprintf("The %s is a delicious fruit.", fruitName), }, nil }) wf := &workflow.Workflow{ Version: "3.6", Name: "LLM Messages Expression Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$fruits([OBJECT])", "$descriptions([STRING])", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Set_Fruits", Target: "$fruits", Value: `[{"name":"apple"},{"name":"banana"},{"name":"orange"}]`, Next: "Loop_Describe", }, { ID: "Loop_Describe", Mode: "serial", Source: "=$fruits", Children: []string{"LLM_DescribeFruit"}, Next: "Stop_End", }, { ID: "LLM_DescribeFruit", In: workflow.StepInput{ "model": "gpt-4", "stream": true, "messages": []interface{}{ map[string]interface{}{ "role": "user", "content": "Describe this fruit in one sentence:", }, map[string]interface{}{ "role": "user", "content": "=_item.name", }, }, }, Out: workflow.StepOutput{ "$descriptions[_index]": "=_result.content", }, Next: "RETURN", }, { ID: "Stop_End", }, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: llmAdapter, File: workflow.NewDefaultFileAdapter(), } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } // Consume events logEvents(t, result.RunEventStream) // Verify messages array had expressions evaluated messages, ok := capturedParams["messages"].([]interface{}) if !ok { t.Fatalf("Expected messages to be []interface{}, got: %T", capturedParams["messages"]) } if len(messages) != 2 { t.Fatalf("Expected 2 messages, got: %d", len(messages)) } // Verify first message (static content) msg1 := messages[0].(map[string]interface{}) if msg1["content"] != "Describe this fruit in one sentence:" { t.Errorf("Expected first message content to be 'Describe this fruit in one sentence:', got: %v", msg1["content"]) } // Verify second message (evaluated expression) msg2 := messages[1].(map[string]interface{}) content := msg2["content"].(string) // Should be "orange" (last iteration in the loop) if content != "orange" { t.Errorf("Expected second message content to be 'orange' (from =_item.name), got: %v", content) } // Verify descriptions were collected descriptions, ok := result.Context.Variables["$descriptions"].([]interface{}) if !ok { t.Fatalf("Expected $descriptions to be []interface{}, got: %T", result.Context.Variables["$descriptions"]) } if len(descriptions) != 3 { t.Errorf("Expected 3 descriptions, got: %d", len(descriptions)) } // Verify each description expectedDescriptions := []string{ "The apple is a delicious fruit.", "The banana is a delicious fruit.", "The orange is a delicious fruit.", } for i, expected := range expectedDescriptions { if descriptions[i] != expected { t.Errorf("Expected descriptions[%d] to be '%s', got: %v", i, expected, descriptions[i]) } } } // TestLoopSourceExpression tests that Loop source supports both old and new expression formats func TestLoopSourceExpression(t *testing.T) { t.Run("NewFormatWithEqualsPrefix", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "Loop with = prefix", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$numbers([NUMBER])", "$doubled([NUMBER])", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Loop_Double", Mode: "serial", Source: "=$numbers", // New format with = prefix Children: []string{"Set_Doubled"}, Next: "Stop_End", }, { ID: "Set_Doubled", Target: "$doubled[_index]", Value: "=_item * 2", Next: "RETURN", }, { ID: "Stop_End", }, }, } adapters := createTestAdapters() engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } initialVars := map[string]interface{}{ "$numbers": []interface{}{int64(1), int64(2), int64(3)}, "$doubled": []interface{}{}, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } logEvents(t, result.RunEventStream) doubled, ok := result.Context.Variables["$doubled"].([]interface{}) if !ok { t.Fatalf("Expected $doubled to be []interface{}") } expected := []int64{2, 4, 6} for i, exp := range expected { actual := doubled[i].(float64) // Arithmetic results are float64 if actual != float64(exp) { t.Errorf("doubled[%d]: expected %d, got %v", i, exp, actual) } } }) t.Run("OldFormatBackwardCompatibility", func(t *testing.T) { // Test that old format ($items without =) still works wf := &workflow.Workflow{ Version: "3.6", Name: "Loop backward compatibility", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{ "$items([STRING])", "$output([STRING])", }, Files: workflow.FilesRegistry{ Inputs: []string{}, Artifacts: []string{}, }, }, Steps: []workflow.Step{ { ID: "Loop_Process", Mode: "serial", Source: "$items", // Old format without = prefix (for backward compatibility testing) Children: []string{"Set_Output"}, Next: "Stop_End", }, { ID: "Set_Output", Target: "$output[_index]", Value: "=_item", Next: "RETURN", }, { ID: "Stop_End", }, }, } adapters := createTestAdapters() engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } initialVars := map[string]interface{}{ "$items": []interface{}{"a", "b", "c"}, "$output": []interface{}{}, } result, err := engine.Execute(context.Background(), initialVars, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } logEvents(t, result.RunEventStream) output, ok := result.Context.Variables["$output"].([]interface{}) if !ok { t.Fatalf("Expected $output to be []interface{}") } expected := []string{"a", "b", "c"} for i, exp := range expected { if output[i] != exp { t.Errorf("output[%d]: expected %s, got %v", i, exp, output[i]) } } }) } // TestDocInjection tests that docs are resolved and injected into LLM system prompts func TestDocInjection(t *testing.T) { t.Run("DocsAppendedToSystemMessage", func(t *testing.T) { var capturedParams map[string]interface{} llmAdapter := workflow.NewDefaultLLMAdapter() llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { capturedParams = params return map[string]interface{}{ "content": "LLM response", }, nil }) docAdapter := workflow.NewDefaultDocAdapter() docAdapter.SetDoc("11", "VL syntax rules content here") docAdapter.SetDoc("20", "Component generation spec content here") wf := &workflow.Workflow{ Version: "3.6", Name: "Doc Injection Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{"$output(STRING)"}, Files: workflow.FilesRegistry{Inputs: []string{}, Artifacts: []string{}}, Docs: map[string]string{ "11": "VL syntax rules", "20": "Component generation spec", }, }, Steps: []workflow.Step{ { ID: "LLM_Generate", In: workflow.StepInput{ "model": "claude-3-opus", "docs": []interface{}{"11", "20"}, "messages": []interface{}{ map[string]interface{}{"role": "system", "content": "Follow the rules."}, map[string]interface{}{"role": "user", "content": "Generate something"}, }, }, Out: workflow.StepOutput{"$output": "=_result.content"}, Next: "Stop_End", }, {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: llmAdapter, File: workflow.NewDefaultFileAdapter(), Doc: docAdapter, } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute workflow: %v", err) } logEvents(t, result.RunEventStream) // Verify docs were injected into system message messages, ok := capturedParams["messages"].([]interface{}) if !ok { t.Fatal("Expected messages in captured params") } sysMsg, ok := messages[0].(map[string]interface{}) if !ok { t.Fatal("Expected first message to be a map") } content, _ := sysMsg["content"].(string) if !strings.Contains(content, "Follow the rules.") { t.Error("Expected original system content to be preserved") } if !strings.Contains(content, "VL syntax rules content here") { t.Error("Expected doc 11 content to be injected") } if !strings.Contains(content, "Component generation spec content here") { t.Error("Expected doc 20 content to be injected") } if !strings.Contains(content, "[Doc 11: VL syntax rules]") { t.Error("Expected doc header with ID and description") } // Verify docs param was removed (not passed to LLM) if _, hasDocs := capturedParams["docs"]; hasDocs { t.Error("Expected docs param to be removed before passing to LLM adapter") } // Verify output if result.Context.Variables["$output"] != "LLM response" { t.Errorf("Expected $output='LLM response', got %v", result.Context.Variables["$output"]) } }) t.Run("DocsWithNoSystemMessage", func(t *testing.T) { var capturedParams map[string]interface{} llmAdapter := workflow.NewDefaultLLMAdapter() llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { capturedParams = params return map[string]interface{}{"content": "ok"}, nil }) docAdapter := workflow.NewDefaultDocAdapter() docAdapter.SetDoc("5", "Some doc content") wf := &workflow.Workflow{ Version: "3.6", Name: "Doc No System Msg Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{}, Files: workflow.FilesRegistry{Inputs: []string{}, Artifacts: []string{}}, Docs: map[string]string{"5": "Some reference doc"}, }, Steps: []workflow.Step{ { ID: "LLM_Generate", In: workflow.StepInput{ "model": "gpt-4", "docs": []interface{}{"5"}, "messages": []interface{}{ map[string]interface{}{"role": "user", "content": "Hello"}, }, }, Next: "Stop_End", }, {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: llmAdapter, File: workflow.NewDefaultFileAdapter(), Doc: docAdapter, } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute: %v", err) } logEvents(t, result.RunEventStream) // A system message should have been prepended messages := capturedParams["messages"].([]interface{}) if len(messages) != 2 { t.Fatalf("Expected 2 messages (prepended system + user), got %d", len(messages)) } sysMsg := messages[0].(map[string]interface{}) if sysMsg["role"] != "system" { t.Errorf("Expected first message role='system', got %v", sysMsg["role"]) } content := sysMsg["content"].(string) if !strings.Contains(content, "Some doc content") { t.Errorf("Expected doc content in prepended system message, got: %s", content) } }) t.Run("NoDocs", func(t *testing.T) { var capturedParams map[string]interface{} llmAdapter := workflow.NewDefaultLLMAdapter() llmAdapter.SetHandler(func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { capturedParams = params return map[string]interface{}{"content": "ok"}, nil }) wf := &workflow.Workflow{ Version: "3.6", Name: "No Docs Test", Registry: workflow.Registry{ Services: []string{}, Components: []string{}, Vars: []string{}, Files: workflow.FilesRegistry{Inputs: []string{}, Artifacts: []string{}}, }, Steps: []workflow.Step{ { ID: "LLM_Generate", In: workflow.StepInput{ "model": "gpt-4", "messages": []interface{}{ map[string]interface{}{"role": "system", "content": "Be helpful."}, map[string]interface{}{"role": "user", "content": "Hi"}, }, }, Next: "Stop_End", }, {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("Failed to create engine: %v", err) } adapters := &workflow.Adapters{ Service: workflow.NewDefaultServiceAdapter(), Component: workflow.NewDefaultComponentAdapter(), LLM: llmAdapter, File: workflow.NewDefaultFileAdapter(), } result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters) if err != nil { t.Fatalf("Failed to execute: %v", err) } logEvents(t, result.RunEventStream) // System message should be untouched messages := capturedParams["messages"].([]interface{}) sysMsg := messages[0].(map[string]interface{}) if sysMsg["content"] != "Be helpful." { t.Errorf("Expected system message unchanged, got: %v", sysMsg["content"]) } }) } func TestEqualsPrefixBranchAndIf(t *testing.T) { t.Run("BranchCaseWithEqualsPrefix", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "TestBranchEqualsPrefix", Registry: workflow.Registry{ Vars: []string{"$status(STRING)", "$result(STRING)"}, }, Steps: []workflow.Step{ { ID: "Set_Status", Target: "$status", Value: "active", Next: "Branch_Check", }, { ID: "Branch_Check", Cases: [][]string{ {"=$status == \"active\"", "Set_Active"}, {"ELSE", "Set_Other"}, }, Next: "Stop_End", }, { ID: "Set_Active", Target: "$result", Value: "was_active", Next: "Stop_End", }, { ID: "Set_Other", Target: "$result", Value: "was_other", Next: "Stop_End", }, {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("failed to create engine: %v", err) } res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters()) if err != nil { t.Fatalf("failed to execute: %v", err) } drainEvents(res.RunEventStream) if res.Context.Variables["$result"] != "was_active" { t.Errorf("expected 'was_active', got %v", res.Context.Variables["$result"]) } }) t.Run("BranchCaseBackwardCompatWithoutEqualsPrefix", func(t *testing.T) { // Ensure old-style cases (no = prefix) still work wf := &workflow.Workflow{ Version: "3.6", Name: "TestBranchBackwardCompat", Registry: workflow.Registry{ Vars: []string{"$score(INT)", "$result(STRING)"}, }, Steps: []workflow.Step{ { ID: "Set_Score", Target: "$score", Value: "=90", Next: "Branch_Check", }, { ID: "Branch_Check", Cases: [][]string{ {"$score >= 90", "Set_High"}, {"ELSE", "Set_Low"}, }, Next: "Stop_End", }, { ID: "Set_High", Target: "$result", Value: "high", Next: "Stop_End", }, { ID: "Set_Low", Target: "$result", Value: "low", Next: "Stop_End", }, {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("failed to create engine: %v", err) } res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters()) if err != nil { t.Fatalf("failed to execute: %v", err) } drainEvents(res.RunEventStream) if res.Context.Variables["$result"] != "high" { t.Errorf("expected 'high', got %v", res.Context.Variables["$result"]) } }) t.Run("StepIfWithEqualsPrefix", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "TestStepIfEqualsPrefix", Registry: workflow.Registry{ Vars: []string{"$flag(STRING)", "$result(STRING)"}, }, Steps: []workflow.Step{ { ID: "Set_Flag", Target: "$flag", Value: "yes", Next: "Set_Conditional", }, { ID: "Set_Conditional", If: "=$flag == \"yes\"", Target: "$result", Value: "ran", Next: "Stop_End", }, {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("failed to create engine: %v", err) } res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters()) if err != nil { t.Fatalf("failed to execute: %v", err) } drainEvents(res.RunEventStream) if res.Context.Variables["$result"] != "ran" { t.Errorf("expected 'ran', got %v", res.Context.Variables["$result"]) } }) t.Run("StepIfWithEqualsPrefixSkipsStep", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "TestStepIfEqualsPrefixSkip", Registry: workflow.Registry{ Vars: []string{"$flag(STRING)", "$result(STRING)"}, }, Steps: []workflow.Step{ { ID: "Set_Flag", Target: "$flag", Value: "no", Next: "Set_Conditional", }, { ID: "Set_Conditional", If: "=$flag == \"yes\"", Target: "$result", Value: "ran", Next: "Stop_End", }, {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("failed to create engine: %v", err) } res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters()) if err != nil { t.Fatalf("failed to execute: %v", err) } drainEvents(res.RunEventStream) if res.Context.Variables["$result"] != nil { t.Errorf("expected $result to be unset, got %v", res.Context.Variables["$result"]) } }) t.Run("StepIfBackwardCompatWithoutEqualsPrefix", func(t *testing.T) { wf := &workflow.Workflow{ Version: "3.6", Name: "TestStepIfBackwardCompat", Registry: workflow.Registry{ Vars: []string{"$count(INT)", "$result(STRING)"}, }, Steps: []workflow.Step{ { ID: "Set_Count", Target: "$count", Value: "=5", Next: "Set_Conditional", }, { ID: "Set_Conditional", If: "$count > 3", Target: "$result", Value: "above_threshold", Next: "Stop_End", }, {ID: "Stop_End"}, }, } engine, err := workflow.NewEngine(wf) if err != nil { t.Fatalf("failed to create engine: %v", err) } res, err := engine.Execute(context.Background(), map[string]interface{}{}, createTestAdapters()) if err != nil { t.Fatalf("failed to execute: %v", err) } drainEvents(res.RunEventStream) if res.Context.Variables["$result"] != "above_threshold" { t.Errorf("expected 'above_threshold', got %v", res.Context.Variables["$result"]) } }) }