package workflow import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "net/http" "strings" "time" ) // schemaToInstruction converts a JSON Schema object to a strict system-level instruction. // The strong imperative framing ("enforced by runtime") reduces the model's tendency to // wrap output in markdown fences or add explanatory text. func schemaToInstruction(schema map[string]interface{}) string { const header = "OUTPUT FORMAT REQUIREMENT (enforced by runtime, do not deviate):\n" + "You must output ONLY a raw JSON object or array. No other text is allowed.\n" + "- Start your response with { or [, end with } or ]\n" + "- Do NOT wrap in markdown code fences (no ```json, no ```)\n" + "- Do NOT include any preamble, explanation, or commentary before or after the JSON\n" + "- Use double quotes for all keys and string values\n" + "- Do not include trailing commas\n" + "- The output must be directly parseable by JSON.parse() with zero preprocessing" if schema == nil { return header } schemaJSON, err := json.MarshalIndent(schema, "", " ") if err != nil { return header } return header + "\n\nJSON Schema to conform to:\n" + string(schemaJSON) } // AnthropicAdapter calls the Anthropic Messages API directly (no OpenAI proxy needed). // // Activate automatically in cmd/ when LLM_URL contains "anthropic.com". // // Config (.env): // // LLM_URL=https://api.anthropic.com // LLM_KEY=sk-ant-api03-... // LLM_MODEL=claude-3-5-sonnet-20241022 type AnthropicAdapter struct { baseURL string apiKey string model string client *http.Client } // AnthropicConfig holds configuration for AnthropicAdapter. type AnthropicConfig struct { APIKey string // Anthropic API key (sk-ant-...) Model string // e.g. "claude-3-5-sonnet-20241022" BaseURL string // default: "https://api.anthropic.com" Timeout time.Duration // default: 5 minutes } // NewAnthropicAdapter creates a new AnthropicAdapter. func NewAnthropicAdapter(cfg AnthropicConfig) *AnthropicAdapter { baseURL := strings.TrimSuffix(cfg.BaseURL, "/") if baseURL == "" { baseURL = "https://api.anthropic.com" } model := cfg.Model if model == "" { model = "claude-3-5-sonnet-20241022" } timeout := cfg.Timeout if timeout == 0 { timeout = 5 * time.Minute } return &AnthropicAdapter{ baseURL: baseURL, apiKey: cfg.APIKey, model: model, client: &http.Client{Timeout: timeout}, } } // anthropicMsg is a message in the Anthropic API format. type anthropicMsg struct { Role string `json:"role"` Content string `json:"content"` } // anthropicReq is the POST body for /v1/messages. type anthropicReq struct { Model string `json:"model"` MaxTokens int `json:"max_tokens"` Messages []anthropicMsg `json:"messages"` System string `json:"system,omitempty"` Stream bool `json:"stream,omitempty"` } // anthropicResp is the response from /v1/messages (non-streaming). type anthropicResp struct { ID string `json:"id"` Model string `json:"model"` Content []struct { Type string `json:"type"` Text string `json:"text"` } `json:"content"` StopReason string `json:"stop_reason"` Usage struct { InputTokens int `json:"input_tokens"` OutputTokens int `json:"output_tokens"` } `json:"usage"` Error *struct { Type string `json:"type"` Message string `json:"message"` } `json:"error,omitempty"` } // anthropicCallParams holds parsed parameters for a single Call invocation. type anthropicCallParams struct { model string maxTokens int msgs []anthropicMsg system string shouldParseJSON bool jsonSchema map[string]interface{} } // Call implements LLMAdapter. Supports both streaming (SSE) and non-streaming modes. func (a *AnthropicAdapter) Call(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { p, err := a.parseCallParams(params) if err != nil { return nil, err } // Check if streaming is requested isStreaming := false if streamVal, ok := params["stream"].(bool); ok { isStreaming = streamVal } var result map[string]interface{} if isStreaming && stream != nil { result, err = a.callStreaming(ctx, p, stream) } else { result, err = a.callNonStreaming(ctx, p) } if err != nil { return nil, err } // Parse structured JSON output if requested if p.shouldParseJSON { if err := a.parseStructuredJSON(result); err != nil { return nil, err } } return result, nil } // parseCallParams extracts and validates parameters from the generic params map. func (a *AnthropicAdapter) parseCallParams(params map[string]interface{}) (*anthropicCallParams, error) { p := &anthropicCallParams{ model: a.model, maxTokens: 4096, } if m, ok := params["model"].(string); ok && m != "" { p.model = m } switch v := params["max_tokens"].(type) { case int: p.maxTokens = v case float64: p.maxTokens = int(v) } // Detect structured JSON request from output_config if outputConfig, ok := params["output_config"].(map[string]interface{}); ok { if format, ok := outputConfig["format"].(map[string]interface{}); ok { if ftype, ok := format["type"].(string); ok && ftype == "json_schema" { p.shouldParseJSON = true if s, ok := format["schema"].(map[string]interface{}); ok { p.jsonSchema = s } } } } // Parse messages + system if raw, ok := params["messages"].([]interface{}); ok { for _, item := range raw { m, ok := item.(map[string]interface{}) if !ok { continue } role, _ := m["role"].(string) content, _ := m["content"].(string) if role == "system" { p.system = content } else { p.msgs = append(p.msgs, anthropicMsg{Role: role, Content: content}) } } } else if prompt, ok := params["prompt"].(string); ok { p.msgs = []anthropicMsg{{Role: "user", Content: prompt}} } else { return nil, fmt.Errorf("AnthropicAdapter: params must include 'messages' or 'prompt'") } if len(p.msgs) == 0 { return nil, fmt.Errorf("AnthropicAdapter: no user/assistant messages") } // Inject schema instruction into system prompt if p.shouldParseJSON { instruction := schemaToInstruction(p.jsonSchema) if p.system == "" { p.system = instruction } else { p.system = p.system + "\n\n" + instruction } } return p, nil } // buildHTTPRequest creates the HTTP request for the Anthropic API. func (a *AnthropicAdapter) buildHTTPRequest(ctx context.Context, p *anthropicCallParams, streaming bool) (*http.Request, error) { reqBody := anthropicReq{ Model: p.model, MaxTokens: p.maxTokens, Messages: p.msgs, System: p.system, Stream: streaming, } bodyBytes, err := json.Marshal(reqBody) if err != nil { return nil, fmt.Errorf("AnthropicAdapter: marshal: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, a.baseURL+"/v1/messages", bytes.NewReader(bodyBytes)) if err != nil { return nil, fmt.Errorf("AnthropicAdapter: new request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("x-api-key", a.apiKey) httpReq.Header.Set("anthropic-version", "2023-06-01") if streaming { httpReq.Header.Set("Accept", "text/event-stream") } return httpReq, nil } // callNonStreaming performs a standard (non-streaming) API call. func (a *AnthropicAdapter) callNonStreaming(ctx context.Context, p *anthropicCallParams) (map[string]interface{}, error) { httpReq, err := a.buildHTTPRequest(ctx, p, false) if err != nil { return nil, err } resp, err := a.client.Do(httpReq) if err != nil { return nil, fmt.Errorf("AnthropicAdapter: HTTP: %w", err) } defer resp.Body.Close() respBytes, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("AnthropicAdapter: read body: %w", err) } if resp.StatusCode != http.StatusOK { return nil, parseAnthropicError(resp.StatusCode, respBytes, p.model) } var apiResp anthropicResp if err := json.Unmarshal(respBytes, &apiResp); err != nil { return nil, fmt.Errorf("AnthropicAdapter: parse response: %w", err) } if apiResp.Error != nil { return nil, &LLMError{ Type: apiResp.Error.Type, Message: fmt.Sprintf("AnthropicAdapter: API error: %s", apiResp.Error.Message), Model: apiResp.Model, } } // Extract content text var text strings.Builder for _, block := range apiResp.Content { if block.Type == "text" { text.WriteString(block.Text) } } return map[string]interface{}{ "content": text.String(), "model": apiResp.Model, "finish_reason": apiResp.StopReason, "response_id": apiResp.ID, "usage": map[string]interface{}{ "prompt_tokens": apiResp.Usage.InputTokens, "completion_tokens": apiResp.Usage.OutputTokens, "total_tokens": apiResp.Usage.InputTokens + apiResp.Usage.OutputTokens, }, }, nil } // callStreaming performs a streaming API call using Anthropic's SSE protocol. // // Anthropic SSE event types: // - message_start: contains message metadata (id, model, usage.input_tokens) // - content_block_delta: contains incremental text (delta.text) // - message_delta: contains stop_reason and usage.output_tokens // - message_stop: signals end of stream // - error: API error during streaming // // Channel contract (same as OpenAI adapter): // - Does not close the channel (engine is responsible) // - Uses select for non-blocking send // - All sends complete before Call returns (no goroutines) func (a *AnthropicAdapter) callStreaming(ctx context.Context, p *anthropicCallParams, stream chan<- string) (map[string]interface{}, error) { httpReq, err := a.buildHTTPRequest(ctx, p, true) if err != nil { return nil, err } resp, err := a.client.Do(httpReq) if err != nil { return nil, fmt.Errorf("AnthropicAdapter: HTTP: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { bodyBytes, _ := io.ReadAll(resp.Body) return nil, parseAnthropicError(resp.StatusCode, bodyBytes, p.model) } // Process SSE stream var fullContent strings.Builder var model, responseID, stopReason string var inputTokens, outputTokens int scanner := bufio.NewScanner(resp.Body) scanner.Buffer(make([]byte, 64*1024), 1024*1024) var eventType string for scanner.Scan() { line := scanner.Text() // Track event type from "event:" lines if strings.HasPrefix(line, "event: ") { eventType = strings.TrimPrefix(line, "event: ") continue } // Skip empty lines, comments, non-data lines if line == "" || strings.HasPrefix(line, ":") || !strings.HasPrefix(line, "data: ") { continue } data := strings.TrimPrefix(line, "data: ") switch eventType { case "message_start": // {"type":"message_start","message":{"id":"...","model":"...","usage":{"input_tokens":N}}} var event struct { Message struct { ID string `json:"id"` Model string `json:"model"` Usage struct { InputTokens int `json:"input_tokens"` } `json:"usage"` } `json:"message"` } if json.Unmarshal([]byte(data), &event) == nil { responseID = event.Message.ID model = event.Message.Model inputTokens = event.Message.Usage.InputTokens } case "content_block_delta": // {"type":"content_block_delta","delta":{"type":"text_delta","text":"..."}} var event struct { Delta struct { Text string `json:"text"` } `json:"delta"` } if json.Unmarshal([]byte(data), &event) == nil && event.Delta.Text != "" { fullContent.WriteString(event.Delta.Text) select { case stream <- event.Delta.Text: case <-ctx.Done(): return nil, ctx.Err() } } case "message_delta": // {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":N}} var event struct { Delta struct { StopReason string `json:"stop_reason"` } `json:"delta"` Usage struct { OutputTokens int `json:"output_tokens"` } `json:"usage"` } if json.Unmarshal([]byte(data), &event) == nil { stopReason = event.Delta.StopReason outputTokens = event.Usage.OutputTokens } case "message_stop": // End of stream — break out goto done case "error": // {"type":"error","error":{"type":"...","message":"..."}} var event struct { Error struct { Type string `json:"type"` Message string `json:"message"` } `json:"error"` } if json.Unmarshal([]byte(data), &event) == nil { return nil, &LLMError{ Type: event.Error.Type, Message: fmt.Sprintf("AnthropicAdapter: stream error: %s", event.Error.Message), Model: model, } } return nil, fmt.Errorf("AnthropicAdapter: stream error (unparseable): %s", data) } eventType = "" // Reset for next event } done: if err := scanner.Err(); err != nil { return nil, fmt.Errorf("AnthropicAdapter: error reading stream: %w", err) } return map[string]interface{}{ "content": fullContent.String(), "model": model, "finish_reason": stopReason, "response_id": responseID, "usage": map[string]interface{}{ "prompt_tokens": inputTokens, "completion_tokens": outputTokens, "total_tokens": inputTokens + outputTokens, }, }, nil } // parseStructuredJSON attempts to parse the "content" field of the result as JSON. // Called when output_config requested json_schema format. func (a *AnthropicAdapter) parseStructuredJSON(result map[string]interface{}) error { text, ok := result["content"].(string) if !ok || text == "" { return nil } // Strip markdown code fences if present (e.g. ```json ... ``` or ``` ... ```) raw := strings.TrimSpace(text) if strings.HasPrefix(raw, "```") { if idx := strings.Index(raw, "\n"); idx != -1 { raw = raw[idx+1:] } if idx := strings.LastIndex(raw, "```"); idx != -1 { raw = strings.TrimSpace(raw[:idx]) } } var parsed interface{} if err := json.Unmarshal([]byte(raw), &parsed); err != nil { model, _ := result["model"].(string) return &LLMError{ Type: "json_parse_error", Code: "JSON_PARSE_ERROR", Message: fmt.Sprintf("AnthropicAdapter: failed to parse structured output as JSON: %v", err), Retryable: false, Model: model, } } result["content"] = parsed return nil } // parseAnthropicError constructs a structured LLMError from an HTTP error response. func parseAnthropicError(statusCode int, body []byte, model string) *LLMError { llmErr := &LLMError{ StatusCode: statusCode, Model: model, Message: string(body), } // Try to parse body as JSON for richer error info var errResp struct { Error struct { Type string `json:"type"` Message string `json:"message"` } `json:"error"` } if json.Unmarshal(body, &errResp) == nil && errResp.Error.Type != "" { llmErr.Type = errResp.Error.Type llmErr.Message = fmt.Sprintf("AnthropicAdapter: API error %s: %s", errResp.Error.Type, errResp.Error.Message) } // Classify retryability and error type based on status code switch { case statusCode == 429: llmErr.Type = "rate_limit_error" llmErr.Code = "RATE_LIMITED" llmErr.Retryable = true case statusCode == 529, statusCode == 503: llmErr.Type = "overloaded_error" llmErr.Code = "OVERLOADED" llmErr.Retryable = true case statusCode >= 500: llmErr.Type = "api_error" llmErr.Code = "SERVER_ERROR" llmErr.Retryable = true case statusCode == 401: llmErr.Type = "authentication_error" llmErr.Code = "AUTH_ERROR" llmErr.Retryable = false default: llmErr.Type = "invalid_request_error" llmErr.Code = "INVALID_REQUEST" llmErr.Retryable = false } return llmErr }