package workflow_test

import (
	"context"
	"fmt"
	"testing"
	"workflow"
)

// v310LLMHandler is the handler signature these tests install on the default
// LLM adapter. It is an alias (not a defined type) so values stay assignable
// to whatever parameter type SetHandler declares.
type v310LLMHandler = func(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error)

// v310Registry builds a registry with no services, components or files and
// the given variable declarations.
func v310Registry(vars ...string) workflow.Registry {
	if vars == nil {
		// Keep Vars a non-nil empty slice, as the tests have always passed.
		vars = []string{}
	}
	return workflow.Registry{
		Services:   []string{},
		Components: []string{},
		Vars:       vars,
		Files:      workflow.FilesRegistry{},
	}
}

// v310UserMessage builds a step input holding a single user message with the
// given content.
func v310UserMessage(content string) workflow.StepInput {
	return workflow.StepInput{
		"messages": []interface{}{
			map[string]interface{}{
				"role":    "user",
				"content": content,
			},
		},
	}
}

// v310Respond returns a handler that always succeeds with resp.
func v310Respond(resp map[string]interface{}) v310LLMHandler {
	return func(context.Context, map[string]interface{}, chan<- string) (map[string]interface{}, error) {
		return resp, nil
	}
}

// v310Fail returns a handler that always fails with err.
func v310Fail(err error) v310LLMHandler {
	return func(context.Context, map[string]interface{}, chan<- string) (map[string]interface{}, error) {
		return nil, err
	}
}

// v310SetLLMHandler installs handler on llm, which must be a
// *workflow.DefaultLLMAdapter. A mismatch fails the test with a readable
// message instead of panicking on an unchecked type assertion.
func v310SetLLMHandler(t *testing.T, llm interface{}, handler v310LLMHandler) {
	t.Helper()
	adapter, ok := llm.(*workflow.DefaultLLMAdapter)
	if !ok {
		t.Fatalf("Expected LLM adapter to be *workflow.DefaultLLMAdapter, got %T", llm)
	}
	adapter.SetHandler(handler)
}

// TestV310TextOutputResultIsString verifies that in v3.10, _result for text output
// is a string (content only), not the full result map.
func TestV310TextOutputResultIsString(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.10",
		Name:     "V310 Text Output Test",
		Registry: v310Registry("$content(STRING)", "$tokens(NUMBER)"),
		Steps: []workflow.Step{
			{
				ID: "LLM_Generate",
				In: v310UserMessage("Hello"),
				Out: workflow.StepOutput{
					"$content": "=_result",
					"$tokens":  "=_meta.usage.total_tokens",
				},
				Next: "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	adapters := createTestAdapters()
	v310SetLLMHandler(t, adapters.LLM, v310Respond(map[string]interface{}{
		"content":       "Hello! How can I help?",
		"model":         "gpt-4",
		"finish_reason": "stop",
		"response_id":   "chatcmpl-123",
		"usage": map[string]interface{}{
			"prompt_tokens":     10,
			"completion_tokens": 8,
			"total_tokens":      18,
		},
	}))

	engine, err := workflow.NewEngine(wf)
	if err != nil {
		t.Fatalf("Failed to create engine: %v", err)
	}

	result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
	if err != nil {
		t.Fatalf("Failed to execute workflow: %v", err)
	}
	drainEvents(result.RunEventStream)

	// v3.10: _result should be the content string, not the full map.
	content := result.Context.Variables["$content"]
	contentStr, ok := content.(string)
	if !ok {
		t.Fatalf("Expected $content to be string, got %T: %v", content, content)
	}
	if contentStr != "Hello! How can I help?" {
		t.Errorf("Expected content 'Hello! How can I help?', got '%s'", contentStr)
	}

	// _meta.usage.total_tokens should be accessible. The value is compared via
	// its printed form so the check does not depend on the numeric Go type.
	tokens := result.Context.Variables["$tokens"]
	if tokens == nil {
		t.Fatal("Expected $tokens to be set from _meta.usage.total_tokens")
	}
	if fmt.Sprintf("%v", tokens) != "18" {
		t.Errorf("Expected tokens 18, got %v", tokens)
	}
}

// TestV310StructuredOutputResultIsObject verifies that in v3.10, _result for
// json_schema output is the parsed JSON object (same as v3.9 for structured).
func TestV310StructuredOutputResultIsObject(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.10",
		Name:     "V310 Structured Output Test",
		Registry: v310Registry("$spec(OBJECT)", "$title(STRING)"),
		Steps: []workflow.Step{
			{
				ID: "LLM_GenSpec",
				In: workflow.StepInput{
					"messages": []interface{}{
						map[string]interface{}{
							"role":    "user",
							"content": "Generate a spec",
						},
					},
					"output_config": map[string]interface{}{
						"format": map[string]interface{}{
							"type": "json_schema",
							"schema": map[string]interface{}{
								"type": "object",
								"properties": map[string]interface{}{
									"title": map[string]interface{}{"type": "string"},
									"score": map[string]interface{}{"type": "number"},
								},
							},
						},
					},
				},
				Out: workflow.StepOutput{
					"$spec":  "=_result",
					"$title": "=_result.title",
				},
				Next: "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	adapters := createTestAdapters()
	// Adapter returns parsed JSON content (json_schema mode parses in adapter).
	v310SetLLMHandler(t, adapters.LLM, v310Respond(map[string]interface{}{
		"content": map[string]interface{}{
			"title": "My Spec",
			"score": 95,
		},
		"model":         "gpt-4",
		"finish_reason": "stop",
		"response_id":   "chatcmpl-456",
		"usage": map[string]interface{}{
			"prompt_tokens":     20,
			"completion_tokens": 15,
			"total_tokens":      35,
		},
	}))

	engine, err := workflow.NewEngine(wf)
	if err != nil {
		t.Fatalf("Failed to create engine: %v", err)
	}

	result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
	if err != nil {
		t.Fatalf("Failed to execute workflow: %v", err)
	}
	drainEvents(result.RunEventStream)

	// _result should be the parsed object.
	spec, ok := result.Context.Variables["$spec"].(map[string]interface{})
	if !ok {
		t.Fatalf("Expected $spec to be map, got %T: %v", result.Context.Variables["$spec"], result.Context.Variables["$spec"])
	}
	if spec["title"] != "My Spec" {
		t.Errorf("Expected title 'My Spec', got '%v'", spec["title"])
	}

	// _result.title should also work.
	title := result.Context.Variables["$title"]
	if title != "My Spec" {
		t.Errorf("Expected $title 'My Spec', got '%v'", title)
	}
}

// TestV310MetaFields verifies all _meta fields are populated correctly.
func TestV310MetaFields(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.10",
		Name:     "V310 Meta Fields Test",
		Registry: v310Registry("$meta(OBJECT)"),
		Steps: []workflow.Step{
			{
				ID: "LLM_Test",
				In: v310UserMessage("Test"),
				Out: workflow.StepOutput{
					"$meta": "=_meta",
				},
				Next: "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	adapters := createTestAdapters()
	v310SetLLMHandler(t, adapters.LLM, v310Respond(map[string]interface{}{
		"content":       "response",
		"model":         "claude-3-opus",
		"finish_reason": "stop",
		"response_id":   "msg_abc123",
		"usage": map[string]interface{}{
			"prompt_tokens":     100,
			"completion_tokens": 50,
			"total_tokens":      150,
		},
	}))

	engine, err := workflow.NewEngine(wf)
	if err != nil {
		t.Fatalf("Failed to create engine: %v", err)
	}

	result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
	if err != nil {
		t.Fatalf("Failed to execute workflow: %v", err)
	}
	drainEvents(result.RunEventStream)

	meta, ok := result.Context.Variables["$meta"].(map[string]interface{})
	if !ok {
		t.Fatalf("Expected $meta to be map, got %T", result.Context.Variables["$meta"])
	}

	// Check model.
	if meta["model"] != "claude-3-opus" {
		t.Errorf("Expected model 'claude-3-opus', got '%v'", meta["model"])
	}

	// Check provider inference.
	if meta["provider"] != "anthropic" {
		t.Errorf("Expected provider 'anthropic', got '%v'", meta["provider"])
	}

	// Check finish_reason.
	if meta["finish_reason"] != "stop" {
		t.Errorf("Expected finish_reason 'stop', got '%v'", meta["finish_reason"])
	}

	// Check response_id.
	if meta["response_id"] != "msg_abc123" {
		t.Errorf("Expected response_id 'msg_abc123', got '%v'", meta["response_id"])
	}

	// Check latency_ms is an int64 and non-negative; zero is accepted.
	latencyMs, ok := meta["latency_ms"].(int64)
	if !ok {
		t.Errorf("Expected latency_ms to be int64, got %T", meta["latency_ms"])
	} else if latencyMs < 0 {
		t.Errorf("Expected latency_ms >= 0, got %d", latencyMs)
	}

	// Check usage with normalized field names. These comparisons only match
	// values stored as Go int.
	// NOTE(review): presumably the engine keeps the handler's int values
	// as-is; confirm if usage is ever re-encoded.
	usage, ok := meta["usage"].(map[string]interface{})
	if !ok {
		t.Fatalf("Expected usage to be map, got %T", meta["usage"])
	}
	if usage["input_tokens"] != 100 {
		t.Errorf("Expected input_tokens 100, got %v", usage["input_tokens"])
	}
	if usage["output_tokens"] != 50 {
		t.Errorf("Expected output_tokens 50, got %v", usage["output_tokens"])
	}
	if usage["total_tokens"] != 150 {
		t.Errorf("Expected total_tokens 150, got %v", usage["total_tokens"])
	}

	// Check raw usage is preserved.
	if usage["raw"] == nil {
		t.Error("Expected usage.raw to be set")
	}
}

// TestV310OnErrorJumpsToHandler verifies that when an LLM call fails and the step
// has onError set, the engine jumps to the error handler step.
func TestV310OnErrorJumpsToHandler(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.10",
		Name:     "V310 OnError Test",
		Registry: v310Registry("$lastError(OBJECT)", "$success(STRING)"),
		Steps: []workflow.Step{
			{
				ID: "LLM_WillFail",
				In: v310UserMessage("This will fail"),
				Out: workflow.StepOutput{
					"$success": "=_result",
				},
				OnError: "Set_LogError",
				Next:    "Stop_End",
			},
			{
				ID:     "Set_LogError",
				Target: "$lastError",
				Value:  "=_error",
				Next:   "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	adapters := createTestAdapters()
	v310SetLLMHandler(t, adapters.LLM, v310Fail(&workflow.LLMError{
		Type:       "rate_limit",
		Code:       "RATE_LIMIT",
		Message:    "Rate limit exceeded",
		Retryable:  true,
		StatusCode: 429,
		Provider:   "openai",
		Model:      "gpt-4",
	}))

	engine, err := workflow.NewEngine(wf)
	if err != nil {
		t.Fatalf("Failed to create engine: %v", err)
	}

	result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
	if err != nil {
		t.Fatalf("Failed to execute workflow: %v", err)
	}
	drainEvents(result.RunEventStream)

	// Workflow should NOT have failed - onError handled it.
	if result.Context.Status == workflow.StatusFailed {
		t.Fatal("Expected workflow to succeed (onError should handle the error)")
	}

	// $success should NOT be set (out mapping is skipped on error).
	if result.Context.Variables["$success"] != nil {
		t.Errorf("Expected $success to be nil (out mapping not applied on error), got %v", result.Context.Variables["$success"])
	}

	// $lastError should be set by the error handler.
	lastError, ok := result.Context.Variables["$lastError"].(map[string]interface{})
	if !ok {
		t.Fatalf("Expected $lastError to be map, got %T: %v", result.Context.Variables["$lastError"], result.Context.Variables["$lastError"])
	}
	if lastError["type"] != "rate_limit" {
		t.Errorf("Expected error type 'rate_limit', got '%v'", lastError["type"])
	}
	if lastError["code"] != "RATE_LIMIT" {
		t.Errorf("Expected error code 'RATE_LIMIT', got '%v'", lastError["code"])
	}
	if lastError["message"] != "Rate limit exceeded" {
		t.Errorf("Expected error message 'Rate limit exceeded', got '%v'", lastError["message"])
	}
	if lastError["retryable"] != true {
		t.Errorf("Expected error retryable true, got %v", lastError["retryable"])
	}
	if lastError["status_code"] != 429 {
		t.Errorf("Expected status_code 429, got %v", lastError["status_code"])
	}
}

// TestV310ErrorWithoutOnErrorFails verifies that in v3.10, when an LLM call fails
// and the step has no onError, the workflow still fails (backward-compatible).
func TestV310ErrorWithoutOnErrorFails(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.10",
		Name:     "V310 Error Without OnError Test",
		Registry: v310Registry("$content(STRING)"),
		Steps: []workflow.Step{
			{
				ID: "LLM_WillFail",
				In: v310UserMessage("This will fail"),
				Out: workflow.StepOutput{
					"$content": "=_result",
				},
				// No OnError - error should propagate.
				Next: "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	adapters := createTestAdapters()
	v310SetLLMHandler(t, adapters.LLM, v310Fail(fmt.Errorf("connection refused")))

	engine, err := workflow.NewEngine(wf)
	if err != nil {
		t.Fatalf("Failed to create engine: %v", err)
	}

	// Execute itself is expected to return without error; the failure is
	// reported through the run status checked below.
	result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
	if err != nil {
		t.Fatalf("Failed to execute workflow: %v", err)
	}
	drainEvents(result.RunEventStream)

	// Workflow should have failed.
	if result.Context.Status != workflow.StatusFailed {
		t.Errorf("Expected workflow to fail, got status %s", result.Context.Status)
	}
}

// TestV310BackwardCompatV39 verifies that a v3.9 workflow still works with old
// _result semantics.
func TestV310BackwardCompatV39(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.9",
		Name:     "V39 Backward Compat Test",
		Registry: v310Registry("$content(STRING)", "$model(STRING)", "$tokens(NUMBER)"),
		Steps: []workflow.Step{
			{
				ID: "LLM_Generate",
				In: v310UserMessage("Hello"),
				Out: workflow.StepOutput{
					// v3.9 semantics: _result is the full map for text output.
					"$content": "=_result.content",
					"$model":   "=_result.model",
					"$tokens":  "=_result.usage.total_tokens",
				},
				Next: "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	adapters := createTestAdapters()
	v310SetLLMHandler(t, adapters.LLM, v310Respond(map[string]interface{}{
		"content":       "Hello there!",
		"model":         "gpt-4",
		"finish_reason": "stop",
		"usage": map[string]interface{}{
			"prompt_tokens":     5,
			"completion_tokens": 3,
			"total_tokens":      8,
		},
	}))

	engine, err := workflow.NewEngine(wf)
	if err != nil {
		t.Fatalf("Failed to create engine: %v", err)
	}

	result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
	if err != nil {
		t.Fatalf("Failed to execute workflow: %v", err)
	}
	drainEvents(result.RunEventStream)

	// v3.9: _result.content should work (full map semantics).
	if result.Context.Variables["$content"] != "Hello there!" {
		t.Errorf("Expected content 'Hello there!', got '%v'", result.Context.Variables["$content"])
	}
	if result.Context.Variables["$model"] != "gpt-4" {
		t.Errorf("Expected model 'gpt-4', got '%v'", result.Context.Variables["$model"])
	}
	if fmt.Sprintf("%v", result.Context.Variables["$tokens"]) != "8" {
		t.Errorf("Expected tokens 8, got %v", result.Context.Variables["$tokens"])
	}
}

// TestV310ErrorMetaPartiallyPopulated is meant to verify that _meta is set (with
// latency) even on LLM failure when onError is used.
//
// NOTE(review): as written, the test only asserts on _error. $errorMeta is
// registered but never assigned or inspected, so _meta population on failure
// is not actually covered here - confirm the intended way to capture _meta in
// an error handler and extend the test.
func TestV310ErrorMetaPartiallyPopulated(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.10",
		Name:     "V310 Error Meta Test",
		Registry: v310Registry("$errorMeta(OBJECT)", "$errorInfo(OBJECT)"),
		Steps: []workflow.Step{
			{
				ID:      "LLM_WillFail",
				In:      v310UserMessage("This will fail"),
				OnError: "Set_CaptureError",
				Next:    "Stop_End",
			},
			{
				ID:     "Set_CaptureError",
				Target: "$errorInfo",
				Value:  "=_error",
				Next:   "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	adapters := createTestAdapters()
	v310SetLLMHandler(t, adapters.LLM, v310Fail(&workflow.LLMError{
		Type:       "timeout",
		Code:       "TIMEOUT",
		Message:    "Request timed out",
		Retryable:  true,
		StatusCode: 408,
		Model:      "gpt-4",
	}))

	engine, err := workflow.NewEngine(wf)
	if err != nil {
		t.Fatalf("Failed to create engine: %v", err)
	}

	result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
	if err != nil {
		t.Fatalf("Failed to execute workflow: %v", err)
	}
	drainEvents(result.RunEventStream)

	// Workflow should succeed (onError handled).
	if result.Context.Status == workflow.StatusFailed {
		t.Fatal("Expected workflow to succeed with onError handler")
	}

	// $errorInfo should have the error details.
	errorInfo, ok := result.Context.Variables["$errorInfo"].(map[string]interface{})
	if !ok {
		t.Fatalf("Expected $errorInfo to be map, got %T: %v", result.Context.Variables["$errorInfo"], result.Context.Variables["$errorInfo"])
	}
	if errorInfo["type"] != "timeout" {
		t.Errorf("Expected error type 'timeout', got '%v'", errorInfo["type"])
	}
	if errorInfo["retryable"] != true {
		t.Errorf("Expected retryable true, got %v", errorInfo["retryable"])
	}
}

// TestV310ResultCleanup verifies that _result, _meta, _error are all cleaned up
// after out evaluation.
func TestV310ResultCleanup(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.10",
		Name:     "V310 Cleanup Test",
		Registry: v310Registry("$content(STRING)"),
		Steps: []workflow.Step{
			{
				ID: "LLM_First",
				In: v310UserMessage("Hello"),
				Out: workflow.StepOutput{
					"$content": "=_result",
				},
				Next: "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	adapters := createTestAdapters()
	v310SetLLMHandler(t, adapters.LLM, v310Respond(map[string]interface{}{
		"content":       "response",
		"model":         "gpt-4",
		"finish_reason": "stop",
	}))

	engine, err := workflow.NewEngine(wf)
	if err != nil {
		t.Fatalf("Failed to create engine: %v", err)
	}

	result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
	if err != nil {
		t.Fatalf("Failed to execute workflow: %v", err)
	}
	drainEvents(result.RunEventStream)

	// After workflow completes, local vars should be cleaned up.
	if _, ok := result.Context.LocalVars["_result"]; ok {
		t.Error("Expected _result to be cleaned up after step execution")
	}
	if _, ok := result.Context.LocalVars["_meta"]; ok {
		t.Error("Expected _meta to be cleaned up after step execution")
	}
	if _, ok := result.Context.LocalVars["_error"]; ok {
		t.Error("Expected _error to be cleaned up after step execution")
	}
}

// TestV310OnErrorValidation verifies that onError referencing a non-existent step
// fails validation.
func TestV310OnErrorValidation(t *testing.T) {
	wf := &workflow.Workflow{
		Version:  "3.10",
		Name:     "V310 OnError Validation Test",
		Registry: v310Registry(),
		Steps: []workflow.Step{
			{
				ID: "LLM_Test",
				In: workflow.StepInput{
					"messages": []interface{}{},
				},
				OnError: "NonExistent_Step",
				Next:    "Stop_End",
			},
			{
				ID: "Stop_End",
			},
		},
	}

	// The test expects the dangling reference to be rejected by NewEngine,
	// before any execution happens.
	_, err := workflow.NewEngine(wf)
	if err == nil {
		t.Fatal("Expected validation error for non-existent onError step")
	}
	t.Logf("Got expected error: %v", err)
}

// TestV310ProviderInference verifies provider inference from model names.
func TestV310ProviderInference(t *testing.T) {
	tests := []struct {
		model    string
		expected string
	}{
		{"gpt-4", "openai"},
		{"claude-3-opus", "anthropic"},
		{"gemini-pro", "google"},
		{"mistral-large", "mistral"},
		{"some-custom-model", "unknown"},
	}

	for _, tt := range tests {
		// Subtests run sequentially (no t.Parallel), so using tt inside the
		// closure is safe on every Go version.
		t.Run(tt.model, func(t *testing.T) {
			wf := &workflow.Workflow{
				Version:  "3.10",
				Name:     "Provider Test",
				Registry: v310Registry("$provider(STRING)"),
				Steps: []workflow.Step{
					{
						ID: "LLM_Test",
						In: v310UserMessage("test"),
						Out: workflow.StepOutput{
							"$provider": "=_meta.provider",
						},
						Next: "Stop_End",
					},
					{
						ID: "Stop_End",
					},
				},
			}

			adapters := createTestAdapters()
			v310SetLLMHandler(t, adapters.LLM, v310Respond(map[string]interface{}{
				"content": "ok",
				"model":   tt.model,
			}))

			engine, err := workflow.NewEngine(wf)
			if err != nil {
				t.Fatalf("Failed to create engine: %v", err)
			}

			result, err := engine.Execute(context.Background(), map[string]interface{}{}, adapters)
			if err != nil {
				t.Fatalf("Failed to execute workflow: %v", err)
			}
			drainEvents(result.RunEventStream)

			provider := result.Context.Variables["$provider"]
			if provider != tt.expected {
				t.Errorf("Expected provider '%s' for model '%s', got '%v'", tt.expected, tt.model, provider)
			}
		})
	}
}