package workflow

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
)

// ---------- helpers ----------

// anthropicJSONString renders s as a JSON string literal (including the
// surrounding quotes). fmt's %q produces Go escaping, which is not always
// valid JSON (for example \x1b or \a), so the fixtures use encoding/json.
func anthropicJSONString(s string) string {
	// Marshalling a plain string cannot fail, so the error is deliberately
	// discarded.
	b, _ := json.Marshal(s)
	return string(b)
}

// decodeAnthropicReq decodes the JSON request body of r into dst.
//
// It is meant to be called from inside an httptest handler, which runs on a
// goroutine other than the test's own. t.Fatalf/FailNow must not be used
// there, so on failure it records the problem with t.Errorf, answers with
// HTTP 400 and returns false; the handler should then return immediately.
func decodeAnthropicReq(t *testing.T, w http.ResponseWriter, r *http.Request, dst *anthropicReq) bool {
	t.Helper()
	if err := json.NewDecoder(r.Body).Decode(dst); err != nil {
		t.Errorf("failed to decode request: %v", err)
		http.Error(w, "bad request body", http.StatusBadRequest)
		return false
	}
	return true
}

// anthropicJSONResponse returns a non-streaming Anthropic Messages API
// response body containing a single text content block with the given id,
// model, text, stop reason and token usage.
func anthropicJSONResponse(id, model, text, stopReason string, inputTok, outputTok int) string {
	return fmt.Sprintf(`{
		"id": %s,
		"type": "message",
		"role": "assistant",
		"model": %s,
		"content": [{"type":"text","text":%s}],
		"stop_reason": %s,
		"usage": {"input_tokens": %d, "output_tokens": %d}
	}`,
		anthropicJSONString(id),
		anthropicJSONString(model),
		anthropicJSONString(text),
		anthropicJSONString(stopReason),
		inputTok,
		outputTok,
	)
}

// anthropicSSEStream builds an Anthropic-format SSE stream from the given
// text chunks. The stream consists of message_start, content_block_start, one
// content_block_delta per chunk, content_block_stop, message_delta (carrying
// the stop reason and output token count) and message_stop.
func anthropicSSEStream(id, model string, chunks []string, stopReason string, inputTok, outputTok int) string {
	var sb strings.Builder

	// message_start
	fmt.Fprintf(&sb,
		"event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":%s,\"type\":\"message\",\"role\":\"assistant\",\"model\":%s,\"content\":[],\"stop_reason\":null,\"usage\":{\"input_tokens\":%d,\"output_tokens\":0}}}\n\n",
		anthropicJSONString(id), anthropicJSONString(model), inputTok)

	// content_block_start
	sb.WriteString("event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n")

	// content_block_delta for each chunk
	for _, chunk := range chunks {
		fmt.Fprintf(&sb,
			"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":%s}}\n\n",
			anthropicJSONString(chunk))
	}

	// content_block_stop
	sb.WriteString("event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n")

	// message_delta
	fmt.Fprintf(&sb,
		"event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":%s},\"usage\":{\"output_tokens\":%d}}\n\n",
		anthropicJSONString(stopReason), outputTok)

	// message_stop
	sb.WriteString("event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n")

	return sb.String()
}

// ---------- non-streaming tests ----------

// TestAnthropicAdapter_NonStreaming_Basic checks the request headers and body
// sent for a plain call, and the content, model, finish reason, response id
// and usage fields of the returned result map.
func TestAnthropicAdapter_NonStreaming_Basic(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Verify request headers
		if got := r.Header.Get("x-api-key"); got != "test-key" {
			t.Errorf("expected x-api-key 'test-key', got %q", got)
		}
		if got := r.Header.Get("anthropic-version"); got != "2023-06-01" {
			t.Errorf("expected anthropic-version '2023-06-01', got %q", got)
		}
		if got := r.Header.Get("Content-Type"); got != "application/json" {
			t.Errorf("expected Content-Type 'application/json', got %q", got)
		}

		// Verify request body
		var req anthropicReq
		if !decodeAnthropicReq(t, w, r, &req) {
			return
		}
		if req.Model != "claude-test" {
			t.Errorf("expected model 'claude-test', got %q", req.Model)
		}
		if req.Stream {
			t.Error("expected stream=false for non-streaming call")
		}

		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, anthropicJSONResponse("msg_123", "claude-test", "Hello world", "end_turn", 10, 5))
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{
		APIKey:  "test-key",
		Model:   "claude-test",
		BaseURL: server.URL,
	})

	result, err := adapter.Call(context.Background(), map[string]interface{}{
		"messages": []interface{}{
			map[string]interface{}{"role": "user", "content": "hi"},
		},
	}, nil)
	if err != nil {
		t.Fatalf("Call() error: %v", err)
	}

	if result["content"] != "Hello world" {
		t.Errorf("content: got %q, want 'Hello world'", result["content"])
	}
	if result["model"] != "claude-test" {
		t.Errorf("model: got %q, want 'claude-test'", result["model"])
	}
	if result["finish_reason"] != "end_turn" {
		t.Errorf("finish_reason: got %q, want 'end_turn'", result["finish_reason"])
	}
	if result["response_id"] != "msg_123" {
		t.Errorf("response_id: got %q, want 'msg_123'", result["response_id"])
	}

	usage, ok := result["usage"].(map[string]interface{})
	if !ok {
		t.Fatal("missing usage")
	}
	if usage["prompt_tokens"] != 10 {
		t.Errorf("prompt_tokens: got %v, want 10", usage["prompt_tokens"])
	}
	if usage["completion_tokens"] != 5 {
		t.Errorf("completion_tokens: got %v, want 5", usage["completion_tokens"])
	}
	if usage["total_tokens"] != 15 {
		t.Errorf("total_tokens: got %v, want 15", usage["total_tokens"])
	}
}

// TestAnthropicAdapter_NonStreaming_SystemPrompt checks that a "system" role
// message is sent in the request's System field and removed from Messages.
func TestAnthropicAdapter_NonStreaming_SystemPrompt(t *testing.T) {
	var captured anthropicReq
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !decodeAnthropicReq(t, w, r, &captured) {
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", "ok", "end_turn", 5, 2))
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	_, err := adapter.Call(context.Background(), map[string]interface{}{
		"messages": []interface{}{
			map[string]interface{}{"role": "system", "content": "Be concise"},
			map[string]interface{}{"role": "user", "content": "hi"},
		},
	}, nil)
	if err != nil {
		t.Fatalf("Call() error: %v", err)
	}

	if captured.System != "Be concise" {
		t.Errorf("system: got %q, want 'Be concise'", captured.System)
	}
	if len(captured.Messages) != 1 {
		t.Fatalf("expected 1 message (user only), got %d", len(captured.Messages))
	}
	if captured.Messages[0].Role != "user" {
		t.Errorf("expected role 'user', got %q", captured.Messages[0].Role)
	}
}

// TestAnthropicAdapter_NonStreaming_PromptParam checks that a bare "prompt"
// input is sent as a single message whose content is the prompt text.
func TestAnthropicAdapter_NonStreaming_PromptParam(t *testing.T) {
	var captured anthropicReq
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !decodeAnthropicReq(t, w, r, &captured) {
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", "ok", "end_turn", 5, 2))
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	_, err := adapter.Call(context.Background(), map[string]interface{}{
		"prompt": "hello there",
	}, nil)
	if err != nil {
		t.Fatalf("Call() error: %v", err)
	}

	if len(captured.Messages) != 1 || captured.Messages[0].Content != "hello there" {
		t.Errorf("expected single user message 'hello there', got %+v", captured.Messages)
	}
}

// TestAnthropicAdapter_NonStreaming_ModelOverride checks that a per-call
// "model" input takes precedence over the model configured on the adapter.
func TestAnthropicAdapter_NonStreaming_ModelOverride(t *testing.T) {
	var captured anthropicReq
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !decodeAnthropicReq(t, w, r, &captured) {
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-3-haiku", "ok", "end_turn", 5, 2))
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{
		APIKey:  "k",
		Model:   "claude-default",
		BaseURL: server.URL,
	})

	_, err := adapter.Call(context.Background(), map[string]interface{}{
		"model":  "claude-3-haiku",
		"prompt": "hi",
	}, nil)
	if err != nil {
		t.Fatalf("Call() error: %v", err)
	}

	if captured.Model != "claude-3-haiku" {
		t.Errorf("expected model override 'claude-3-haiku', got %q", captured.Model)
	}
}

// TestAnthropicAdapter_NonStreaming_APIError checks that a 401 response is
// returned as a non-retryable *LLMError carrying the status code.
func TestAnthropicAdapter_NonStreaming_APIError(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusUnauthorized)
		fmt.Fprint(w, `{"error":{"type":"authentication_error","message":"invalid x-api-key"}}`)
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "bad", BaseURL: server.URL})

	_, err := adapter.Call(context.Background(), map[string]interface{}{
		"prompt": "hi",
	}, nil)
	if err == nil {
		t.Fatal("expected error for 401 response")
	}

	llmErr, ok := err.(*LLMError)
	if !ok {
		t.Fatalf("expected *LLMError, got %T: %v", err, err)
	}
	if llmErr.StatusCode != 401 {
		t.Errorf("status code: got %d, want 401", llmErr.StatusCode)
	}
	if llmErr.Retryable {
		t.Error("expected 401 to be non-retryable")
	}
}

// TestAnthropicAdapter_NonStreaming_RateLimitRetryable checks that a 429
// response is returned as a retryable *LLMError.
func TestAnthropicAdapter_NonStreaming_RateLimitRetryable(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTooManyRequests)
		fmt.Fprint(w, `{"error":{"type":"rate_limit_error","message":"too many requests"}}`)
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	_, err := adapter.Call(context.Background(), map[string]interface{}{
		"prompt": "hi",
	}, nil)
	if err == nil {
		t.Fatal("expected error for 429 response")
	}

	llmErr, ok := err.(*LLMError)
	if !ok {
		t.Fatalf("expected *LLMError, got %T", err)
	}
	if !llmErr.Retryable {
		t.Error("expected 429 to be retryable")
	}
}

// TestAnthropicAdapter_NonStreaming_MissingMessages checks that a call with
// neither "messages" nor "prompt" fails with a descriptive error.
func TestAnthropicAdapter_NonStreaming_MissingMessages(t *testing.T) {
	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k"})

	_, err := adapter.Call(context.Background(), map[string]interface{}{}, nil)
	if err == nil {
		t.Fatal("expected error for missing messages/prompt")
	}
	if !strings.Contains(err.Error(), "must include 'messages' or 'prompt'") {
		t.Errorf("unexpected error: %v", err)
	}
}

// ---------- streaming tests ----------

// TestAnthropicAdapter_Streaming_Basic checks that a streaming call forwards
// every text delta to the stream channel in order and returns the assembled
// content together with the model, finish reason, response id and usage.
func TestAnthropicAdapter_Streaming_Basic(t *testing.T) {
	chunks := []string{"He", "llo", " wo", "rld", "!"}

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Verify stream=true in request
		var req anthropicReq
		if !decodeAnthropicReq(t, w, r, &req) {
			return
		}
		if !req.Stream {
			t.Error("expected stream=true in request body")
		}
		if got := r.Header.Get("Accept"); got != "text/event-stream" {
			t.Errorf("expected Accept 'text/event-stream', got %q", got)
		}

		w.Header().Set("Content-Type", "text/event-stream")
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, anthropicSSEStream("msg_s1", "claude-test", chunks, "end_turn", 12, 8))
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{
		APIKey:  "test-key",
		Model:   "claude-test",
		BaseURL: server.URL,
	})

	// Buffered so the adapter can push every chunk before the test drains it.
	streamCh := make(chan string, 100)
	result, err := adapter.Call(context.Background(), map[string]interface{}{
		"stream": true,
		"messages": []interface{}{
			map[string]interface{}{"role": "user", "content": "say hello world"},
		},
	}, streamCh)
	if err != nil {
		t.Fatalf("Call() error: %v", err)
	}

	// Collect streamed chunks. Call has returned, so closing the channel here
	// lets the range loop terminate once the buffer is drained.
	close(streamCh)
	var received []string
	for c := range streamCh {
		received = append(received, c)
	}

	if len(received) != len(chunks) {
		t.Errorf("expected %d stream chunks, got %d", len(chunks), len(received))
	}
	for i, want := range chunks {
		if i < len(received) && received[i] != want {
			t.Errorf("chunk[%d]: got %q, want %q", i, received[i], want)
		}
	}

	// Verify assembled result
	if result["content"] != "Hello world!" {
		t.Errorf("content: got %q, want 'Hello world!'", result["content"])
	}
	if result["model"] != "claude-test" {
		t.Errorf("model: got %q, want 'claude-test'", result["model"])
	}
	if result["finish_reason"] != "end_turn" {
		t.Errorf("finish_reason: got %q, want 'end_turn'", result["finish_reason"])
	}
	if result["response_id"] != "msg_s1" {
		t.Errorf("response_id: got %q, want 'msg_s1'", result["response_id"])
	}

	// Checked assertion: a missing or mistyped usage entry should fail the
	// test rather than panic.
	usage, ok := result["usage"].(map[string]interface{})
	if !ok {
		t.Fatal("missing usage")
	}
	if usage["prompt_tokens"] != 12 {
		t.Errorf("prompt_tokens: got %v, want 12", usage["prompt_tokens"])
	}
	if usage["completion_tokens"] != 8 {
		t.Errorf("completion_tokens: got %v, want 8", usage["completion_tokens"])
	}
	if usage["total_tokens"] != 20 {
		t.Errorf("total_tokens: got %v, want 20", usage["total_tokens"])
	}
}

// TestAnthropicAdapter_Streaming_FallsBackWhenNoChannel checks that when
// stream=true is requested but the stream channel is nil, the adapter sends a
// non-streaming request and still returns the response content.
func TestAnthropicAdapter_Streaming_FallsBackWhenNoChannel(t *testing.T) {
	var captured anthropicReq
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !decodeAnthropicReq(t, w, r, &captured) {
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", "ok", "end_turn", 5, 2))
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	result, err := adapter.Call(context.Background(), map[string]interface{}{
		"stream": true,
		"prompt": "hi",
	}, nil) // nil channel
	if err != nil {
		t.Fatalf("Call() error: %v", err)
	}

	if captured.Stream {
		t.Error("expected stream=false when channel is nil")
	}
	if result["content"] != "ok" {
		t.Errorf("content: got %v, want 'ok'", result["content"])
	}
}

// TestAnthropicAdapter_Streaming_HTTPError checks that a 500 response to a
// streaming call is returned as a retryable *LLMError.
func TestAnthropicAdapter_Streaming_HTTPError(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprint(w, `{"error":{"type":"api_error","message":"internal server error"}}`)
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	streamCh := make(chan string, 100)
	_, err := adapter.Call(context.Background(), map[string]interface{}{
		"stream": true,
		"prompt": "hi",
	}, streamCh)
	if err == nil {
		t.Fatal("expected error for 500 response")
	}

	llmErr, ok := err.(*LLMError)
	if !ok {
		t.Fatalf("expected *LLMError, got %T: %v", err, err)
	}
	if !llmErr.Retryable {
		t.Error("expected 500 to be retryable")
	}
}

// TestAnthropicAdapter_Streaming_ErrorEvent checks that an "error" SSE event
// received mid-stream makes Call fail with an error mentioning the event's
// message.
func TestAnthropicAdapter_Streaming_ErrorEvent(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		w.WriteHeader(http.StatusOK)
		// Send a partial stream then error
		fmt.Fprint(w, "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"model\":\"claude-test\",\"content\":[],\"usage\":{\"input_tokens\":5}}}\n\n")
		fmt.Fprint(w, "event: error\ndata: {\"type\":\"error\",\"error\":{\"type\":\"overloaded_error\",\"message\":\"Overloaded\"}}\n\n")
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	streamCh := make(chan string, 100)
	_, err := adapter.Call(context.Background(), map[string]interface{}{
		"stream": true,
		"prompt": "hi",
	}, streamCh)
	if err == nil {
		t.Fatal("expected error from error SSE event")
	}
	if !strings.Contains(err.Error(), "Overloaded") {
		t.Errorf("expected error to mention 'Overloaded', got: %v", err)
	}
}

// TestAnthropicAdapter_Streaming_ContextCancellation checks that cancelling
// the context while the server is still holding the stream open makes Call
// return an error.
func TestAnthropicAdapter_Streaming_ContextCancellation(t *testing.T) {
	// Simulate a slow stream and cancel the context
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		w.WriteHeader(http.StatusOK)
		// Write message_start and one chunk
		fmt.Fprint(w, "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"model\":\"claude-test\",\"content\":[],\"usage\":{\"input_tokens\":5}}}\n\n")
		fmt.Fprint(w, "event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n")
		fmt.Fprint(w, "event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"He\"}}\n\n")
		// Flush to ensure the client receives data
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
		// Block until the request context ends (simulating a slow stream);
		// cancelling the client context closes the connection.
		<-r.Context().Done()
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	ctx, cancel := context.WithCancel(context.Background())
	// Always release the context, and with it the reader goroutine below,
	// even if the adapter never delivers a chunk.
	defer cancel()

	// Use a zero-buffer channel so the send blocks when context is cancelled
	streamCh := make(chan string)

	// Read one chunk then cancel
	go func() {
		select {
		case <-streamCh: // read "He"
			cancel()
		case <-ctx.Done():
			// Test finished without a chunk arriving; nothing left to do.
		}
	}()

	_, err := adapter.Call(ctx, map[string]interface{}{
		"stream": true,
		"prompt": "hi",
	}, streamCh)
	if err == nil {
		t.Fatal("expected context cancellation error")
	}
}

// ---------- structured JSON output tests ----------

// TestAnthropicAdapter_StructuredJSON checks that a json_schema output_config
// injects a schema instruction into the system prompt and that the JSON text
// in the response is returned as a parsed map rather than a string.
func TestAnthropicAdapter_StructuredJSON(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var req anthropicReq
		if !decodeAnthropicReq(t, w, r, &req) {
			return
		}

		// Verify schema instruction was injected into system prompt
		if !strings.Contains(req.System, "OUTPUT FORMAT REQUIREMENT") {
			t.Error("expected schema instruction in system prompt")
		}
		if !strings.Contains(req.System, `"name"`) {
			t.Error("expected schema properties in system prompt")
		}

		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", `{"name":"Alice","age":30}`, "end_turn", 10, 5))
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	result, err := adapter.Call(context.Background(), map[string]interface{}{
		"prompt": "give me a person",
		"output_config": map[string]interface{}{
			"format": map[string]interface{}{
				"type": "json_schema",
				"schema": map[string]interface{}{
					"type": "object",
					"properties": map[string]interface{}{
						"name": map[string]interface{}{"type": "string"},
						"age":  map[string]interface{}{"type": "number"},
					},
				},
			},
		},
	}, nil)
	if err != nil {
		t.Fatalf("Call() error: %v", err)
	}

	// Content should be parsed JSON, not a string
	parsed, ok := result["content"].(map[string]interface{})
	if !ok {
		t.Fatalf("expected content to be parsed map, got %T: %v", result["content"], result["content"])
	}
	if parsed["name"] != "Alice" {
		t.Errorf("name: got %v, want 'Alice'", parsed["name"])
	}
}

// TestAnthropicAdapter_StructuredJSON_WithCodeFences checks that JSON wrapped
// in Markdown code fences is still returned as a parsed map.
func TestAnthropicAdapter_StructuredJSON_WithCodeFences(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		// LLM wraps in code fences despite instruction
		fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", "```json\n{\"value\":42}\n```", "end_turn", 10, 5))
	}))
	defer server.Close()

	adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})

	result, err := adapter.Call(context.Background(), map[string]interface{}{
		"prompt": "give me a number",
		"output_config": map[string]interface{}{
			"format": map[string]interface{}{"type": "json_schema"},
		},
	}, nil)
	if err != nil {
		t.Fatalf("Call() error: %v", err)
	}

	parsed, ok := result["content"].(map[string]interface{})
	if !ok {
		t.Fatalf("expected parsed map, got %T", result["content"])
	}
	// encoding/json decodes JSON numbers into float64 when the target is an
	// interface value, hence the float64 comparison.
	if parsed["value"] != float64(42) {
		t.Errorf("value: got %v, want 42", parsed["value"])
	}
}