package workflow import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "net/http" "strings" "time" ) // OpenAIAdapter implements LLMAdapter for OpenAI-compatible server type OpenAIAdapter struct { baseURL string apiKey string model string cacheControl *bool requestAPIKey string httpClient *http.Client } // OpenAIConfig holds configuration for OpenAI-compatible adapter type OpenAIConfig struct { BaseURL string // OpenAI-compatible server URL (e.g., "http://localhost:4000") APIKey string // Optional API key for Authorization header Model string // Optional model override (overwrites model in LLM requests) CacheControl *bool // Optional cache_control override (overwrites cache_control in LLM requests) RequestAPIKey string // Optional API key sent in request body as "api_key" (for BYOK passthrough) Timeout time.Duration // HTTP timeout (default: 5 minutes) } // NewOpenAIAdapter creates a new OpenAI-compatible adapter func NewOpenAIAdapter(config OpenAIConfig) *OpenAIAdapter { timeout := config.Timeout if timeout == 0 { timeout = 5 * time.Minute } baseURL := strings.TrimSuffix(config.BaseURL, "/") if baseURL == "" { baseURL = "http://localhost:4000" } return &OpenAIAdapter{ baseURL: baseURL, apiKey: config.APIKey, model: config.Model, cacheControl: config.CacheControl, requestAPIKey: config.RequestAPIKey, httpClient: &http.Client{ Timeout: timeout, }, } } // ResponseFormat represents the response format configuration (OpenAI style) type ResponseFormat struct { Type string `json:"type"` // "json_schema" or "text" JSONSchema *JSONSchema `json:"json_schema,omitempty"` // Only when type is "json_schema" } // JSONSchema represents the JSON Schema configuration type JSONSchema struct { Name string `json:"name"` // Schema name identifier Description string `json:"description,omitempty"` // Optional description Schema map[string]interface{} `json:"schema"` // JSON Schema object Strict bool `json:"strict,omitempty"` // Optional strict mode } // OutputConfig represents the output configuration (Anthropic style) type OutputConfig struct { Format *OutputFormat `json:"format,omitempty"` } // OutputFormat represents the format configuration for Anthropic type OutputFormat struct { Type string `json:"type"` // "json_schema" or "text" Schema map[string]interface{} `json:"schema,omitempty"` // JSON Schema object (when type is "json_schema") } // ChatCompletionRequest represents the OpenAI-compatible chat request type ChatCompletionRequest struct { Model string `json:"model"` Messages []ChatMessage `json:"messages"` Stream bool `json:"stream,omitempty"` Temperature *float64 `json:"temperature,omitempty"` MaxTokens *int `json:"max_tokens,omitempty"` TopP *float64 `json:"top_p,omitempty"` Stop []string `json:"stop,omitempty"` Tools []map[string]interface{} `json:"tools,omitempty"` APIKey string `json:"api_key,omitempty"` ResponseFormat *ResponseFormat `json:"response_format,omitempty"` // OpenAI style OutputConfig *OutputConfig `json:"output_config,omitempty"` // Anthropic style } // CacheControl represents Anthropic's cache control directive type CacheControl struct { Type string `json:"type"` } // ChatMessage represents a message in the chat type ChatMessage struct { Role string `json:"role"` Content string `json:"content"` CacheControl *CacheControl `json:"cache_control,omitempty"` } // ChatCompletionResponse represents the OpenAI-compatible chat response type ChatCompletionResponse struct { ID string `json:"id"` Object string `json:"object"` Created int64 `json:"created"` Model string `json:"model"` Choices []struct { Index int `json:"index"` Message struct { Role string `json:"role"` Content string `json:"content"` } `json:"message"` FinishReason string `json:"finish_reason"` } `json:"choices"` Usage struct { PromptTokens int `json:"prompt_tokens"` CompletionTokens int `json:"completion_tokens"` TotalTokens int `json:"total_tokens"` } `json:"usage"` } // ChatCompletionStreamResponse represents a streaming response chunk type ChatCompletionStreamResponse struct { ID string `json:"id"` Object string `json:"object"` Created int64 `json:"created"` Model string `json:"model"` Choices []struct { Index int `json:"index"` Delta struct { Role string `json:"role,omitempty"` Content string `json:"content,omitempty"` } `json:"delta"` FinishReason *string `json:"finish_reason"` } `json:"choices"` } // Call implements LLMAdapter interface func (a *OpenAIAdapter) Call(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) { // Build request from params req, err := a.buildRequest(params) if err != nil { return nil, fmt.Errorf("failed to build request: %w", err) } // Check if streaming is requested isStreaming := false if streamVal, ok := params["stream"].(bool); ok { isStreaming = streamVal } req.Stream = isStreaming // Make the request var result map[string]interface{} if isStreaming && stream != nil { result, err = a.callStreaming(ctx, req, stream) } else { result, err = a.callNonStreaming(ctx, req) } if err != nil { return nil, err } // Check if structured output (json_schema) was requested // Check both response_format (OpenAI) and output_config (Anthropic) styles shouldParseJSON := false // Check response_format (OpenAI style) if responseFormat, ok := params["response_format"].(map[string]interface{}); ok { if formatType, ok := responseFormat["type"].(string); ok && formatType == "json_schema" { shouldParseJSON = true } } // Check output_config (Anthropic style) if outputConfig, ok := params["output_config"].(map[string]interface{}); ok { if format, ok := outputConfig["format"].(map[string]interface{}); ok { if formatType, ok := format["type"].(string); ok && formatType == "json_schema" { shouldParseJSON = true } } } // Parse content as JSON if structured output was requested if shouldParseJSON { if content, ok := result["content"].(string); ok && content != "" { var parsed interface{} if err := json.Unmarshal([]byte(content), &parsed); err != nil { model, _ := result["model"].(string) return nil, &LLMError{ Type: "json_parse_error", Code: "JSON_PARSE_ERROR", Message: fmt.Sprintf("failed to parse structured output as JSON: %v", err), Retryable: false, Model: model, } } // Replace the content string with the parsed JSON object result["content"] = parsed } } return result, nil } // buildRequest constructs a ChatCompletionRequest from params func (a *OpenAIAdapter) buildRequest(params map[string]interface{}) (*ChatCompletionRequest, error) { req := &ChatCompletionRequest{} // API key in request body for BYOK passthrough if a.requestAPIKey != "" { req.APIKey = a.requestAPIKey } // Model: adapter-level override takes priority, then params, then default // Note: In v3.9+, model should not be specified in workflow JSON - it should be // injected at runtime via adapter configuration (a.model). The params["model"] // fallback is kept for backward compatibility with v3.6/v3.7/v3.8 workflows. if a.model != "" { req.Model = a.model } else if model, ok := params["model"].(string); ok { req.Model = model } else { req.Model = "gpt-4" // default model } // Cache control: adapter-level override takes priority, then params cacheControl := false if a.cacheControl != nil { cacheControl = *a.cacheControl } else if cc, ok := params["cache_control"].(bool); ok { cacheControl = cc } // Messages (required) if messages, ok := params["messages"].([]interface{}); ok { for _, m := range messages { msg, ok := m.(map[string]interface{}) if !ok { continue } chatMsg := ChatMessage{} if role, ok := msg["role"].(string); ok { chatMsg.Role = role } if content, ok := msg["content"].(string); ok { chatMsg.Content = content } // Apply cache_control to system messages for Anthropic models if cacheControl && chatMsg.Role == "system" && isAnthropicModel(req.Model) { chatMsg.CacheControl = &CacheControl{Type: "ephemeral"} } req.Messages = append(req.Messages, chatMsg) } } // Optional parameters if temp, ok := params["temperature"].(float64); ok { req.Temperature = &temp } if maxTokens, ok := params["max_tokens"].(int); ok { req.MaxTokens = &maxTokens } else if maxTokens, ok := params["max_tokens"].(float64); ok { mt := int(maxTokens) req.MaxTokens = &mt } if topP, ok := params["top_p"].(float64); ok { req.TopP = &topP } if stop, ok := params["stop"].([]interface{}); ok { for _, s := range stop { if str, ok := s.(string); ok { req.Stop = append(req.Stop, str) } } } if tools, ok := params["tools"].([]interface{}); ok { for _, t := range tools { if tool, ok := t.(map[string]interface{}); ok { req.Tools = append(req.Tools, tool) } } } // Handle response_format (vendor-agnostic to vendor-specific mapping) if responseFormat, ok := params["response_format"].(map[string]interface{}); ok { if err := a.applyResponseFormat(req, responseFormat); err != nil { return nil, fmt.Errorf("failed to apply response_format: %w", err) } } // Handle output_config (Anthropic style - direct passthrough) // This takes precedence over response_format mapping if both are present if outputConfig, ok := params["output_config"].(map[string]interface{}); ok { if err := a.applyOutputConfig(req, outputConfig); err != nil { return nil, fmt.Errorf("failed to apply output_config: %w", err) } } return req, nil } // applyResponseFormat maps vendor-agnostic response_format to vendor-specific format func (a *OpenAIAdapter) applyResponseFormat(req *ChatCompletionRequest, responseFormat map[string]interface{}) error { // Extract type field formatType, ok := responseFormat["type"].(string) if !ok { return fmt.Errorf("response_format.type must be a string") } // If type is "text", no need to set anything (default behavior) if formatType == "text" { return nil } // Handle "json_schema" type if formatType == "json_schema" { // Extract json_schema object jsonSchemaRaw, ok := responseFormat["json_schema"].(map[string]interface{}) if !ok { return fmt.Errorf("response_format.json_schema must be an object when type is json_schema") } // Build JSONSchema struct jsonSchema := &JSONSchema{} if name, ok := jsonSchemaRaw["name"].(string); ok { jsonSchema.Name = name } else { return fmt.Errorf("response_format.json_schema.name is required") } if desc, ok := jsonSchemaRaw["description"].(string); ok { jsonSchema.Description = desc } if schema, ok := jsonSchemaRaw["schema"].(map[string]interface{}); ok { jsonSchema.Schema = schema } else { return fmt.Errorf("response_format.json_schema.schema is required") } if strict, ok := jsonSchemaRaw["strict"].(bool); ok { jsonSchema.Strict = strict } // Determine vendor and apply appropriate format if isAnthropicModel(req.Model) { // Anthropic uses output_config with schema directly in format req.OutputConfig = &OutputConfig{ Format: &OutputFormat{ Type: "json_schema", Schema: jsonSchema.Schema, }, } } else { // OpenAI and others use response_format with json_schema wrapper req.ResponseFormat = &ResponseFormat{ Type: "json_schema", JSONSchema: jsonSchema, } } } return nil } // applyOutputConfig applies output_config directly from params (Anthropic style) // This is a direct passthrough - the output_config structure in the workflow JSON // must match the target LLM vendor's API format exactly func (a *OpenAIAdapter) applyOutputConfig(req *ChatCompletionRequest, outputConfig map[string]interface{}) error { // Extract format field format, ok := outputConfig["format"].(map[string]interface{}) if !ok { return fmt.Errorf("output_config.format must be an object") } // Extract type field formatType, ok := format["type"].(string) if !ok { return fmt.Errorf("output_config.format.type must be a string") } // If type is "text", no need to set anything (default behavior) if formatType == "text" { return nil } // Handle "json_schema" type if formatType == "json_schema" { // Extract schema object schemaRaw, ok := format["schema"].(map[string]interface{}) if !ok { // Provide more detailed error information schemaVal, exists := format["schema"] if !exists { return fmt.Errorf("output_config.format.schema is required when type is json_schema (field is missing)") } return fmt.Errorf("output_config.format.schema must be an object when type is json_schema (got %T: %v)", schemaVal, schemaVal) } // For Anthropic models, use output_config structure with schema directly req.OutputConfig = &OutputConfig{ Format: &OutputFormat{ Type: "json_schema", Schema: schemaRaw, }, } } return nil } // callNonStreaming makes a non-streaming request func (a *OpenAIAdapter) callNonStreaming(ctx context.Context, req *ChatCompletionRequest) (map[string]interface{}, error) { req.Stream = false body, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, "POST", a.baseURL+"/chat/completions", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") if a.apiKey != "" { httpReq.Header.Set("Authorization", "Bearer "+a.apiKey) } resp, err := a.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { bodyBytes, _ := io.ReadAll(resp.Body) return nil, parseOpenAIError(resp.StatusCode, bodyBytes, req.Model) } var chatResp ChatCompletionResponse if err := json.NewDecoder(resp.Body).Decode(&chatResp); err != nil { return nil, fmt.Errorf("failed to decode response: %w", err) } // Extract content from response content := "" if len(chatResp.Choices) > 0 { content = chatResp.Choices[0].Message.Content } return map[string]interface{}{ "content": content, "model": chatResp.Model, "finish_reason": getFinishReason(chatResp.Choices), "response_id": chatResp.ID, "usage": map[string]interface{}{ "prompt_tokens": chatResp.Usage.PromptTokens, "completion_tokens": chatResp.Usage.CompletionTokens, "total_tokens": chatResp.Usage.TotalTokens, }, }, nil } // callStreaming makes a streaming request func (a *OpenAIAdapter) callStreaming(ctx context.Context, req *ChatCompletionRequest, stream chan<- string) (map[string]interface{}, error) { req.Stream = true body, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, "POST", a.baseURL+"/chat/completions", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Accept", "text/event-stream") if a.apiKey != "" { httpReq.Header.Set("Authorization", "Bearer "+a.apiKey) } resp, err := a.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { bodyBytes, _ := io.ReadAll(resp.Body) return nil, parseOpenAIError(resp.StatusCode, bodyBytes, req.Model) } // Process SSE stream var fullContent strings.Builder var finishReason string var model string var responseID string scanner := bufio.NewScanner(resp.Body) // Increase buffer size for large SSE chunks (1MB max) scanner.Buffer(make([]byte, 64*1024), 1024*1024) for scanner.Scan() { line := scanner.Text() // Skip empty lines and comments if line == "" || strings.HasPrefix(line, ":") { continue } // Parse SSE data if !strings.HasPrefix(line, "data: ") { continue } data := strings.TrimPrefix(line, "data: ") // Check for end of stream if data == "[DONE]" { break } var chunk ChatCompletionStreamResponse if err := json.Unmarshal([]byte(data), &chunk); err != nil { continue // Skip malformed chunks } if model == "" && chunk.Model != "" { model = chunk.Model } if responseID == "" && chunk.ID != "" { responseID = chunk.ID } // Extract content from chunk if len(chunk.Choices) > 0 { delta := chunk.Choices[0].Delta if delta.Content != "" { fullContent.WriteString(delta.Content) // Send chunk to stream channel select { case stream <- delta.Content: case <-ctx.Done(): return nil, ctx.Err() } } if chunk.Choices[0].FinishReason != nil { finishReason = *chunk.Choices[0].FinishReason } } } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("error reading stream: %w", err) } return map[string]interface{}{ "content": fullContent.String(), "model": model, "finish_reason": finishReason, "response_id": responseID, }, nil } // isAnthropicModel checks if the model string refers to an Anthropic model func isAnthropicModel(model string) bool { m := strings.ToLower(model) return strings.HasPrefix(m, "claude") || strings.HasPrefix(m, "anthropic/") || strings.Contains(m, "anthropic") } // parseOpenAIError constructs a structured LLMError from an HTTP error response func parseOpenAIError(statusCode int, body []byte, model string) *LLMError { llmErr := &LLMError{ StatusCode: statusCode, Model: model, Message: string(body), } // Try to parse body as JSON for richer error info var errResp map[string]interface{} if json.Unmarshal(body, &errResp) == nil { llmErr.Raw = errResp if msg, ok := errResp["message"].(string); ok { llmErr.Message = msg } if errObj, ok := errResp["error"].(map[string]interface{}); ok { if msg, ok := errObj["message"].(string); ok { llmErr.Message = msg } if code, ok := errObj["code"].(string); ok { llmErr.Code = code } if errType, ok := errObj["type"].(string); ok { llmErr.ProviderError = errType } } } // Classify error type based on status code switch { case statusCode == 401 || statusCode == 403: llmErr.Type = "auth_error" llmErr.Code = "AUTH_ERROR" llmErr.Retryable = false case statusCode == 400: llmErr.Type = "bad_request" llmErr.Code = "BAD_REQUEST" llmErr.Retryable = false case statusCode == 429: llmErr.Type = "rate_limit" llmErr.Code = "RATE_LIMIT" llmErr.Retryable = true case statusCode == 408: llmErr.Type = "timeout" llmErr.Code = "TIMEOUT" llmErr.Retryable = true case statusCode >= 500: llmErr.Type = "service_unavailable" llmErr.Code = "SERVICE_UNAVAILABLE" llmErr.Retryable = true default: llmErr.Type = "unknown_error" llmErr.Code = "UNKNOWN_ERROR" llmErr.Retryable = false } // Refine classification based on error message content msgLower := strings.ToLower(llmErr.Message) if strings.Contains(msgLower, "context length") || strings.Contains(msgLower, "token limit") { llmErr.Type = "context_length_exceeded" llmErr.Code = "CONTEXT_LENGTH_EXCEEDED" llmErr.Retryable = false } if strings.Contains(msgLower, "content policy") || strings.Contains(msgLower, "content_filter") { llmErr.Type = "content_policy_violation" llmErr.Code = "CONTENT_POLICY_VIOLATION" llmErr.Retryable = false } // Infer provider from model name llmErr.Provider = inferProvider(model) return llmErr } func getFinishReason(choices []struct { Index int `json:"index"` Message struct { Role string `json:"role"` Content string `json:"content"` } `json:"message"` FinishReason string `json:"finish_reason"` }) string { if len(choices) > 0 { return choices[0].FinishReason } return "" }