| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675 |
- package workflow
- import (
- "bufio"
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "strings"
- "time"
- )
// OpenAIAdapter is the LLMAdapter implementation that talks to an
// OpenAI-compatible chat completions server over HTTP.
type OpenAIAdapter struct {
	// baseURL is the server root; "/chat/completions" is appended per request.
	baseURL string
	// apiKey, when non-empty, is sent as a Bearer token in the Authorization header.
	apiKey string
	// model, when non-empty, overrides any model named in the call params.
	model string
	// cacheControl, when non-nil, overrides the cache_control flag from the call params.
	cacheControl *bool
	// requestAPIKey, when non-empty, is sent in the request body as "api_key".
	requestAPIKey string
	// httpClient performs every HTTP request issued by the adapter.
	httpClient *http.Client
}
// OpenAIConfig carries the settings used to construct an OpenAIAdapter.
type OpenAIConfig struct {
	// BaseURL is the OpenAI-compatible server URL (e.g., "http://localhost:4000").
	BaseURL string
	// APIKey is an optional key sent in the Authorization header.
	APIKey string
	// Model is an optional override that replaces the model in LLM requests.
	Model string
	// CacheControl is an optional override that replaces cache_control in LLM requests.
	CacheControl *bool
	// RequestAPIKey is an optional key sent in the request body as "api_key"
	// (for BYOK passthrough).
	RequestAPIKey string
	// Timeout is the HTTP timeout; the zero value selects the 5 minute default.
	Timeout time.Duration
}
- // NewOpenAIAdapter creates a new OpenAI-compatible adapter
- func NewOpenAIAdapter(config OpenAIConfig) *OpenAIAdapter {
- timeout := config.Timeout
- if timeout == 0 {
- timeout = 5 * time.Minute
- }
- baseURL := strings.TrimSuffix(config.BaseURL, "/")
- if baseURL == "" {
- baseURL = "http://localhost:4000"
- }
- return &OpenAIAdapter{
- baseURL: baseURL,
- apiKey: config.APIKey,
- model: config.Model,
- cacheControl: config.CacheControl,
- requestAPIKey: config.RequestAPIKey,
- httpClient: &http.Client{
- Timeout: timeout,
- },
- }
- }
- // ResponseFormat represents the response format configuration (OpenAI style)
- type ResponseFormat struct {
- Type string `json:"type"` // "json_schema" or "text"
- JSONSchema *JSONSchema `json:"json_schema,omitempty"` // Only when type is "json_schema"
- }
// JSONSchema is the JSON Schema configuration carried inside a ResponseFormat.
type JSONSchema struct {
	// Name is the schema name identifier.
	Name string `json:"name"`
	// Description is optional and omitted from the JSON when empty.
	Description string `json:"description,omitempty"`
	// Schema is the JSON Schema object itself.
	Schema map[string]interface{} `json:"schema"`
	// Strict optionally enables strict mode; false is omitted from the JSON.
	Strict bool `json:"strict,omitempty"`
}
- // OutputConfig represents the output configuration (Anthropic style)
- type OutputConfig struct {
- Format *OutputFormat `json:"format,omitempty"`
- }
// OutputFormat is the format section of an Anthropic-style OutputConfig.
type OutputFormat struct {
	// Type is "json_schema" or "text".
	Type string `json:"type"`
	// Schema is the JSON Schema object (when Type is "json_schema").
	Schema map[string]interface{} `json:"schema,omitempty"`
}
- // ChatCompletionRequest represents the OpenAI-compatible chat request
- type ChatCompletionRequest struct {
- Model string `json:"model"`
- Messages []ChatMessage `json:"messages"`
- Stream bool `json:"stream,omitempty"`
- Temperature *float64 `json:"temperature,omitempty"`
- MaxTokens *int `json:"max_tokens,omitempty"`
- TopP *float64 `json:"top_p,omitempty"`
- Stop []string `json:"stop,omitempty"`
- Tools []map[string]interface{} `json:"tools,omitempty"`
- APIKey string `json:"api_key,omitempty"`
- ResponseFormat *ResponseFormat `json:"response_format,omitempty"` // OpenAI style
- OutputConfig *OutputConfig `json:"output_config,omitempty"` // Anthropic style
- }
// CacheControl is Anthropic's cache control directive attached to a message.
type CacheControl struct {
	// Type is the directive kind; the adapter sets it to "ephemeral".
	Type string `json:"type"`
}
- // ChatMessage represents a message in the chat
- type ChatMessage struct {
- Role string `json:"role"`
- Content string `json:"content"`
- CacheControl *CacheControl `json:"cache_control,omitempty"`
- }
// ChatCompletionResponse is the decoded body of a non-streaming
// OpenAI-compatible chat completion.
type ChatCompletionResponse struct {
	// ID is the response identifier reported by the server.
	ID string `json:"id"`
	// Object is the "object" field of the response.
	Object string `json:"object"`
	// Created is the "created" field of the response.
	Created int64 `json:"created"`
	// Model is the model name reported by the server.
	Model string `json:"model"`
	// Choices holds the generated completions; the adapter reads the first one.
	Choices []struct {
		// Index is the position of this choice.
		Index int `json:"index"`
		// Message is the generated message.
		Message struct {
			Role    string `json:"role"`
			Content string `json:"content"`
		} `json:"message"`
		// FinishReason is the "finish_reason" reported for this choice.
		FinishReason string `json:"finish_reason"`
	} `json:"choices"`
	// Usage holds the token counts reported by the server.
	Usage struct {
		PromptTokens     int `json:"prompt_tokens"`
		CompletionTokens int `json:"completion_tokens"`
		TotalTokens      int `json:"total_tokens"`
	} `json:"usage"`
}
// ChatCompletionStreamResponse is one decoded chunk of a streamed
// OpenAI-compatible chat completion.
type ChatCompletionStreamResponse struct {
	// ID is the response identifier carried by the chunk.
	ID string `json:"id"`
	// Object is the "object" field of the chunk.
	Object string `json:"object"`
	// Created is the "created" field of the chunk.
	Created int64 `json:"created"`
	// Model is the model name carried by the chunk.
	Model string `json:"model"`
	// Choices holds the incremental updates; the adapter reads the first one.
	Choices []struct {
		// Index is the position of this choice.
		Index int `json:"index"`
		// Delta is the incremental piece of the message.
		Delta struct {
			Role    string `json:"role,omitempty"`
			Content string `json:"content,omitempty"`
		} `json:"delta"`
		// FinishReason stays nil when the chunk has no (or a null) finish_reason.
		FinishReason *string `json:"finish_reason"`
	} `json:"choices"`
}
- // Call implements LLMAdapter interface
- func (a *OpenAIAdapter) Call(ctx context.Context, params map[string]interface{}, stream chan<- string) (map[string]interface{}, error) {
- // Build request from params
- req, err := a.buildRequest(params)
- if err != nil {
- return nil, fmt.Errorf("failed to build request: %w", err)
- }
- // Check if streaming is requested
- isStreaming := false
- if streamVal, ok := params["stream"].(bool); ok {
- isStreaming = streamVal
- }
- req.Stream = isStreaming
- // Make the request
- var result map[string]interface{}
- if isStreaming && stream != nil {
- result, err = a.callStreaming(ctx, req, stream)
- } else {
- result, err = a.callNonStreaming(ctx, req)
- }
- if err != nil {
- return nil, err
- }
- // Check if structured output (json_schema) was requested
- // Check both response_format (OpenAI) and output_config (Anthropic) styles
- shouldParseJSON := false
- // Check response_format (OpenAI style)
- if responseFormat, ok := params["response_format"].(map[string]interface{}); ok {
- if formatType, ok := responseFormat["type"].(string); ok && formatType == "json_schema" {
- shouldParseJSON = true
- }
- }
- // Check output_config (Anthropic style)
- if outputConfig, ok := params["output_config"].(map[string]interface{}); ok {
- if format, ok := outputConfig["format"].(map[string]interface{}); ok {
- if formatType, ok := format["type"].(string); ok && formatType == "json_schema" {
- shouldParseJSON = true
- }
- }
- }
- // Parse content as JSON if structured output was requested
- if shouldParseJSON {
- if content, ok := result["content"].(string); ok && content != "" {
- var parsed interface{}
- if err := json.Unmarshal([]byte(content), &parsed); err != nil {
- model, _ := result["model"].(string)
- return nil, &LLMError{
- Type: "json_parse_error",
- Code: "JSON_PARSE_ERROR",
- Message: fmt.Sprintf("failed to parse structured output as JSON: %v", err),
- Retryable: false,
- Model: model,
- }
- }
- // Replace the content string with the parsed JSON object
- result["content"] = parsed
- }
- }
- return result, nil
- }
- // buildRequest constructs a ChatCompletionRequest from params
- func (a *OpenAIAdapter) buildRequest(params map[string]interface{}) (*ChatCompletionRequest, error) {
- req := &ChatCompletionRequest{}
- // API key in request body for BYOK passthrough
- if a.requestAPIKey != "" {
- req.APIKey = a.requestAPIKey
- }
- // Model: adapter-level override takes priority, then params, then default
- // Note: In v3.9+, model should not be specified in workflow JSON - it should be
- // injected at runtime via adapter configuration (a.model). The params["model"]
- // fallback is kept for backward compatibility with v3.6/v3.7/v3.8 workflows.
- if a.model != "" {
- req.Model = a.model
- } else if model, ok := params["model"].(string); ok {
- req.Model = model
- } else {
- req.Model = "gpt-4" // default model
- }
- // Cache control: adapter-level override takes priority, then params
- cacheControl := false
- if a.cacheControl != nil {
- cacheControl = *a.cacheControl
- } else if cc, ok := params["cache_control"].(bool); ok {
- cacheControl = cc
- }
- // Messages (required)
- if messages, ok := params["messages"].([]interface{}); ok {
- for _, m := range messages {
- msg, ok := m.(map[string]interface{})
- if !ok {
- continue
- }
- chatMsg := ChatMessage{}
- if role, ok := msg["role"].(string); ok {
- chatMsg.Role = role
- }
- if content, ok := msg["content"].(string); ok {
- chatMsg.Content = content
- }
- // Apply cache_control to system messages for Anthropic models
- if cacheControl && chatMsg.Role == "system" && isAnthropicModel(req.Model) {
- chatMsg.CacheControl = &CacheControl{Type: "ephemeral"}
- }
- req.Messages = append(req.Messages, chatMsg)
- }
- }
- // Optional parameters
- if temp, ok := params["temperature"].(float64); ok {
- req.Temperature = &temp
- }
- if maxTokens, ok := params["max_tokens"].(int); ok {
- req.MaxTokens = &maxTokens
- } else if maxTokens, ok := params["max_tokens"].(float64); ok {
- mt := int(maxTokens)
- req.MaxTokens = &mt
- }
- if topP, ok := params["top_p"].(float64); ok {
- req.TopP = &topP
- }
- if stop, ok := params["stop"].([]interface{}); ok {
- for _, s := range stop {
- if str, ok := s.(string); ok {
- req.Stop = append(req.Stop, str)
- }
- }
- }
- if tools, ok := params["tools"].([]interface{}); ok {
- for _, t := range tools {
- if tool, ok := t.(map[string]interface{}); ok {
- req.Tools = append(req.Tools, tool)
- }
- }
- }
- // Handle response_format (vendor-agnostic to vendor-specific mapping)
- if responseFormat, ok := params["response_format"].(map[string]interface{}); ok {
- if err := a.applyResponseFormat(req, responseFormat); err != nil {
- return nil, fmt.Errorf("failed to apply response_format: %w", err)
- }
- }
- // Handle output_config (Anthropic style - direct passthrough)
- // This takes precedence over response_format mapping if both are present
- if outputConfig, ok := params["output_config"].(map[string]interface{}); ok {
- if err := a.applyOutputConfig(req, outputConfig); err != nil {
- return nil, fmt.Errorf("failed to apply output_config: %w", err)
- }
- }
- return req, nil
- }
- // applyResponseFormat maps vendor-agnostic response_format to vendor-specific format
- func (a *OpenAIAdapter) applyResponseFormat(req *ChatCompletionRequest, responseFormat map[string]interface{}) error {
- // Extract type field
- formatType, ok := responseFormat["type"].(string)
- if !ok {
- return fmt.Errorf("response_format.type must be a string")
- }
- // If type is "text", no need to set anything (default behavior)
- if formatType == "text" {
- return nil
- }
- // Handle "json_schema" type
- if formatType == "json_schema" {
- // Extract json_schema object
- jsonSchemaRaw, ok := responseFormat["json_schema"].(map[string]interface{})
- if !ok {
- return fmt.Errorf("response_format.json_schema must be an object when type is json_schema")
- }
- // Build JSONSchema struct
- jsonSchema := &JSONSchema{}
- if name, ok := jsonSchemaRaw["name"].(string); ok {
- jsonSchema.Name = name
- } else {
- return fmt.Errorf("response_format.json_schema.name is required")
- }
- if desc, ok := jsonSchemaRaw["description"].(string); ok {
- jsonSchema.Description = desc
- }
- if schema, ok := jsonSchemaRaw["schema"].(map[string]interface{}); ok {
- jsonSchema.Schema = schema
- } else {
- return fmt.Errorf("response_format.json_schema.schema is required")
- }
- if strict, ok := jsonSchemaRaw["strict"].(bool); ok {
- jsonSchema.Strict = strict
- }
- // Determine vendor and apply appropriate format
- if isAnthropicModel(req.Model) {
- // Anthropic uses output_config with schema directly in format
- req.OutputConfig = &OutputConfig{
- Format: &OutputFormat{
- Type: "json_schema",
- Schema: jsonSchema.Schema,
- },
- }
- } else {
- // OpenAI and others use response_format with json_schema wrapper
- req.ResponseFormat = &ResponseFormat{
- Type: "json_schema",
- JSONSchema: jsonSchema,
- }
- }
- }
- return nil
- }
- // applyOutputConfig applies output_config directly from params (Anthropic style)
- // This is a direct passthrough - the output_config structure in the workflow JSON
- // must match the target LLM vendor's API format exactly
- func (a *OpenAIAdapter) applyOutputConfig(req *ChatCompletionRequest, outputConfig map[string]interface{}) error {
- // Extract format field
- format, ok := outputConfig["format"].(map[string]interface{})
- if !ok {
- return fmt.Errorf("output_config.format must be an object")
- }
- // Extract type field
- formatType, ok := format["type"].(string)
- if !ok {
- return fmt.Errorf("output_config.format.type must be a string")
- }
- // If type is "text", no need to set anything (default behavior)
- if formatType == "text" {
- return nil
- }
- // Handle "json_schema" type
- if formatType == "json_schema" {
- // Extract schema object
- schemaRaw, ok := format["schema"].(map[string]interface{})
- if !ok {
- // Provide more detailed error information
- schemaVal, exists := format["schema"]
- if !exists {
- return fmt.Errorf("output_config.format.schema is required when type is json_schema (field is missing)")
- }
- return fmt.Errorf("output_config.format.schema must be an object when type is json_schema (got %T: %v)", schemaVal, schemaVal)
- }
- // For Anthropic models, use output_config structure with schema directly
- req.OutputConfig = &OutputConfig{
- Format: &OutputFormat{
- Type: "json_schema",
- Schema: schemaRaw,
- },
- }
- }
- return nil
- }
- // callNonStreaming makes a non-streaming request
- func (a *OpenAIAdapter) callNonStreaming(ctx context.Context, req *ChatCompletionRequest) (map[string]interface{}, error) {
- req.Stream = false
- body, err := json.Marshal(req)
- if err != nil {
- return nil, fmt.Errorf("failed to marshal request: %w", err)
- }
- httpReq, err := http.NewRequestWithContext(ctx, "POST", a.baseURL+"/chat/completions", bytes.NewReader(body))
- if err != nil {
- return nil, fmt.Errorf("failed to create request: %w", err)
- }
- httpReq.Header.Set("Content-Type", "application/json")
- if a.apiKey != "" {
- httpReq.Header.Set("Authorization", "Bearer "+a.apiKey)
- }
- resp, err := a.httpClient.Do(httpReq)
- if err != nil {
- return nil, fmt.Errorf("failed to send request: %w", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- bodyBytes, _ := io.ReadAll(resp.Body)
- return nil, parseOpenAIError(resp.StatusCode, bodyBytes, req.Model)
- }
- var chatResp ChatCompletionResponse
- if err := json.NewDecoder(resp.Body).Decode(&chatResp); err != nil {
- return nil, fmt.Errorf("failed to decode response: %w", err)
- }
- // Extract content from response
- content := ""
- if len(chatResp.Choices) > 0 {
- content = chatResp.Choices[0].Message.Content
- }
- return map[string]interface{}{
- "content": content,
- "model": chatResp.Model,
- "finish_reason": getFinishReason(chatResp.Choices),
- "response_id": chatResp.ID,
- "usage": map[string]interface{}{
- "prompt_tokens": chatResp.Usage.PromptTokens,
- "completion_tokens": chatResp.Usage.CompletionTokens,
- "total_tokens": chatResp.Usage.TotalTokens,
- },
- }, nil
- }
- // callStreaming makes a streaming request
- func (a *OpenAIAdapter) callStreaming(ctx context.Context, req *ChatCompletionRequest, stream chan<- string) (map[string]interface{}, error) {
- req.Stream = true
- body, err := json.Marshal(req)
- if err != nil {
- return nil, fmt.Errorf("failed to marshal request: %w", err)
- }
- httpReq, err := http.NewRequestWithContext(ctx, "POST", a.baseURL+"/chat/completions", bytes.NewReader(body))
- if err != nil {
- return nil, fmt.Errorf("failed to create request: %w", err)
- }
- httpReq.Header.Set("Content-Type", "application/json")
- httpReq.Header.Set("Accept", "text/event-stream")
- if a.apiKey != "" {
- httpReq.Header.Set("Authorization", "Bearer "+a.apiKey)
- }
- resp, err := a.httpClient.Do(httpReq)
- if err != nil {
- return nil, fmt.Errorf("failed to send request: %w", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- bodyBytes, _ := io.ReadAll(resp.Body)
- return nil, parseOpenAIError(resp.StatusCode, bodyBytes, req.Model)
- }
- // Process SSE stream
- var fullContent strings.Builder
- var finishReason string
- var model string
- var responseID string
- scanner := bufio.NewScanner(resp.Body)
- // Increase buffer size for large SSE chunks (1MB max)
- scanner.Buffer(make([]byte, 64*1024), 1024*1024)
- for scanner.Scan() {
- line := scanner.Text()
- // Skip empty lines and comments
- if line == "" || strings.HasPrefix(line, ":") {
- continue
- }
- // Parse SSE data
- if !strings.HasPrefix(line, "data: ") {
- continue
- }
- data := strings.TrimPrefix(line, "data: ")
- // Check for end of stream
- if data == "[DONE]" {
- break
- }
- var chunk ChatCompletionStreamResponse
- if err := json.Unmarshal([]byte(data), &chunk); err != nil {
- continue // Skip malformed chunks
- }
- if model == "" && chunk.Model != "" {
- model = chunk.Model
- }
- if responseID == "" && chunk.ID != "" {
- responseID = chunk.ID
- }
- // Extract content from chunk
- if len(chunk.Choices) > 0 {
- delta := chunk.Choices[0].Delta
- if delta.Content != "" {
- fullContent.WriteString(delta.Content)
- // Send chunk to stream channel
- select {
- case stream <- delta.Content:
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- if chunk.Choices[0].FinishReason != nil {
- finishReason = *chunk.Choices[0].FinishReason
- }
- }
- }
- if err := scanner.Err(); err != nil {
- return nil, fmt.Errorf("error reading stream: %w", err)
- }
- return map[string]interface{}{
- "content": fullContent.String(),
- "model": model,
- "finish_reason": finishReason,
- "response_id": responseID,
- }, nil
- }
// isAnthropicModel reports whether the model string refers to an Anthropic
// model. The match is case-insensitive: the name either starts with "claude"
// or contains "anthropic" anywhere (which also covers an "anthropic/" prefix).
func isAnthropicModel(model string) bool {
	lowered := strings.ToLower(model)
	if strings.HasPrefix(lowered, "claude") {
		return true
	}
	return strings.Contains(lowered, "anthropic")
}
- // parseOpenAIError constructs a structured LLMError from an HTTP error response
- func parseOpenAIError(statusCode int, body []byte, model string) *LLMError {
- llmErr := &LLMError{
- StatusCode: statusCode,
- Model: model,
- Message: string(body),
- }
- // Try to parse body as JSON for richer error info
- var errResp map[string]interface{}
- if json.Unmarshal(body, &errResp) == nil {
- llmErr.Raw = errResp
- if msg, ok := errResp["message"].(string); ok {
- llmErr.Message = msg
- }
- if errObj, ok := errResp["error"].(map[string]interface{}); ok {
- if msg, ok := errObj["message"].(string); ok {
- llmErr.Message = msg
- }
- if code, ok := errObj["code"].(string); ok {
- llmErr.Code = code
- }
- if errType, ok := errObj["type"].(string); ok {
- llmErr.ProviderError = errType
- }
- }
- }
- // Classify error type based on status code
- switch {
- case statusCode == 401 || statusCode == 403:
- llmErr.Type = "auth_error"
- llmErr.Code = "AUTH_ERROR"
- llmErr.Retryable = false
- case statusCode == 400:
- llmErr.Type = "bad_request"
- llmErr.Code = "BAD_REQUEST"
- llmErr.Retryable = false
- case statusCode == 429:
- llmErr.Type = "rate_limit"
- llmErr.Code = "RATE_LIMIT"
- llmErr.Retryable = true
- case statusCode == 408:
- llmErr.Type = "timeout"
- llmErr.Code = "TIMEOUT"
- llmErr.Retryable = true
- case statusCode >= 500:
- llmErr.Type = "service_unavailable"
- llmErr.Code = "SERVICE_UNAVAILABLE"
- llmErr.Retryable = true
- default:
- llmErr.Type = "unknown_error"
- llmErr.Code = "UNKNOWN_ERROR"
- llmErr.Retryable = false
- }
- // Refine classification based on error message content
- msgLower := strings.ToLower(llmErr.Message)
- if strings.Contains(msgLower, "context length") || strings.Contains(msgLower, "token limit") {
- llmErr.Type = "context_length_exceeded"
- llmErr.Code = "CONTEXT_LENGTH_EXCEEDED"
- llmErr.Retryable = false
- }
- if strings.Contains(msgLower, "content policy") || strings.Contains(msgLower, "content_filter") {
- llmErr.Type = "content_policy_violation"
- llmErr.Code = "CONTENT_POLICY_VIOLATION"
- llmErr.Retryable = false
- }
- // Infer provider from model name
- llmErr.Provider = inferProvider(model)
- return llmErr
- }
// getFinishReason returns the finish reason of the first choice, or "" when
// choices is empty. The parameter type mirrors the element type of
// ChatCompletionResponse.Choices.
func getFinishReason(choices []struct {
	Index   int `json:"index"`
	Message struct {
		Role    string `json:"role"`
		Content string `json:"content"`
	} `json:"message"`
	FinishReason string `json:"finish_reason"`
}) string {
	if len(choices) == 0 {
		return ""
	}
	return choices[0].FinishReason
}
|