| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555 |
- package workflow
- import (
- "context"
- "encoding/json"
- "fmt"
- "net/http"
- "net/http/httptest"
- "strings"
- "testing"
- )
- // ---------- helpers ----------
- // anthropicJSONResponse returns a valid non-streaming Anthropic Messages API response body.
- func anthropicJSONResponse(id, model, text, stopReason string, inputTok, outputTok int) string {
- return fmt.Sprintf(`{
- "id": %q,
- "type": "message",
- "role": "assistant",
- "model": %q,
- "content": [{"type":"text","text":%q}],
- "stop_reason": %q,
- "usage": {"input_tokens": %d, "output_tokens": %d}
- }`, id, model, text, stopReason, inputTok, outputTok)
- }
- // anthropicSSEStream builds an Anthropic-format SSE stream from the given text chunks.
- func anthropicSSEStream(id, model string, chunks []string, stopReason string, inputTok, outputTok int) string {
- var sb strings.Builder
- // message_start
- sb.WriteString(fmt.Sprintf("event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":%q,\"type\":\"message\",\"role\":\"assistant\",\"model\":%q,\"content\":[],\"stop_reason\":null,\"usage\":{\"input_tokens\":%d,\"output_tokens\":0}}}\n\n", id, model, inputTok))
- // content_block_start
- sb.WriteString("event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n")
- // content_block_delta for each chunk
- for _, chunk := range chunks {
- escaped, _ := json.Marshal(chunk) // properly escape the string
- sb.WriteString(fmt.Sprintf("event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":%s}}\n\n", string(escaped)))
- }
- // content_block_stop
- sb.WriteString("event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n")
- // message_delta
- sb.WriteString(fmt.Sprintf("event: message_delta\ndata: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":%q},\"usage\":{\"output_tokens\":%d}}\n\n", stopReason, outputTok))
- // message_stop
- sb.WriteString("event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n")
- return sb.String()
- }
- // ---------- non-streaming tests ----------
- func TestAnthropicAdapter_NonStreaming_Basic(t *testing.T) {
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- // Verify request headers
- if r.Header.Get("x-api-key") != "test-key" {
- t.Errorf("expected x-api-key 'test-key', got %q", r.Header.Get("x-api-key"))
- }
- if r.Header.Get("anthropic-version") != "2023-06-01" {
- t.Errorf("expected anthropic-version '2023-06-01', got %q", r.Header.Get("anthropic-version"))
- }
- if r.Header.Get("Content-Type") != "application/json" {
- t.Errorf("expected Content-Type 'application/json', got %q", r.Header.Get("Content-Type"))
- }
- // Verify request body
- var req anthropicReq
- if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
- t.Fatalf("failed to decode request: %v", err)
- }
- if req.Model != "claude-test" {
- t.Errorf("expected model 'claude-test', got %q", req.Model)
- }
- if req.Stream {
- t.Error("expected stream=false for non-streaming call")
- }
- w.Header().Set("Content-Type", "application/json")
- fmt.Fprint(w, anthropicJSONResponse("msg_123", "claude-test", "Hello world", "end_turn", 10, 5))
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{
- APIKey: "test-key",
- Model: "claude-test",
- BaseURL: server.URL,
- })
- result, err := adapter.Call(context.Background(), map[string]interface{}{
- "messages": []interface{}{
- map[string]interface{}{"role": "user", "content": "hi"},
- },
- }, nil)
- if err != nil {
- t.Fatalf("Call() error: %v", err)
- }
- if result["content"] != "Hello world" {
- t.Errorf("content: got %q, want 'Hello world'", result["content"])
- }
- if result["model"] != "claude-test" {
- t.Errorf("model: got %q, want 'claude-test'", result["model"])
- }
- if result["finish_reason"] != "end_turn" {
- t.Errorf("finish_reason: got %q, want 'end_turn'", result["finish_reason"])
- }
- if result["response_id"] != "msg_123" {
- t.Errorf("response_id: got %q, want 'msg_123'", result["response_id"])
- }
- usage, ok := result["usage"].(map[string]interface{})
- if !ok {
- t.Fatal("missing usage")
- }
- if usage["prompt_tokens"] != 10 {
- t.Errorf("prompt_tokens: got %v, want 10", usage["prompt_tokens"])
- }
- if usage["completion_tokens"] != 5 {
- t.Errorf("completion_tokens: got %v, want 5", usage["completion_tokens"])
- }
- if usage["total_tokens"] != 15 {
- t.Errorf("total_tokens: got %v, want 15", usage["total_tokens"])
- }
- }
- func TestAnthropicAdapter_NonStreaming_SystemPrompt(t *testing.T) {
- var captured anthropicReq
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- json.NewDecoder(r.Body).Decode(&captured)
- w.Header().Set("Content-Type", "application/json")
- fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", "ok", "end_turn", 5, 2))
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- _, err := adapter.Call(context.Background(), map[string]interface{}{
- "messages": []interface{}{
- map[string]interface{}{"role": "system", "content": "Be concise"},
- map[string]interface{}{"role": "user", "content": "hi"},
- },
- }, nil)
- if err != nil {
- t.Fatalf("Call() error: %v", err)
- }
- if captured.System != "Be concise" {
- t.Errorf("system: got %q, want 'Be concise'", captured.System)
- }
- if len(captured.Messages) != 1 {
- t.Fatalf("expected 1 message (user only), got %d", len(captured.Messages))
- }
- if captured.Messages[0].Role != "user" {
- t.Errorf("expected role 'user', got %q", captured.Messages[0].Role)
- }
- }
- func TestAnthropicAdapter_NonStreaming_PromptParam(t *testing.T) {
- var captured anthropicReq
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- json.NewDecoder(r.Body).Decode(&captured)
- w.Header().Set("Content-Type", "application/json")
- fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", "ok", "end_turn", 5, 2))
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- _, err := adapter.Call(context.Background(), map[string]interface{}{
- "prompt": "hello there",
- }, nil)
- if err != nil {
- t.Fatalf("Call() error: %v", err)
- }
- if len(captured.Messages) != 1 || captured.Messages[0].Content != "hello there" {
- t.Errorf("expected single user message 'hello there', got %+v", captured.Messages)
- }
- }
- func TestAnthropicAdapter_NonStreaming_ModelOverride(t *testing.T) {
- var captured anthropicReq
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- json.NewDecoder(r.Body).Decode(&captured)
- w.Header().Set("Content-Type", "application/json")
- fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-3-haiku", "ok", "end_turn", 5, 2))
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{
- APIKey: "k",
- Model: "claude-default",
- BaseURL: server.URL,
- })
- _, err := adapter.Call(context.Background(), map[string]interface{}{
- "model": "claude-3-haiku",
- "prompt": "hi",
- }, nil)
- if err != nil {
- t.Fatalf("Call() error: %v", err)
- }
- if captured.Model != "claude-3-haiku" {
- t.Errorf("expected model override 'claude-3-haiku', got %q", captured.Model)
- }
- }
- func TestAnthropicAdapter_NonStreaming_APIError(t *testing.T) {
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.WriteHeader(http.StatusUnauthorized)
- fmt.Fprint(w, `{"error":{"type":"authentication_error","message":"invalid x-api-key"}}`)
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "bad", BaseURL: server.URL})
- _, err := adapter.Call(context.Background(), map[string]interface{}{
- "prompt": "hi",
- }, nil)
- if err == nil {
- t.Fatal("expected error for 401 response")
- }
- llmErr, ok := err.(*LLMError)
- if !ok {
- t.Fatalf("expected *LLMError, got %T: %v", err, err)
- }
- if llmErr.StatusCode != 401 {
- t.Errorf("status code: got %d, want 401", llmErr.StatusCode)
- }
- if llmErr.Retryable {
- t.Error("expected 401 to be non-retryable")
- }
- }
- func TestAnthropicAdapter_NonStreaming_RateLimitRetryable(t *testing.T) {
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.WriteHeader(http.StatusTooManyRequests)
- fmt.Fprint(w, `{"error":{"type":"rate_limit_error","message":"too many requests"}}`)
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- _, err := adapter.Call(context.Background(), map[string]interface{}{
- "prompt": "hi",
- }, nil)
- if err == nil {
- t.Fatal("expected error for 429 response")
- }
- llmErr, ok := err.(*LLMError)
- if !ok {
- t.Fatalf("expected *LLMError, got %T", err)
- }
- if !llmErr.Retryable {
- t.Error("expected 429 to be retryable")
- }
- }
- func TestAnthropicAdapter_NonStreaming_MissingMessages(t *testing.T) {
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k"})
- _, err := adapter.Call(context.Background(), map[string]interface{}{}, nil)
- if err == nil {
- t.Fatal("expected error for missing messages/prompt")
- }
- if !strings.Contains(err.Error(), "must include 'messages' or 'prompt'") {
- t.Errorf("unexpected error: %v", err)
- }
- }
- // ---------- streaming tests ----------
- func TestAnthropicAdapter_Streaming_Basic(t *testing.T) {
- chunks := []string{"He", "llo", " wo", "rld", "!"}
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- // Verify stream=true in request
- var req anthropicReq
- json.NewDecoder(r.Body).Decode(&req)
- if !req.Stream {
- t.Error("expected stream=true in request body")
- }
- if r.Header.Get("Accept") != "text/event-stream" {
- t.Errorf("expected Accept 'text/event-stream', got %q", r.Header.Get("Accept"))
- }
- w.Header().Set("Content-Type", "text/event-stream")
- w.WriteHeader(http.StatusOK)
- fmt.Fprint(w, anthropicSSEStream("msg_s1", "claude-test", chunks, "end_turn", 12, 8))
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{
- APIKey: "test-key",
- Model: "claude-test",
- BaseURL: server.URL,
- })
- streamCh := make(chan string, 100)
- result, err := adapter.Call(context.Background(), map[string]interface{}{
- "stream": true,
- "messages": []interface{}{
- map[string]interface{}{"role": "user", "content": "say hello world"},
- },
- }, streamCh)
- if err != nil {
- t.Fatalf("Call() error: %v", err)
- }
- // Collect streamed chunks
- close(streamCh)
- var received []string
- for c := range streamCh {
- received = append(received, c)
- }
- if len(received) != len(chunks) {
- t.Errorf("expected %d stream chunks, got %d", len(chunks), len(received))
- }
- for i, want := range chunks {
- if i < len(received) && received[i] != want {
- t.Errorf("chunk[%d]: got %q, want %q", i, received[i], want)
- }
- }
- // Verify assembled result
- if result["content"] != "Hello world!" {
- t.Errorf("content: got %q, want 'Hello world!'", result["content"])
- }
- if result["model"] != "claude-test" {
- t.Errorf("model: got %q, want 'claude-test'", result["model"])
- }
- if result["finish_reason"] != "end_turn" {
- t.Errorf("finish_reason: got %q, want 'end_turn'", result["finish_reason"])
- }
- if result["response_id"] != "msg_s1" {
- t.Errorf("response_id: got %q, want 'msg_s1'", result["response_id"])
- }
- usage := result["usage"].(map[string]interface{})
- if usage["prompt_tokens"] != 12 {
- t.Errorf("prompt_tokens: got %v, want 12", usage["prompt_tokens"])
- }
- if usage["completion_tokens"] != 8 {
- t.Errorf("completion_tokens: got %v, want 8", usage["completion_tokens"])
- }
- if usage["total_tokens"] != 20 {
- t.Errorf("total_tokens: got %v, want 20", usage["total_tokens"])
- }
- }
- func TestAnthropicAdapter_Streaming_FallsBackWhenNoChannel(t *testing.T) {
- // When stream=true but channel is nil, should fall back to non-streaming
- var captured anthropicReq
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- json.NewDecoder(r.Body).Decode(&captured)
- w.Header().Set("Content-Type", "application/json")
- fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", "ok", "end_turn", 5, 2))
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- result, err := adapter.Call(context.Background(), map[string]interface{}{
- "stream": true,
- "prompt": "hi",
- }, nil) // nil channel
- if err != nil {
- t.Fatalf("Call() error: %v", err)
- }
- if captured.Stream {
- t.Error("expected stream=false when channel is nil")
- }
- if result["content"] != "ok" {
- t.Errorf("content: got %v, want 'ok'", result["content"])
- }
- }
- func TestAnthropicAdapter_Streaming_HTTPError(t *testing.T) {
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.WriteHeader(http.StatusInternalServerError)
- fmt.Fprint(w, `{"error":{"type":"api_error","message":"internal server error"}}`)
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- streamCh := make(chan string, 100)
- _, err := adapter.Call(context.Background(), map[string]interface{}{
- "stream": true,
- "prompt": "hi",
- }, streamCh)
- if err == nil {
- t.Fatal("expected error for 500 response")
- }
- llmErr, ok := err.(*LLMError)
- if !ok {
- t.Fatalf("expected *LLMError, got %T: %v", err, err)
- }
- if !llmErr.Retryable {
- t.Error("expected 500 to be retryable")
- }
- }
- func TestAnthropicAdapter_Streaming_ErrorEvent(t *testing.T) {
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "text/event-stream")
- w.WriteHeader(http.StatusOK)
- // Send a partial stream then error
- fmt.Fprint(w, "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"model\":\"claude-test\",\"content\":[],\"usage\":{\"input_tokens\":5}}}\n\n")
- fmt.Fprint(w, "event: error\ndata: {\"type\":\"error\",\"error\":{\"type\":\"overloaded_error\",\"message\":\"Overloaded\"}}\n\n")
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- streamCh := make(chan string, 100)
- _, err := adapter.Call(context.Background(), map[string]interface{}{
- "stream": true,
- "prompt": "hi",
- }, streamCh)
- if err == nil {
- t.Fatal("expected error from error SSE event")
- }
- if !strings.Contains(err.Error(), "Overloaded") {
- t.Errorf("expected error to mention 'Overloaded', got: %v", err)
- }
- }
- func TestAnthropicAdapter_Streaming_ContextCancellation(t *testing.T) {
- // Simulate a slow stream and cancel the context
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "text/event-stream")
- w.WriteHeader(http.StatusOK)
- // Write message_start and one chunk
- fmt.Fprint(w, "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"model\":\"claude-test\",\"content\":[],\"usage\":{\"input_tokens\":5}}}\n\n")
- fmt.Fprint(w, "event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n")
- fmt.Fprint(w, "event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"He\"}}\n\n")
- // Flush to ensure the client receives data
- if f, ok := w.(http.Flusher); ok {
- f.Flush()
- }
- // Block indefinitely (simulating slow stream) — context cancel will close connection
- <-r.Context().Done()
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- ctx, cancel := context.WithCancel(context.Background())
- // Use a zero-buffer channel so the send blocks when context is cancelled
- streamCh := make(chan string)
- // Read one chunk then cancel
- go func() {
- <-streamCh // read "He"
- cancel()
- }()
- _, err := adapter.Call(ctx, map[string]interface{}{
- "stream": true,
- "prompt": "hi",
- }, streamCh)
- if err == nil {
- t.Fatal("expected context cancellation error")
- }
- }
- // ---------- structured JSON output tests ----------
- func TestAnthropicAdapter_StructuredJSON(t *testing.T) {
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var req anthropicReq
- json.NewDecoder(r.Body).Decode(&req)
- // Verify schema instruction was injected into system prompt
- if !strings.Contains(req.System, "OUTPUT FORMAT REQUIREMENT") {
- t.Error("expected schema instruction in system prompt")
- }
- if !strings.Contains(req.System, `"name"`) {
- t.Error("expected schema properties in system prompt")
- }
- w.Header().Set("Content-Type", "application/json")
- fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", `{"name":"Alice","age":30}`, "end_turn", 10, 5))
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- result, err := adapter.Call(context.Background(), map[string]interface{}{
- "prompt": "give me a person",
- "output_config": map[string]interface{}{
- "format": map[string]interface{}{
- "type": "json_schema",
- "schema": map[string]interface{}{
- "type": "object",
- "properties": map[string]interface{}{
- "name": map[string]interface{}{"type": "string"},
- "age": map[string]interface{}{"type": "number"},
- },
- },
- },
- },
- }, nil)
- if err != nil {
- t.Fatalf("Call() error: %v", err)
- }
- // Content should be parsed JSON, not a string
- parsed, ok := result["content"].(map[string]interface{})
- if !ok {
- t.Fatalf("expected content to be parsed map, got %T: %v", result["content"], result["content"])
- }
- if parsed["name"] != "Alice" {
- t.Errorf("name: got %v, want 'Alice'", parsed["name"])
- }
- }
- func TestAnthropicAdapter_StructuredJSON_WithCodeFences(t *testing.T) {
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "application/json")
- // LLM wraps in code fences despite instruction
- fmt.Fprint(w, anthropicJSONResponse("msg_1", "claude-test", "```json\n{\"value\":42}\n```", "end_turn", 10, 5))
- }))
- defer server.Close()
- adapter := NewAnthropicAdapter(AnthropicConfig{APIKey: "k", BaseURL: server.URL})
- result, err := adapter.Call(context.Background(), map[string]interface{}{
- "prompt": "give me a number",
- "output_config": map[string]interface{}{
- "format": map[string]interface{}{"type": "json_schema"},
- },
- }, nil)
- if err != nil {
- t.Fatalf("Call() error: %v", err)
- }
- parsed, ok := result["content"].(map[string]interface{})
- if !ok {
- t.Fatalf("expected parsed map, got %T", result["content"])
- }
- if parsed["value"] != float64(42) {
- t.Errorf("value: got %v, want 42", parsed["value"])
- }
- }
|