package workflow import ( "archive/zip" "bytes" "context" "crypto/sha256" "encoding/hex" "encoding/json" "errors" "fmt" "io" "net/http" "path/filepath" "reflect" "strings" "sync" "sync/atomic" "time" ) // executeServiceStep executes a Service_* step func (e *Engine) executeServiceStep(ctx ContextAccessor, step *Step) error { // Extract service name from step ID (Service_xxx) serviceName := step.ID[8:] // Remove "Service_" prefix // Evaluate input parameters (with deep evaluation for nested structures) evaluator := NewExpressionEvaluator(ctx) params := make(map[string]interface{}) // Get base context for accessing fields baseCtx := ctx.GetBaseContext() for key, valueExpr := range step.In { val, err := evaluator.EvaluateDeep(valueExpr) if err != nil { return fmt.Errorf("failed to evaluate input parameter %s: %w", key, err) } params[key] = val } if baseCtx.ServiceAdapter == nil { return fmt.Errorf("service step %s: no ServiceAdapter configured", step.ID) } // Call service result, err := baseCtx.ServiceAdapter.Call(baseCtx.Ctx, serviceName, params) if err != nil { return fmt.Errorf("service call failed: %w", err) } // Store result in local variable (use ctx accessor for proper scoping in parallel mode) ctx.SetLocalVar("_result", result.Data) // Apply output mapping if len(step.Out) > 0 { if err := e.applyOutputMapping(ctx, step.Out); err != nil { return fmt.Errorf("failed to apply output mapping: %w", err) } } // Clear _result ctx.DeleteLocalVar("_result") return nil } // executeComponentStep executes a Component_* step func (e *Engine) executeComponentStep(ctx ContextAccessor, step *Step) error { // Extract component ID from step ID (Component_xxx) componentID := step.ID[10:] // Remove "Component_" prefix // Evaluate input parameters (with deep evaluation for nested structures) evaluator := NewExpressionEvaluator(ctx) params := make(map[string]interface{}) // Get base context for accessing fields baseCtx := ctx.GetBaseContext() for key, valueExpr := range step.In { 
val, err := evaluator.EvaluateDeep(valueExpr) if err != nil { return fmt.Errorf("failed to evaluate input parameter %s: %w", key, err) } params[key] = val } if baseCtx.ComponentAdapter == nil { return fmt.Errorf("component step %s: no ComponentAdapter configured", step.ID) } // Call component result, err := baseCtx.ComponentAdapter.Call(baseCtx.Ctx, componentID, params) if err != nil { return fmt.Errorf("component call failed: %w", err) } // Store result in local variable (use ctx accessor for proper scoping in parallel mode) ctx.SetLocalVar("_result", result) // Apply output mapping if len(step.Out) > 0 { if err := e.applyOutputMapping(ctx, step.Out); err != nil { return fmt.Errorf("failed to apply output mapping: %w", err) } } // Clear _result ctx.DeleteLocalVar("_result") return nil } // resolveSchemaRef resolves schemaRef references in output_config.format (v3.9+) func (e *Engine) resolveSchemaRef(params map[string]interface{}) error { // Check if output_config exists outputConfig, ok := params["output_config"].(map[string]interface{}) if !ok { return nil // No output_config, nothing to resolve } // Check if format exists format, ok := outputConfig["format"].(map[string]interface{}) if !ok { return nil // No format, nothing to resolve } // Check if schemaRef exists schemaRef, hasSchemaRef := format["schemaRef"].(string) _, hasSchema := format["schema"] // If both schema and schemaRef exist, that's an error if hasSchema && hasSchemaRef { return fmt.Errorf("output_config.format cannot have both 'schema' and 'schemaRef' - use one or the other") } // If no schemaRef, nothing to resolve if !hasSchemaRef { return nil } // Resolve schemaRef from registry schema, err := e.workflow.Registry.GetSchema(schemaRef) if err != nil { return fmt.Errorf("failed to resolve schemaRef %q: %w", schemaRef, err) } // Replace schemaRef with the actual schema format["schema"] = schema delete(format, "schemaRef") return nil } // getOutputFormatType returns the output format type string 
// from params.
// It checks response_format.type (OpenAI style) first, then
// output_config.format.type (Anthropic style), and returns "" when neither
// holds a string.
func getOutputFormatType(params map[string]interface{}) string {
	if rf, ok := params["response_format"].(map[string]interface{}); ok {
		if t, ok := rf["type"].(string); ok {
			return t
		}
	}
	if oc, ok := params["output_config"].(map[string]interface{}); ok {
		if f, ok := oc["format"].(map[string]interface{}); ok {
			if t, ok := f["type"].(string); ok {
				return t
			}
		}
	}
	return ""
}

// isJSONObjectOutput checks if the params request unschema'd json_object output.
// Supports both response_format (OpenAI) and output_config (Anthropic) styles.
func isJSONObjectOutput(params map[string]interface{}) bool {
	return getOutputFormatType(params) == "json_object"
}

// stripMarkdownCodeFence removes a surrounding Markdown code fence from s and
// returns the fenced text trimmed of whitespace.
//
// s is trimmed first. If it starts with ``` and a later ``` exists, the text
// between the opening fence and the LAST ``` is kept. When the opening fence
// line carries an info string made only of ASCII letters, digits, '-', '_',
// '+' or '.' (e.g. "json", "JSON", "jsonc", or nothing at all), that whole line
// is dropped. Otherwise a literal "json" directly after the fence is removed,
// which covers single-line fences such as ```json{...}```.
// Returns the trimmed s unchanged when there is no complete fence.
func stripMarkdownCodeFence(s string) string {
	s = strings.TrimSpace(s)
	if !strings.HasPrefix(s, "```") {
		return s
	}

	body := s[len("```"):]
	closing := strings.LastIndex(body, "```")
	if closing == -1 {
		return s // unterminated fence: leave the text untouched
	}
	body = body[:closing]

	// notTagRune reports runes that cannot appear in a language tag.
	notTagRune := func(r rune) bool {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9':
			return false
		case r == '-' || r == '_' || r == '+' || r == '.':
			return false
		}
		return true
	}

	if nl := strings.IndexByte(body, '\n'); nl != -1 && strings.IndexFunc(strings.TrimSpace(body[:nl]), notTagRune) == -1 {
		// Opening line is a (possibly empty) info string: drop it entirely.
		body = body[nl+1:]
	} else {
		body = strings.TrimPrefix(body, "json")
	}
	return strings.TrimSpace(body)
}

// unwrapJSONObject strips markdown fencing and JSON-parses the content when the
// output format is json_object. Returns content unchanged for other formats or
// when content is already a non-string value (adapter already parsed it).
func unwrapJSONObject(content interface{}, params map[string]interface{}) interface{} { if !isJSONObjectOutput(params) { return content } s, ok := content.(string) if !ok { return content } s = stripMarkdownCodeFence(s) var parsed interface{} if err := json.Unmarshal([]byte(s), &parsed); err == nil { return parsed } return s // not valid JSON even after stripping — return stripped string } // isStructuredOutput checks if the params specify structured output (json_schema) // Supports both response_format (OpenAI) and output_config (Anthropic) styles func isStructuredOutput(params map[string]interface{}) bool { return getOutputFormatType(params) == "json_schema" } // executeLLMStep executes an LLM_* step func (e *Engine) executeLLMStep(ctx ContextAccessor, step *Step) error { // Get base context for accessing fields baseCtx := ctx.GetBaseContext() // Evaluate input parameters (with deep evaluation for nested structures) evaluator := NewExpressionEvaluator(ctx) params := make(map[string]interface{}) for key, valueExpr := range step.In { val, err := evaluator.EvaluateDeep(valueExpr) if err != nil { return fmt.Errorf("failed to evaluate input parameter %s: %w", key, err) } params[key] = val } // Resolve docs and inject into system prompt if err := e.injectDocs(baseCtx, params); err != nil { return fmt.Errorf("failed to inject docs: %w", err) } // Resolve schemaRef in output_config (v3.9+) if err := e.resolveSchemaRef(params); err != nil { return fmt.Errorf("failed to resolve schemaRef: %w", err) } // Streaming is controlled by in.stream (passed in step.In / params). isStreaming := false if streamVal, ok := params["stream"].(bool); ok && streamVal { isStreaming = true } stream := make(chan string, 10) var streamDone chan struct{} // closed when the streaming goroutine finishes // The engine owns the stream lifecycle: it always closes stream after Call // returns. Adapters must NOT close stream themselves; they only send to it. 
var closeOnce sync.Once closeStream := func() { closeOnce.Do(func() { close(stream) }) } if isStreaming { streamDone = make(chan struct{}) go func() { defer close(streamDone) stepID := step.ID for chunk := range stream { // Emit llm_token RunEvent (fire-and-forget, spec 3.12 §13.4) e.emitRunEvent(baseCtx, RunEventLLMToken, &stepID, map[string]interface{}{ "delta": chunk, }) } }() } else { closeStream() // Close immediately if not streaming } // v3.16+: Resolve per-node LLM adapter based on step.Model llmAdapter := baseCtx.LLMAdapter // default if step.Model != "" && baseCtx.LLMAdapterRegistry != nil { resolved, modelOverride, resolveErr := baseCtx.LLMAdapterRegistry.Resolve(step.Model) if resolveErr != nil { closeStream() return fmt.Errorf("LLM provider resolution failed for step %s: %w", step.ID, resolveErr) } llmAdapter = resolved if modelOverride != "" { params["model"] = modelOverride } } // Measure latency (adapter-agnostic timing) callStart := time.Now() // Call LLM result, err := llmAdapter.Call(baseCtx.Ctx, params, stream) latencyMs := time.Since(callStart).Milliseconds() // Always close stream after Call returns so the goroutine can finish, // even if the adapter did not close it. No-op if adapter already closed it. closeStream() // Wait for the streaming goroutine to drain all remaining chunks. 
if streamDone != nil { <-streamDone } if err != nil { if e.isV310OrLater() { // v3.10: Set partial _meta with whatever info we have ctx.SetLocalVar("_meta", buildMetaFromError(err, latencyMs)) } return fmt.Errorf("LLM call failed: %w", err) } // Emit llm_done RunEvent (always on success, spec 3.12 §13.4) // Order: step_start → llm_token(×N, stream only) → llm_done → step_done { stepID := step.ID llmDonePayload := map[string]interface{}{ "latency_ms": latencyMs, } if fr, ok := result["finish_reason"].(string); ok { llmDonePayload["finish_reason"] = fr } if m, ok := result["model"].(string); ok { llmDonePayload["model"] = m } if usage, ok := result["usage"].(map[string]interface{}); ok { normalized := map[string]interface{}{} if v, ok := usage["prompt_tokens"]; ok { normalized["input_tokens"] = v } if v, ok := usage["completion_tokens"]; ok { normalized["output_tokens"] = v } if v, ok := usage["total_tokens"]; ok { normalized["total_tokens"] = v } llmDonePayload["usage"] = normalized } e.emitRunEvent(baseCtx, RunEventLLMDone, &stepID, llmDonePayload) } // Version-dependent result handling if e.isV310OrLater() { // === v3.10 semantics: _result = content only, _meta = metadata === e.applyV310LLMResult(ctx, result, params, latencyMs) } else { // === v3.6-v3.9 semantics (unchanged) === if isStructuredOutput(params) { // For structured output, _result is the parsed content directly if content, ok := result["content"]; ok { ctx.SetLocalVar("_result", content) } else { ctx.SetLocalVar("_result", result) } } else { // For non-structured output, _result is the full result map (v3.6 behavior) ctx.SetLocalVar("_result", result) } } // Apply output mapping if len(step.Out) > 0 { if err := e.applyOutputMapping(ctx, step.Out); err != nil { return fmt.Errorf("failed to apply output mapping: %w", err) } } // Clear ephemeral local variables ctx.DeleteLocalVar("_result") if e.isV310OrLater() { ctx.DeleteLocalVar("_meta") } return nil } // applyV310LLMResult sets _result and _meta for 
v3.10 semantics func (e *Engine) applyV310LLMResult(ctx ContextAccessor, result map[string]interface{}, params map[string]interface{}, latencyMs int64) { // _result = content body only (string for text, parsed object for json_schema/json_object) if content, ok := result["content"]; ok { ctx.SetLocalVar("_result", unwrapJSONObject(content, params)) } else { ctx.SetLocalVar("_result", nil) } // _meta = call metadata meta := map[string]interface{}{ "latency_ms": latencyMs, } if model, ok := result["model"].(string); ok { meta["model"] = model meta["model_resolved"] = model meta["provider"] = inferProvider(model) } if finishReason, ok := result["finish_reason"].(string); ok { meta["finish_reason"] = finishReason } if responseID, ok := result["response_id"].(string); ok && responseID != "" { meta["response_id"] = responseID } // Build normalized usage if usage, ok := result["usage"].(map[string]interface{}); ok { normalizedUsage := map[string]interface{}{ "raw": usage, } if pt, ok := usage["prompt_tokens"]; ok { normalizedUsage["input_tokens"] = pt } if ct, ok := usage["completion_tokens"]; ok { normalizedUsage["output_tokens"] = ct } if tt, ok := usage["total_tokens"]; ok { normalizedUsage["total_tokens"] = tt } meta["usage"] = normalizedUsage } ctx.SetLocalVar("_meta", meta) } // inferProvider infers the LLM provider from the model name func inferProvider(model string) string { m := strings.ToLower(model) switch { case strings.HasPrefix(m, "claude") || strings.Contains(m, "anthropic"): return "anthropic" case strings.HasPrefix(m, "gpt") || strings.HasPrefix(m, "o1") || strings.HasPrefix(m, "o3"): return "openai" case strings.Contains(m, "gemini"): return "google" case strings.Contains(m, "mistral"): return "mistral" default: return "unknown" } } // buildMetaFromError extracts partial metadata from an LLM error func buildMetaFromError(err error, latencyMs int64) map[string]interface{} { meta := map[string]interface{}{ "latency_ms": latencyMs, } var llmErr *LLMError if 
errors.As(err, &llmErr) { if llmErr.Model != "" { meta["model"] = llmErr.Model meta["provider"] = inferProvider(llmErr.Model) } if llmErr.Provider != "" { meta["provider"] = llmErr.Provider } if llmErr.RequestID != "" { meta["request_id"] = llmErr.RequestID } } return meta } // injectDocs resolves doc IDs from the "docs" param and appends their content // to the last system message. If no system message exists, one is created. func (e *Engine) injectDocs(baseCtx *ExecutionContext, params map[string]interface{}) error { docIDs, ok := params["docs"].([]interface{}) if !ok || len(docIDs) == 0 { return nil } if baseCtx.DocAdapter == nil { return fmt.Errorf("docs referenced but no DocAdapter configured") } // Resolve each doc ID var docContents []string for _, raw := range docIDs { docID := fmt.Sprintf("%v", raw) // Validate doc is declared in registry if !e.workflow.Registry.HasDoc(docID) { return fmt.Errorf("doc %q not declared in registry.docs", docID) } content, err := baseCtx.DocAdapter.Get(baseCtx.Ctx, docID) if err != nil { return fmt.Errorf("failed to resolve doc %s: %w", docID, err) } desc := e.workflow.Registry.Docs[docID] docContents = append(docContents, fmt.Sprintf("[Doc %s: %s]\n%s", docID, desc, content)) } if len(docContents) == 0 { return nil } docBlock := strings.Join(docContents, "\n\n") // Find the last system message and append doc content messages, ok := params["messages"].([]interface{}) if !ok { return nil } lastSystemIdx := -1 for i, m := range messages { if msg, ok := m.(map[string]interface{}); ok { if role, _ := msg["role"].(string); role == "system" { lastSystemIdx = i } } } if lastSystemIdx >= 0 { msg := messages[lastSystemIdx].(map[string]interface{}) existing, _ := msg["content"].(string) msg["content"] = existing + "\n\n" + docBlock } else { // No system message — prepend one sysMsg := map[string]interface{}{ "role": "system", "content": docBlock, } params["messages"] = append([]interface{}{sysMsg}, messages...) 
} // Remove docs from params so it's not passed to the LLM adapter delete(params, "docs") return nil } // executeAPIStep executes an API_* step func (e *Engine) executeAPIStep(ctx ContextAccessor, step *Step) error { // Extract API ID from step ID (API_xxx) apiID := step.ID[4:] // Remove "API_" prefix // Get base context for accessing fields baseCtx := ctx.GetBaseContext() // Get API definition from registry apiDef, err := e.workflow.Registry.GetAPIDefinition(apiID) if err != nil { return fmt.Errorf("API definition not found: %w", err) } // Evaluate input parameters (with deep evaluation for nested structures) evaluator := NewExpressionEvaluator(ctx) params := make(map[string]interface{}) for key, valueExpr := range step.In { val, err := evaluator.EvaluateDeep(valueExpr) if err != nil { return fmt.Errorf("failed to evaluate input parameter %s: %w", key, err) } params[key] = val } // Resolve auth token if specified if apiDef.Auth != "" { var authToken interface{} var err error if strings.HasPrefix(apiDef.Auth, "=") { authToken, err = evaluator.EvaluateValue(apiDef.Auth) } else { authToken, err = evaluator.Evaluate(apiDef.Auth) } if err != nil { return fmt.Errorf("failed to resolve auth token: %w", err) } if authToken != nil { if tokenStr, ok := authToken.(string); ok { params["authToken"] = tokenStr } } } if baseCtx.APIAdapter == nil { return fmt.Errorf("API step %s: no APIAdapter configured", step.ID) } // Call API result, err := baseCtx.APIAdapter.Call(baseCtx.Ctx, apiDef, params) if err != nil { return fmt.Errorf("API call failed: %w", err) } // Store result in local variable (use ctx accessor for proper scoping in parallel mode) ctx.SetLocalVar("_result", result) // Apply output mapping if len(step.Out) > 0 { if err := e.applyOutputMapping(ctx, step.Out); err != nil { return fmt.Errorf("failed to apply output mapping: %w", err) } } // Clear _result ctx.DeleteLocalVar("_result") return nil } // executeSetStep executes a Set_* step func (e *Engine) 
executeSetStep(ctx ContextAccessor, step *Step) error { evaluator := NewExpressionEvaluator(ctx) // Evaluate value expression value, err := evaluator.EvaluateValue(step.Value) if err != nil { return fmt.Errorf("failed to evaluate value: %w", err) } // Set variable if err := evaluator.SetVariable(step.Target, value); err != nil { return fmt.Errorf("failed to set variable: %w", err) } return nil } // executeWriteStep executes a Write_* step func (e *Engine) executeWriteStep(ctx ContextAccessor, step *Step) error { evaluator := NewExpressionEvaluator(ctx) // Get base context for accessing fields baseCtx := ctx.GetBaseContext() // Evaluate target path targetPath, err := evaluator.EvaluateValue(step.Target) if err != nil { return fmt.Errorf("failed to evaluate target path: %w", err) } targetPathStr, ok := targetPath.(string) if !ok { return fmt.Errorf("target path must be a string, got: %T", targetPath) } // Resolve {var} interpolations (e.g. ".tmp/{_iterDir}/item.txt") targetPathStr, err = e.interpolateFilePath(ctx, targetPathStr) if err != nil { return fmt.Errorf("failed to interpolate target path: %w", err) } // Resolve .tmp/ prefix to run-isolated path before artifact check targetPathStr = e.resolveTmpPath(ctx, targetPathStr) // Check if path is allowed (check against logical/resolved path) if !e.workflow.Registry.IsArtifactPathAllowed(targetPathStr) { return fmt.Errorf("write path not allowed: %s", targetPathStr) } // Evaluate value value, err := evaluator.EvaluateValue(step.Value) if err != nil { return fmt.Errorf("failed to evaluate value: %w", err) } // Convert value to bytes var content []byte switch v := value.(type) { case string: content = []byte(v) case []byte: content = v default: // Serialize non-string values as JSON jsonBytes, err := json.Marshal(v) if err != nil { return fmt.Errorf("failed to marshal value to JSON: %w", err) } content = jsonBytes } // Determine write mode mode := WriteModeOverwrite if step.Mode != "" { mode = WriteMode(step.Mode) } if 
baseCtx.FileAdapter == nil { return fmt.Errorf("write step %s: no FileAdapter configured", step.ID) } // Write file if err := baseCtx.FileAdapter.Write(baseCtx.Ctx, targetPathStr, content, mode); err != nil { return fmt.Errorf("failed to write file: %w", err) } // Store artifact reference baseCtx.Artifacts[targetPathStr] = targetPathStr // Emit file_done RunEvent (spec 3.13 §13.3) { stepID := step.ID e.emitRunEvent(baseCtx, RunEventFileDone, &stepID, map[string]interface{}{ "path": strings.TrimPrefix(targetPathStr, "/"), "size_bytes": len(content), }) } return nil } // executeBranchStep executes a Branch_* step func (e *Engine) executeBranchStep(ctx ContextAccessor, step *Step) error { evaluator := NewExpressionEvaluator(ctx) // Evaluate cases to find the first matching one var selectedStepID string for _, c := range step.Cases { if len(c) != 2 { return fmt.Errorf("invalid case format: expected [expression, stepId], got %d elements", len(c)) } expression := c[0] stepID := c[1] if expression == "ELSE" { selectedStepID = stepID break } var result interface{} var err error if strings.HasPrefix(expression, "=") { result, err = evaluator.EvaluateValue(expression) } else { result, err = evaluator.Evaluate(expression) } if err != nil { return fmt.Errorf("failed to evaluate branch condition: %w", err) } if toBool(result) { selectedStepID = stepID break } } if selectedStepID == "" { // No matching case and no ELSE: skip all branch bodies, proceed to Branch_*.next. // Spec §10.10: "若无命中且无 ELSE,跳过所有分支子链,继续 Branch_*.next" return nil } // Execute the selected branch branchStep := e.findStepByID(selectedStepID) if branchStep == nil { return fmt.Errorf("branch step not found: %s", selectedStepID) } return e.executeStep(ctx, branchStep) } // executeLoopStep executes a Loop_* step, dispatching to while or source mode. 
func (e *Engine) executeLoopStep(ctx ContextAccessor, step *Step) error { // v3.16+: while mode (condition-based loop) if step.While != "" { return e.executeWhileLoop(ctx, step) } return e.executeSourceLoop(ctx, step) } // executeSourceLoop executes a Loop_* step with a source array. func (e *Engine) executeSourceLoop(ctx ContextAccessor, step *Step) error { evaluator := NewExpressionEvaluator(ctx) // Get loop source from step.Source property. sourceStr, ok := step.Source.(string) if !ok || sourceStr == "" { return fmt.Errorf("loop source not specified or not a string") } // Evaluate source using new expression convention (supports = prefix) // For backward compatibility: if no = prefix, treat as direct variable reference sourceExpr := sourceStr if !strings.HasPrefix(sourceExpr, "=") { // Add = prefix for backward compatibility with old format ($items) sourceExpr = "=" + sourceExpr } source, err := evaluator.EvaluateValue(sourceExpr) if err != nil { return fmt.Errorf("failed to evaluate loop source: %w", err) } // Convert source to slice sourceSlice := reflect.ValueOf(source) if sourceSlice.Kind() != reflect.Slice && sourceSlice.Kind() != reflect.Array { return fmt.Errorf("loop source must be an array, got: %T", source) } // Get loop mode mode := LoopMode(step.Mode) if mode != LoopModeParallel && mode != LoopModeSerial { return fmt.Errorf("invalid loop mode: %s", step.Mode) } // Execute loop iterations length := sourceSlice.Len() // v3.16+: cap at min(len(source), maxIterations) if maxIterations is set if step.MaxIterations != nil && *step.MaxIterations < length { length = *step.MaxIterations } if mode == LoopModeSerial { // Serial execution for i := 0; i < length; i++ { item := sourceSlice.Index(i).Interface() if err := e.executeLoopIteration(ctx, step, item, i); err != nil { // v3.16+: BREAK exits the loop cleanly (not an error) if IsBreakError(err) { return nil } return fmt.Errorf("loop iteration %d failed: %w", i, err) } } } else { // Parallel execution with 
proper synchronization if err := e.executeLoopParallel(ctx, step, sourceSlice, length); err != nil { return err } } return nil } // executeWhileLoop executes a Loop_* step with a while condition (v3.16+). // While mode is always serial. _item is not available; _index and _iterDir are. func (e *Engine) executeWhileLoop(ctx ContextAccessor, step *Step) error { maxIter := *step.MaxIterations // guaranteed non-nil by validation for i := 0; i < maxIter; i++ { // Evaluate while condition BEFORE each iteration (including iteration 0) evaluator := NewExpressionEvaluator(ctx) whileExpr := step.While var condResult interface{} var err error if strings.HasPrefix(whileExpr, "=") { condResult, err = evaluator.EvaluateValue(whileExpr) } else { condResult, err = evaluator.Evaluate(whileExpr) } if err != nil { return fmt.Errorf("failed to evaluate while condition: %w", err) } if !toBool(condResult) { break // Condition false, exit loop } // Set loop local variables: _index and _iterDir (no _item for while loops) ctx.SetLocalVar("_index", i) ctx.SetLocalVar("_iterDir", fmt.Sprintf("%s_%d", step.ID, i)) // Execute children var childErr error for _, childID := range step.Children { child := e.findStepByID(childID) if child == nil { childErr = fmt.Errorf("loop child step not found: %s", childID) break } if err := e.executeStep(ctx, child); err != nil { childErr = err break } } // Cleanup locals ctx.DeleteLocalVar("_index") ctx.DeleteLocalVar("_iterDir") if childErr != nil { // v3.16+: BREAK exits the loop cleanly if IsBreakError(childErr) { return nil } return fmt.Errorf("while loop iteration %d failed: %w", i, childErr) } } return nil } // executeLoopParallel executes loop iterations in parallel func (e *Engine) executeLoopParallel(ctx ContextAccessor, loopStep *Step, sourceSlice reflect.Value, length int) error { // Get base context for accessing fields baseCtx := ctx.GetBaseContext() // Wrap execution context with thread-safe accessor safeCtx := NewSafeExecutionContext(baseCtx) // 
Get parallel executor executor := e.getParallelExecutor() // v3.16+: For BREAK support in parallel, use a shared atomic flag. // When any iteration triggers BREAK, the flag is set. Subsequent iterations // check the flag before starting and skip. Already-running iterations complete normally. var breakRequested int32 // atomic: 0=no, 1=yes // Create branches for each iteration branches := make([]ParallelBranch, length) for i := 0; i < length; i++ { item := sourceSlice.Index(i).Interface() index := i branches[i] = ParallelBranch{ ID: fmt.Sprintf("%s[%d]", loopStep.ID, index), Fn: func(branchCtx context.Context) error { // v3.16+: Check if BREAK was already requested by another iteration if atomic.LoadInt32(&breakRequested) != 0 { return nil // Skip this iteration } // Create child context with isolated local vars childCtx := NewChildExecutionContext(safeCtx) childCtx.SetLocalVar("_item", item) childCtx.SetLocalVar("_index", index) // v3.14+: inject _iterDir for per-iteration .tmp/ isolation childCtx.SetLocalVar("_iterDir", fmt.Sprintf("%s_%d", loopStep.ID, index)) // Execute children in child context for _, childID := range loopStep.Children { // Check cancellation before each child select { case <-branchCtx.Done(): return branchCtx.Err() default: } child := e.findStepByID(childID) if child == nil { return fmt.Errorf("loop child step not found: %s", childID) } // Execute with child context - works because executeStep accepts ContextAccessor if err := e.executeStep(childCtx, child); err != nil { // v3.16+: BREAK signals loop exit — set flag and return cleanly if IsBreakError(err) { atomic.StoreInt32(&breakRequested, 1) return nil // This iteration ends, others complete naturally } return fmt.Errorf("iteration %d: %w", index, err) } } return nil }, } } // Execute all branches in parallel if err := executor.Execute(baseCtx.Ctx, branches, e.getErrorStrategy()); err != nil { return fmt.Errorf("parallel loop execution failed: %w", err) } return nil } // 
executeLoopIteration executes a single loop iteration func (e *Engine) executeLoopIteration(ctx ContextAccessor, loopStep *Step, item interface{}, index int) error { // Set loop local variables (use ctx accessor for proper scoping) ctx.SetLocalVar("_item", item) ctx.SetLocalVar("_index", index) // v3.14+: inject _iterDir for per-iteration .tmp/ isolation ctx.SetLocalVar("_iterDir", fmt.Sprintf("%s_%d", loopStep.ID, index)) defer ctx.DeleteLocalVar("_item") defer ctx.DeleteLocalVar("_index") defer ctx.DeleteLocalVar("_iterDir") // Execute children for _, childID := range loopStep.Children { child := e.findStepByID(childID) if child == nil { return fmt.Errorf("loop child step not found: %s", childID) } if err := e.executeStep(ctx, child); err != nil { return err } } return nil } // applyOutputMapping applies the output mapping from _result to global variables and files // Keys starting with $ write to variables, keys starting with / write to files. func (e *Engine) applyOutputMapping(ctx ContextAccessor, outMapping StepOutput) error { evaluator := NewExpressionEvaluator(ctx) // Get base context for file adapter and RunEvent emission baseCtx := ctx.GetBaseContext() for target, valueExpr := range outMapping { // Evaluate the value expression value, err := evaluator.EvaluateValue(valueExpr) if err != nil { return fmt.Errorf("failed to evaluate output expression for %s: %w", target, err) } // Determine if this is a file write or variable write if strings.HasPrefix(target, "/") { // File write - interpolate path variables filePath, err := e.interpolateFilePath(ctx, target) if err != nil { return fmt.Errorf("failed to interpolate file path %s: %w", target, err) } // Convert value to bytes var content []byte switch v := value.(type) { case string: content = []byte(v) case []byte: content = v default: // Serialize non-string values as JSON jsonBytes, err := json.Marshal(v) if err != nil { return fmt.Errorf("failed to marshal value to JSON: %w", err) } content = jsonBytes } 
if baseCtx.FileAdapter == nil { return fmt.Errorf("output mapping: no FileAdapter configured for file target %s", target) } // Write file if err := baseCtx.FileAdapter.Write(baseCtx.Ctx, filePath, content, WriteModeOverwrite); err != nil { return fmt.Errorf("failed to write output file %s: %w", filePath, err) } // Store artifact reference baseCtx.Artifacts[filePath] = filePath // Emit file_done RunEvent (spec 3.13 §13.3) // Order: llm_done → file_done(×N) → step_done { stepID := baseCtx.CurrentStepID e.emitRunEvent(baseCtx, RunEventFileDone, &stepID, map[string]interface{}{ "path": strings.TrimPrefix(filePath, "/"), "size_bytes": len(content), }) } } else if strings.HasPrefix(target, "$") { // Variable write - may include path with _index interpolation resolvedTarget, err := e.interpolateVarPath(ctx, target) if err != nil { return fmt.Errorf("failed to interpolate var path %s: %w", target, err) } // Set the variable if err := evaluator.SetVariable(resolvedTarget, value); err != nil { return fmt.Errorf("failed to set output variable %s: %w", resolvedTarget, err) } } else { return fmt.Errorf("invalid output target %s: must start with $ (variable) or / (file)", target) } } return nil } // interpolateFilePath interpolates variables in file paths using {var} syntax // Example: "/vsc/{_item.path}" with _item.path = "files/fileA.ts" -> "/vsc/files/fileA.ts" // Each {expr} is evaluated exactly once; substituted values are not re-scanned. 
func (e *Engine) interpolateFilePath(ctx ContextAccessor, path string) (string, error) { evaluator := NewExpressionEvaluator(ctx) var sb strings.Builder remaining := path for { start := strings.Index(remaining, "{") if start == -1 { sb.WriteString(remaining) break } end := strings.Index(remaining[start:], "}") if end == -1 { sb.WriteString(remaining) break } end += start // Write the literal prefix before the expression sb.WriteString(remaining[:start]) // Evaluate the expression inside { } expr := remaining[start+1 : end] val, err := evaluator.Evaluate(expr) if err != nil { return "", fmt.Errorf("failed to evaluate path expression %s: %w", expr, err) } sb.WriteString(fmt.Sprintf("%v", val)) // Advance past the closing brace; substituted value is not re-scanned remaining = remaining[end+1:] } return sb.String(), nil } // interpolateVarPath interpolates _index in variable paths // Example: "$generated[_index].name" with _index = 2 -> "$generated[2].name" func (e *Engine) interpolateVarPath(ctx ContextAccessor, path string) (string, error) { // Check if path contains _index if !strings.Contains(path, "_index") { return path, nil } // Get _index value indexVal, ok := ctx.GetLocalVar("_index") if !ok { return path, nil // No _index, return as-is } // Replace _index with actual value return strings.ReplaceAll(path, "_index", fmt.Sprintf("%v", indexVal)), nil } // resolveTmpPath rewrites .tmp/xxx paths to .tmp/{runID}/xxx for run-level isolation (v3.14+). // Paths that do not start with ".tmp/" are returned unchanged. func (e *Engine) resolveTmpPath(ctx ContextAccessor, path string) string { if !strings.HasPrefix(path, ".tmp/") { return path } baseCtx := ctx.GetBaseContext() runID := baseCtx.WorkflowID rest := strings.TrimPrefix(path, ".tmp/") return ".tmp/" + runID + "/" + rest } // resolveStepFilePath evaluates a file path expression and applies {var} interpolation // and .tmp/ run-isolation in one step. Used by Download_* and Unzip_*. 
func (e *Engine) resolveStepFilePath(ctx ContextAccessor, raw string) (string, error) { evaluator := NewExpressionEvaluator(ctx) // Step 1: handle = expressions ("=$var", "=\"literal\"", etc.) val, err := evaluator.EvaluateValue(raw) if err != nil { return "", fmt.Errorf("failed to evaluate path expression: %w", err) } pathStr, ok := val.(string) if !ok { return "", fmt.Errorf("path expression must yield a string, got %T", val) } // Step 2: resolve {var} interpolations (e.g. ".tmp/{_iterDir}/bundle.zip") pathStr, err = e.interpolateFilePath(ctx, pathStr) if err != nil { return "", fmt.Errorf("failed to interpolate path: %w", err) } // Step 3: rewrite .tmp/ prefix for run isolation return e.resolveTmpPath(ctx, pathStr), nil } // routePathByExt maps a filename to a target directory using the routeByExt table. // It returns defaultDir when no extension matches. An empty defaultDir is allowed // (caller should treat it as "discard / skip" if needed). func routePathByExt(name string, routeByExt map[string]string, defaultDir string) string { ext := strings.ToLower(filepath.Ext(name)) if dir, ok := routeByExt[ext]; ok { return dir } return defaultDir } // zipSlipSafe returns true if the entry path is safe (no path traversal). func zipSlipSafe(entryName string) bool { if strings.HasPrefix(entryName, "/") { return false } // filepath.Clean normalises ".." components; if the result escapes the root it's unsafe cleaned := filepath.Clean(entryName) if strings.HasPrefix(cleaned, "..") { return false } // Reject absolute paths on any OS (Windows drive letters: "C:\") if filepath.IsAbs(cleaned) { return false } return true } // executeDownloadStep executes a Download_* step (v3.14+). // It downloads a single file from an external URL and writes it to the artifact space, // either to an explicit target path or to a directory chosen by routeByExt. 
func (e *Engine) executeDownloadStep(ctx ContextAccessor, step *Step) error { baseCtx := ctx.GetBaseContext() evaluator := NewExpressionEvaluator(ctx) if baseCtx.FileAdapter == nil { return fmt.Errorf("download step %s: no FileAdapter configured", step.ID) } // --- Resolve source --- var downloadURL string var extraHeaders map[string]string sourceVal, err := evaluator.EvaluateValue(step.Source) if err != nil { return fmt.Errorf("download step %s: failed to evaluate source: %w", step.ID, err) } switch sv := sourceVal.(type) { case string: downloadURL = sv case map[string]interface{}: // Object form: {url, headers, auth, timeout, checksum} urlVal, ok := sv["url"] if !ok { return fmt.Errorf("download step %s: source object missing 'url' field", step.ID) } downloadURL, ok = urlVal.(string) if !ok { return fmt.Errorf("download step %s: source.url must be a string", step.ID) } if hdrs, ok := sv["headers"]; ok { if hdrMap, ok := hdrs.(map[string]interface{}); ok { extraHeaders = make(map[string]string, len(hdrMap)) for k, v := range hdrMap { extraHeaders[k] = fmt.Sprintf("%v", v) } } } default: return fmt.Errorf("download step %s: source must be a URL string or object, got %T", step.ID, sourceVal) } if downloadURL == "" { return fmt.Errorf("download step %s: download URL is empty", step.ID) } // --- HTTP download (streaming into memory buffer) --- httpCtx, cancel := context.WithTimeout(baseCtx.Ctx, 5*time.Minute) defer cancel() req, err := http.NewRequestWithContext(httpCtx, http.MethodGet, downloadURL, nil) if err != nil { return fmt.Errorf("download step %s: failed to build HTTP request: %w", step.ID, err) } for k, v := range extraHeaders { req.Header.Set(k, v) } resp, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("download step %s: HTTP request failed: %w", step.ID, err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { return fmt.Errorf("download step %s: server returned HTTP %d", step.ID, resp.StatusCode) } content, 
err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("download step %s: failed to read response body: %w", step.ID, err) } // --- Determine target path(s) --- // writtenPath captures the resolved artifact path for _result output. var writtenPath string writeFile := func(targetPath string, data []byte) error { // Resolve {var} interpolation and .tmp/ isolation resolved, err := e.resolveStepFilePath(ctx, targetPath) if err != nil { return err } if !e.workflow.Registry.IsArtifactPathAllowed(resolved) { return fmt.Errorf("download target path not allowed: %s", resolved) } if err := baseCtx.FileAdapter.Write(baseCtx.Ctx, resolved, data, WriteModeOverwrite); err != nil { return fmt.Errorf("failed to write download to %s: %w", resolved, err) } writtenPath = resolved baseCtx.Artifacts[resolved] = resolved stepID := step.ID e.emitRunEvent(baseCtx, RunEventFileDone, &stepID, map[string]interface{}{ "path": resolved, "size_bytes": len(data), }) return nil } var writeErr error if step.Target != "" { // Single explicit target writeErr = writeFile(step.Target, content) } else if len(step.RouteByExt) > 0 || step.DefaultDir != "" { // Route by extension: derive filename from URL urlPath := downloadURL if idx := strings.Index(urlPath, "?"); idx >= 0 { urlPath = urlPath[:idx] } filename := filepath.Base(urlPath) if filename == "" || filename == "." 
{ filename = "download" } dir := routePathByExt(filename, step.RouteByExt, step.DefaultDir) if dir != "" { targetPath := strings.TrimSuffix(dir, "/") + "/" + filename writeErr = writeFile(targetPath, content) } // dir == "": no matching extension and no defaultDir → skip (not an error) } else { return fmt.Errorf("download step %s: must specify either 'target' or 'routeByExt'", step.ID) } if writeErr != nil { return writeErr } // Set _result and apply out mapping (same pattern as Service_*/Component_* steps) ctx.SetLocalVar("_result", map[string]interface{}{ "path": writtenPath, }) if len(step.Out) > 0 { if err := e.applyOutputMapping(ctx, step.Out); err != nil { ctx.DeleteLocalVar("_result") return fmt.Errorf("download step %s: failed to apply output mapping: %w", step.ID, err) } } ctx.DeleteLocalVar("_result") return nil } // executeUnzipStep executes an Unzip_* step (v3.14+). // It reads a zip archive from the artifact space, extracts each entry, and writes // the contents to directories determined by routeByExt (mandatory) / defaultDir (optional). // zip-slip entries (path traversal) are rejected. 
func (e *Engine) executeUnzipStep(ctx ContextAccessor, step *Step) error { baseCtx := ctx.GetBaseContext() evaluator := NewExpressionEvaluator(ctx) if baseCtx.FileAdapter == nil { return fmt.Errorf("unzip step %s: no FileAdapter configured", step.ID) } // --- Resolve source path --- sourceVal, err := evaluator.EvaluateValue(step.Source) if err != nil { return fmt.Errorf("unzip step %s: failed to evaluate source: %w", step.ID, err) } sourceStr, ok := sourceVal.(string) if !ok { return fmt.Errorf("unzip step %s: source must be a string path expression, got %T", step.ID, sourceVal) } sourcePath, err := e.resolveStepFilePath(ctx, sourceStr) if err != nil { return fmt.Errorf("unzip step %s: failed to resolve source path: %w", step.ID, err) } // --- Read the zip file --- zipBytes, err := baseCtx.FileAdapter.Read(baseCtx.Ctx, sourcePath) if err != nil { return fmt.Errorf("unzip step %s: failed to read zip file %s: %w", step.ID, sourcePath, err) } // --- Open zip reader --- zr, err := zip.NewReader(bytes.NewReader(zipBytes), int64(len(zipBytes))) if err != nil { return fmt.Errorf("unzip step %s: failed to open zip archive: %w", step.ID, err) } // --- Determine overwrite mode --- overwrite := true if step.Overwrite != nil { overwrite = *step.Overwrite } writeMode := WriteModeOverwrite if !overwrite { writeMode = WriteModeFailIfExists } // --- Extract entries --- var extractedPaths []string for _, entry := range zr.File { if entry.FileInfo().IsDir() { continue // directories are created implicitly } entryName := entry.Name // zip-slip protection if !zipSlipSafe(entryName) { return fmt.Errorf("unzip step %s: unsafe entry path rejected: %s", step.ID, entryName) } // Route by extension dir := routePathByExt(entryName, step.RouteByExt, step.DefaultDir) if dir == "" { continue // no matching rule and no defaultDir: skip } baseName := filepath.Base(entryName) targetPath := strings.TrimSuffix(dir, "/") + "/" + baseName // Resolve .tmp/ isolation resolvedPath := 
e.resolveTmpPath(ctx, targetPath) if !e.workflow.Registry.IsArtifactPathAllowed(resolvedPath) { return fmt.Errorf("unzip step %s: target path not allowed: %s", step.ID, resolvedPath) } // Read entry content rc, err := entry.Open() if err != nil { return fmt.Errorf("unzip step %s: failed to open entry %s: %w", step.ID, entryName, err) } entryBytes, err := io.ReadAll(rc) rc.Close() if err != nil { return fmt.Errorf("unzip step %s: failed to read entry %s: %w", step.ID, entryName, err) } // Write to FileAdapter if err := baseCtx.FileAdapter.Write(baseCtx.Ctx, resolvedPath, entryBytes, writeMode); err != nil { return fmt.Errorf("unzip step %s: failed to write %s: %w", step.ID, resolvedPath, err) } baseCtx.Artifacts[resolvedPath] = resolvedPath stepID := step.ID e.emitRunEvent(baseCtx, RunEventFileDone, &stepID, map[string]interface{}{ "path": resolvedPath, "size_bytes": len(entryBytes), }) extractedPaths = append(extractedPaths, resolvedPath) } // Set _result to extraction summary, then apply out mapping (same pattern as other steps) ctx.SetLocalVar("_result", map[string]interface{}{ "count": len(extractedPaths), "files": extractedPaths, }) if len(step.Out) > 0 { if err := e.applyOutputMapping(ctx, step.Out); err != nil { ctx.DeleteLocalVar("_result") return fmt.Errorf("unzip step %s: failed to apply output mapping: %w", step.ID, err) } } ctx.DeleteLocalVar("_result") return nil } // executePauseStep implements the Pause_* node (v3.15+). // It blocks the workflow goroutine until the workflow is resumed via Engine.Resume, // times out (if timeout is configured), or the context is cancelled. // // Unlike other step executors, executePauseStep is responsible for emitting step_done // and routing to step.Next after a successful resume (similar to how Stop_* handles its // own terminal routing). The caller (executeStep) returns early on nil to skip the // standard post-switch step_done / next logic. 
func (e *Engine) executePauseStep(ctx ContextAccessor, step *Step, stepStartTime time.Time, stepTypePattern string) error { baseCtx := ctx.GetBaseContext() stepID := step.ID // Generate a unique wait token for this pause instance. // The token is stored and published as a SHA-256 hex digest so that the // persisted "waitToken (hash)" matches the spec §11.3.1 requirement. // Callers obtain the token from the pause_start RunEvent and must supply // the same value in ResumeRequest.Token. rawToken := fmt.Sprintf("wt_%s_%s_%d", baseCtx.WorkflowID, step.ID, time.Now().UnixNano()) h := sha256.Sum256([]byte(rawToken)) token := hex.EncodeToString(h[:]) // Initialise PauseState and publish it so Resume() can find it. state := &PauseState{ ch: make(chan resumeSignal, 1), token: token, nodeID: step.ID, seenRequestIDs: make(map[string]bool), } baseCtx.pauseMu.Lock() baseCtx.PauseState = state baseCtx.pauseMu.Unlock() // Calculate optional expiry timestamp for the pause_start payload. var expireAtStr string if step.Timeout != nil { expireAt := time.Now().Add(time.Duration(step.Timeout.Sec) * time.Second) expireAtStr = expireAt.UTC().Format("2006-01-02T15:04:05.000Z07:00") } // Set workflow status to paused. ctx.SetStatus(StatusPaused) // Emit pause_start. pausePayload := map[string]interface{}{ "nodeId": stepID, "waitToken": token, "resumeResultTarget": step.ResumeResultTarget, } if step.Reason != "" { pausePayload["reason"] = step.Reason } if expireAtStr != "" { pausePayload["expireAt"] = expireAtStr } e.emitRunEvent(baseCtx, RunEventPauseStart, &stepID, pausePayload) // Block waiting for resume signal, timeout, or context cancellation. var sig resumeSignal var timedOut bool if step.Timeout != nil { timer := time.NewTimer(time.Duration(step.Timeout.Sec) * time.Second) defer timer.Stop() select { case sig = <-state.ch: // Resume signal received. 
case <-timer.C: timedOut = true case <-baseCtx.Ctx.Done(): ctx.SetStatus(StatusFailed) return baseCtx.Ctx.Err() } } else { select { case sig = <-state.ch: // Resume signal received. case <-baseCtx.Ctx.Done(): ctx.SetStatus(StatusFailed) return baseCtx.Ctx.Err() } } if timedOut { // Emit pause_timeout. e.emitRunEvent(baseCtx, RunEventPauseTimeout, &stepID, map[string]interface{}{ "nodeId": stepID, "expiredAt": time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"), "timeoutAction": step.Timeout.On, }) // Resume execution at the timeout handler. // Status is set back to running; the timeout step (typically Stop_* or an // error-handler chain) is responsible for further status transitions. ctx.SetStatus(StatusRunning) timeoutStep := e.findStepByID(step.Timeout.On) if timeoutStep == nil { return fmt.Errorf("Pause_* step %s: timeout.on step %q not found", step.ID, step.Timeout.On) } return e.executeStep(ctx, timeoutStep) } // --- Resume path --- // Write the resume payload to resumeResultTarget in $vars. evaluator := NewExpressionEvaluator(ctx) if err := evaluator.SetVariable(step.ResumeResultTarget, sig.Payload); err != nil { return fmt.Errorf("Pause_* step %s: failed to write resume payload to %q: %w", step.ID, step.ResumeResultTarget, err) } // Emit pause_resumed. e.emitRunEvent(baseCtx, RunEventPauseResumed, &stepID, map[string]interface{}{ "nodeId": stepID, "requestId": sig.RequestID, "resumedAt": time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"), }) // Restore running status. ctx.SetStatus(StatusRunning) // Note: Pause_* does NOT support the `print` field (spec §5.1: 不适用 for Pause_*). // step_print is intentionally omitted here. // Emit step_done for the Pause_* node (successful resume path only). e.emitRunEvent(baseCtx, RunEventStepDone, &stepID, map[string]interface{}{ "step_type": stepTypePattern, "duration_ms": time.Since(stepStartTime).Milliseconds(), }) // Continue to the next step. 
if step.Next == "" { return fmt.Errorf("Pause_* step %s is missing required 'next' field", step.ID) } if step.Next == "RETURN" { return nil } nextStep := e.findStepByID(step.Next) if nextStep == nil { return fmt.Errorf("next step not found: %s (referenced by Pause_* step %s)", step.Next, step.ID) } return e.executeStep(ctx, nextStep) }