| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538 |
- package workflow
- import (
- "bufio"
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "strings"
- "time"
- )
// schemaToInstruction renders the system-level instruction that tells the
// model to answer with nothing but raw JSON.
//
// The fixed preamble is always part of the result. When schema is non-nil and
// can be marshalled, its indented JSON form is appended under a
// "JSON Schema to conform to:" heading; a nil schema or a marshalling failure
// yields the preamble alone. The forceful wording ("enforced by runtime") is
// intended to discourage markdown fences and explanatory text around the JSON.
func schemaToInstruction(schema map[string]interface{}) string {
	rules := []string{
		"OUTPUT FORMAT REQUIREMENT (enforced by runtime, do not deviate):",
		"You must output ONLY a raw JSON object or array. No other text is allowed.",
		"- Start your response with { or [, end with } or ]",
		"- Do NOT wrap in markdown code fences (no ```json, no ```)",
		"- Do NOT include any preamble, explanation, or commentary before or after the JSON",
		"- Use double quotes for all keys and string values",
		"- Do not include trailing commas",
		"- The output must be directly parseable by JSON.parse() with zero preprocessing",
	}
	preamble := strings.Join(rules, "\n")

	if schema != nil {
		if encoded, err := json.MarshalIndent(schema, "", " "); err == nil {
			return preamble + "\n\nJSON Schema to conform to:\n" + string(encoded)
		}
	}
	return preamble
}
// AnthropicAdapter talks to the Anthropic Messages API directly, so no
// OpenAI-compatible proxy is needed.
//
// It is activated automatically in cmd/ when LLM_URL contains "anthropic.com".
//
// Example configuration (.env):
//
//	LLM_URL=https://api.anthropic.com
//	LLM_KEY=sk-ant-api03-...
//	LLM_MODEL=claude-3-5-sonnet-20241022
type AnthropicAdapter struct {
	// baseURL is the API root; "/v1/messages" is appended to it for each request.
	baseURL string
	// apiKey is sent in the x-api-key request header.
	apiKey string
	// model is the default model, used when a call does not name one.
	model string
	// client performs every HTTP request issued by the adapter.
	client *http.Client
}
// AnthropicConfig carries the settings used to construct an AnthropicAdapter.
// Fields left at their zero value fall back to the defaults noted below.
type AnthropicConfig struct {
	// APIKey is the Anthropic API key (sk-ant-...).
	APIKey string
	// Model is the model name, e.g. "claude-3-5-sonnet-20241022".
	Model string
	// BaseURL is the API root. Default: "https://api.anthropic.com".
	BaseURL string
	// Timeout is applied to the adapter's HTTP client. Default: 5 minutes.
	Timeout time.Duration
}
- // NewAnthropicAdapter creates a new AnthropicAdapter.
- func NewAnthropicAdapter(cfg AnthropicConfig) *AnthropicAdapter {
- baseURL := strings.TrimSuffix(cfg.BaseURL, "/")
- if baseURL == "" {
- baseURL = "https://api.anthropic.com"
- }
- model := cfg.Model
- if model == "" {
- model = "claude-3-5-sonnet-20241022"
- }
- timeout := cfg.Timeout
- if timeout == 0 {
- timeout = 5 * time.Minute
- }
- return &AnthropicAdapter{
- baseURL: baseURL,
- apiKey: cfg.APIKey,
- model: model,
- client: &http.Client{Timeout: timeout},
- }
- }
// anthropicMsg is one conversation turn in the Anthropic API wire format.
type anthropicMsg struct {
	// Role is the speaker of the turn as sent on the wire.
	Role string `json:"role"`
	// Content is the plain-text body of the turn.
	Content string `json:"content"`
}
- // anthropicReq is the POST body for /v1/messages.
- type anthropicReq struct {
- Model string `json:"model"`
- MaxTokens int `json:"max_tokens"`
- Messages []anthropicMsg `json:"messages"`
- System string `json:"system,omitempty"`
- Stream bool `json:"stream,omitempty"`
- }
// anthropicResp mirrors the JSON body returned by /v1/messages for a
// non-streaming call.
type anthropicResp struct {
	// ID is the response identifier.
	ID string `json:"id"`
	// Model is the model name reported in the response.
	Model string `json:"model"`
	// Content lists the returned content blocks; Text carries the payload of
	// blocks whose Type is "text".
	Content []struct {
		Type string `json:"type"`
		Text string `json:"text"`
	} `json:"content"`
	// StopReason is decoded from "stop_reason".
	StopReason string `json:"stop_reason"`
	// Usage holds the token counts reported for the call.
	Usage struct {
		InputTokens  int `json:"input_tokens"`
		OutputTokens int `json:"output_tokens"`
	} `json:"usage"`
	// Error is non-nil when the body carries an "error" object.
	Error *struct {
		Type    string `json:"type"`
		Message string `json:"message"`
	} `json:"error,omitempty"`
}
- // anthropicCallParams holds parsed parameters for a single Call invocation.
- type anthropicCallParams struct {
- model string
- maxTokens int
- msgs []anthropicMsg
- system string
- shouldParseJSON bool
- jsonSchema map[string]interface{}
- }
- // Call implements LLMAdapter. Supports both streaming (SSE) and non-streaming modes.
- func (a *AnthropicAdapter) Call(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
- p, err := a.parseCallParams(params)
- if err != nil {
- return nil, err
- }
- // Check if streaming is requested
- isStreaming := false
- if streamVal, ok := params["stream"].(bool); ok {
- isStreaming = streamVal
- }
- var result map[string]interface{}
- if isStreaming && stream != nil {
- result, err = a.callStreaming(ctx, p, stream)
- } else {
- result, err = a.callNonStreaming(ctx, p)
- }
- if err != nil {
- return nil, err
- }
- // Parse structured JSON output if requested
- if p.shouldParseJSON {
- if err := a.parseStructuredJSON(result); err != nil {
- return nil, err
- }
- }
- return result, nil
- }
- // parseCallParams extracts and validates parameters from the generic params map.
- func (a *AnthropicAdapter) parseCallParams(params map[string]interface{}) (*anthropicCallParams, error) {
- p := &anthropicCallParams{
- model: a.model,
- maxTokens: 4096,
- }
- if m, ok := params["model"].(string); ok && m != "" {
- p.model = m
- }
- switch v := params["max_tokens"].(type) {
- case int:
- p.maxTokens = v
- case float64:
- p.maxTokens = int(v)
- }
- // Detect structured JSON request from output_config
- if outputConfig, ok := params["output_config"].(map[string]interface{}); ok {
- if format, ok := outputConfig["format"].(map[string]interface{}); ok {
- if ftype, ok := format["type"].(string); ok && ftype == "json_schema" {
- p.shouldParseJSON = true
- if s, ok := format["schema"].(map[string]interface{}); ok {
- p.jsonSchema = s
- }
- }
- }
- }
- // Parse messages + system
- if raw, ok := params["messages"].([]interface{}); ok {
- for _, item := range raw {
- m, ok := item.(map[string]interface{})
- if !ok {
- continue
- }
- role, _ := m["role"].(string)
- content, _ := m["content"].(string)
- if role == "system" {
- p.system = content
- } else {
- p.msgs = append(p.msgs, anthropicMsg{Role: role, Content: content})
- }
- }
- } else if prompt, ok := params["prompt"].(string); ok {
- p.msgs = []anthropicMsg{{Role: "user", Content: prompt}}
- } else {
- return nil, fmt.Errorf("AnthropicAdapter: params must include 'messages' or 'prompt'")
- }
- if len(p.msgs) == 0 {
- return nil, fmt.Errorf("AnthropicAdapter: no user/assistant messages")
- }
- // Inject schema instruction into system prompt
- if p.shouldParseJSON {
- instruction := schemaToInstruction(p.jsonSchema)
- if p.system == "" {
- p.system = instruction
- } else {
- p.system = p.system + "\n\n" + instruction
- }
- }
- return p, nil
- }
- // buildHTTPRequest creates the HTTP request for the Anthropic API.
- func (a *AnthropicAdapter) buildHTTPRequest(ctx context.Context, p *anthropicCallParams, streaming bool) (*http.Request, error) {
- reqBody := anthropicReq{
- Model: p.model,
- MaxTokens: p.maxTokens,
- Messages: p.msgs,
- System: p.system,
- Stream: streaming,
- }
- bodyBytes, err := json.Marshal(reqBody)
- if err != nil {
- return nil, fmt.Errorf("AnthropicAdapter: marshal: %w", err)
- }
- httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
- a.baseURL+"/v1/messages", bytes.NewReader(bodyBytes))
- if err != nil {
- return nil, fmt.Errorf("AnthropicAdapter: new request: %w", err)
- }
- httpReq.Header.Set("Content-Type", "application/json")
- httpReq.Header.Set("x-api-key", a.apiKey)
- httpReq.Header.Set("anthropic-version", "2023-06-01")
- if streaming {
- httpReq.Header.Set("Accept", "text/event-stream")
- }
- return httpReq, nil
- }
- // callNonStreaming performs a standard (non-streaming) API call.
- func (a *AnthropicAdapter) callNonStreaming(ctx context.Context, p *anthropicCallParams) (map[string]interface{}, error) {
- httpReq, err := a.buildHTTPRequest(ctx, p, false)
- if err != nil {
- return nil, err
- }
- resp, err := a.client.Do(httpReq)
- if err != nil {
- return nil, fmt.Errorf("AnthropicAdapter: HTTP: %w", err)
- }
- defer resp.Body.Close()
- respBytes, err := io.ReadAll(resp.Body)
- if err != nil {
- return nil, fmt.Errorf("AnthropicAdapter: read body: %w", err)
- }
- if resp.StatusCode != http.StatusOK {
- return nil, parseAnthropicError(resp.StatusCode, respBytes, p.model)
- }
- var apiResp anthropicResp
- if err := json.Unmarshal(respBytes, &apiResp); err != nil {
- return nil, fmt.Errorf("AnthropicAdapter: parse response: %w", err)
- }
- if apiResp.Error != nil {
- return nil, &LLMError{
- Type: apiResp.Error.Type,
- Message: fmt.Sprintf("AnthropicAdapter: API error: %s", apiResp.Error.Message),
- Model: apiResp.Model,
- }
- }
- // Extract content text
- var text strings.Builder
- for _, block := range apiResp.Content {
- if block.Type == "text" {
- text.WriteString(block.Text)
- }
- }
- return map[string]interface{}{
- "content": text.String(),
- "model": apiResp.Model,
- "finish_reason": apiResp.StopReason,
- "response_id": apiResp.ID,
- "usage": map[string]interface{}{
- "prompt_tokens": apiResp.Usage.InputTokens,
- "completion_tokens": apiResp.Usage.OutputTokens,
- "total_tokens": apiResp.Usage.InputTokens + apiResp.Usage.OutputTokens,
- },
- }, nil
- }
- // callStreaming performs a streaming API call using Anthropic's SSE protocol.
- //
- // Anthropic SSE event types:
- // - message_start: contains message metadata (id, model, usage.input_tokens)
- // - content_block_delta: contains incremental text (delta.text)
- // - message_delta: contains stop_reason and usage.output_tokens
- // - message_stop: signals end of stream
- // - error: API error during streaming
- //
- // Channel contract (same as OpenAI adapter):
- // - Does not close the channel (engine is responsible)
- // - Uses select for non-blocking send
- // - All sends complete before Call returns (no goroutines)
- func (a *AnthropicAdapter) callStreaming(ctx context.Context, p *anthropicCallParams, stream chan<- string) (map[string]interface{}, error) {
- httpReq, err := a.buildHTTPRequest(ctx, p, true)
- if err != nil {
- return nil, err
- }
- resp, err := a.client.Do(httpReq)
- if err != nil {
- return nil, fmt.Errorf("AnthropicAdapter: HTTP: %w", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- bodyBytes, _ := io.ReadAll(resp.Body)
- return nil, parseAnthropicError(resp.StatusCode, bodyBytes, p.model)
- }
- // Process SSE stream
- var fullContent strings.Builder
- var model, responseID, stopReason string
- var inputTokens, outputTokens int
- scanner := bufio.NewScanner(resp.Body)
- scanner.Buffer(make([]byte, 64*1024), 1024*1024)
- var eventType string
- for scanner.Scan() {
- line := scanner.Text()
- // Track event type from "event:" lines
- if strings.HasPrefix(line, "event: ") {
- eventType = strings.TrimPrefix(line, "event: ")
- continue
- }
- // Skip empty lines, comments, non-data lines
- if line == "" || strings.HasPrefix(line, ":") || !strings.HasPrefix(line, "data: ") {
- continue
- }
- data := strings.TrimPrefix(line, "data: ")
- switch eventType {
- case "message_start":
- // {"type":"message_start","message":{"id":"...","model":"...","usage":{"input_tokens":N}}}
- var event struct {
- Message struct {
- ID string `json:"id"`
- Model string `json:"model"`
- Usage struct {
- InputTokens int `json:"input_tokens"`
- } `json:"usage"`
- } `json:"message"`
- }
- if json.Unmarshal([]byte(data), &event) == nil {
- responseID = event.Message.ID
- model = event.Message.Model
- inputTokens = event.Message.Usage.InputTokens
- }
- case "content_block_delta":
- // {"type":"content_block_delta","delta":{"type":"text_delta","text":"..."}}
- var event struct {
- Delta struct {
- Text string `json:"text"`
- } `json:"delta"`
- }
- if json.Unmarshal([]byte(data), &event) == nil && event.Delta.Text != "" {
- fullContent.WriteString(event.Delta.Text)
- select {
- case stream <- event.Delta.Text:
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- case "message_delta":
- // {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":N}}
- var event struct {
- Delta struct {
- StopReason string `json:"stop_reason"`
- } `json:"delta"`
- Usage struct {
- OutputTokens int `json:"output_tokens"`
- } `json:"usage"`
- }
- if json.Unmarshal([]byte(data), &event) == nil {
- stopReason = event.Delta.StopReason
- outputTokens = event.Usage.OutputTokens
- }
- case "message_stop":
- // End of stream — break out
- goto done
- case "error":
- // {"type":"error","error":{"type":"...","message":"..."}}
- var event struct {
- Error struct {
- Type string `json:"type"`
- Message string `json:"message"`
- } `json:"error"`
- }
- if json.Unmarshal([]byte(data), &event) == nil {
- return nil, &LLMError{
- Type: event.Error.Type,
- Message: fmt.Sprintf("AnthropicAdapter: stream error: %s", event.Error.Message),
- Model: model,
- }
- }
- return nil, fmt.Errorf("AnthropicAdapter: stream error (unparseable): %s", data)
- }
- eventType = "" // Reset for next event
- }
- done:
- if err := scanner.Err(); err != nil {
- return nil, fmt.Errorf("AnthropicAdapter: error reading stream: %w", err)
- }
- return map[string]interface{}{
- "content": fullContent.String(),
- "model": model,
- "finish_reason": stopReason,
- "response_id": responseID,
- "usage": map[string]interface{}{
- "prompt_tokens": inputTokens,
- "completion_tokens": outputTokens,
- "total_tokens": inputTokens + outputTokens,
- },
- }, nil
- }
- // parseStructuredJSON attempts to parse the "content" field of the result as JSON.
- // Called when output_config requested json_schema format.
- func (a *AnthropicAdapter) parseStructuredJSON(result map[string]interface{}) error {
- text, ok := result["content"].(string)
- if !ok || text == "" {
- return nil
- }
- // Strip markdown code fences if present (e.g. ```json ... ``` or ``` ... ```)
- raw := strings.TrimSpace(text)
- if strings.HasPrefix(raw, "```") {
- if idx := strings.Index(raw, "\n"); idx != -1 {
- raw = raw[idx+1:]
- }
- if idx := strings.LastIndex(raw, "```"); idx != -1 {
- raw = strings.TrimSpace(raw[:idx])
- }
- }
- var parsed interface{}
- if err := json.Unmarshal([]byte(raw), &parsed); err != nil {
- model, _ := result["model"].(string)
- return &LLMError{
- Type: "json_parse_error",
- Code: "JSON_PARSE_ERROR",
- Message: fmt.Sprintf("AnthropicAdapter: failed to parse structured output as JSON: %v", err),
- Retryable: false,
- Model: model,
- }
- }
- result["content"] = parsed
- return nil
- }
- // parseAnthropicError constructs a structured LLMError from an HTTP error response.
- func parseAnthropicError(statusCode int, body []byte, model string) *LLMError {
- llmErr := &LLMError{
- StatusCode: statusCode,
- Model: model,
- Message: string(body),
- }
- // Try to parse body as JSON for richer error info
- var errResp struct {
- Error struct {
- Type string `json:"type"`
- Message string `json:"message"`
- } `json:"error"`
- }
- if json.Unmarshal(body, &errResp) == nil && errResp.Error.Type != "" {
- llmErr.Type = errResp.Error.Type
- llmErr.Message = fmt.Sprintf("AnthropicAdapter: API error %s: %s", errResp.Error.Type, errResp.Error.Message)
- }
- // Classify retryability and error type based on status code
- switch {
- case statusCode == 429:
- llmErr.Type = "rate_limit_error"
- llmErr.Code = "RATE_LIMITED"
- llmErr.Retryable = true
- case statusCode == 529, statusCode == 503:
- llmErr.Type = "overloaded_error"
- llmErr.Code = "OVERLOADED"
- llmErr.Retryable = true
- case statusCode >= 500:
- llmErr.Type = "api_error"
- llmErr.Code = "SERVER_ERROR"
- llmErr.Retryable = true
- case statusCode == 401:
- llmErr.Type = "authentication_error"
- llmErr.Code = "AUTH_ERROR"
- llmErr.Retryable = false
- default:
- llmErr.Type = "invalid_request_error"
- llmErr.Code = "INVALID_REQUEST"
- llmErr.Retryable = false
- }
- return llmErr
- }
|