| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595 |
- package workflow
- import (
- "archive/zip"
- "bytes"
- "context"
- "crypto/sha256"
- "encoding/hex"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "path/filepath"
- "reflect"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- // executeServiceStep executes a Service_* step
- func (e *Engine) executeServiceStep(ctx ContextAccessor, step *Step) error {
- // Extract service name from step ID (Service_xxx)
- serviceName := step.ID[8:] // Remove "Service_" prefix
- // Evaluate input parameters (with deep evaluation for nested structures)
- evaluator := NewExpressionEvaluator(ctx)
- params := make(map[string]interface{})
- // Get base context for accessing fields
- baseCtx := ctx.GetBaseContext()
- for key, valueExpr := range step.In {
- val, err := evaluator.EvaluateDeep(valueExpr)
- if err != nil {
- return fmt.Errorf("failed to evaluate input parameter %s: %w", key, err)
- }
- params[key] = val
- }
- if baseCtx.ServiceAdapter == nil {
- return fmt.Errorf("service step %s: no ServiceAdapter configured", step.ID)
- }
- // Call service
- result, err := baseCtx.ServiceAdapter.Call(baseCtx.Ctx, serviceName, params)
- if err != nil {
- return fmt.Errorf("service call failed: %w", err)
- }
- // Store result in local variable (use ctx accessor for proper scoping in parallel mode)
- ctx.SetLocalVar("_result", result.Data)
- // Apply output mapping
- if len(step.Out) > 0 {
- if err := e.applyOutputMapping(ctx, step.Out); err != nil {
- return fmt.Errorf("failed to apply output mapping: %w", err)
- }
- }
- // Clear _result
- ctx.DeleteLocalVar("_result")
- return nil
- }
- // executeComponentStep executes a Component_* step
- func (e *Engine) executeComponentStep(ctx ContextAccessor, step *Step) error {
- // Extract component ID from step ID (Component_xxx)
- componentID := step.ID[10:] // Remove "Component_" prefix
- // Evaluate input parameters (with deep evaluation for nested structures)
- evaluator := NewExpressionEvaluator(ctx)
- params := make(map[string]interface{})
- // Get base context for accessing fields
- baseCtx := ctx.GetBaseContext()
- for key, valueExpr := range step.In {
- val, err := evaluator.EvaluateDeep(valueExpr)
- if err != nil {
- return fmt.Errorf("failed to evaluate input parameter %s: %w", key, err)
- }
- params[key] = val
- }
- if baseCtx.ComponentAdapter == nil {
- return fmt.Errorf("component step %s: no ComponentAdapter configured", step.ID)
- }
- // Call component
- result, err := baseCtx.ComponentAdapter.Call(baseCtx.Ctx, componentID, params)
- if err != nil {
- return fmt.Errorf("component call failed: %w", err)
- }
- // Store result in local variable (use ctx accessor for proper scoping in parallel mode)
- ctx.SetLocalVar("_result", result)
- // Apply output mapping
- if len(step.Out) > 0 {
- if err := e.applyOutputMapping(ctx, step.Out); err != nil {
- return fmt.Errorf("failed to apply output mapping: %w", err)
- }
- }
- // Clear _result
- ctx.DeleteLocalVar("_result")
- return nil
- }
- // resolveSchemaRef resolves schemaRef references in output_config.format (v3.9+)
- func (e *Engine) resolveSchemaRef(params map[string]interface{}) error {
- // Check if output_config exists
- outputConfig, ok := params["output_config"].(map[string]interface{})
- if !ok {
- return nil // No output_config, nothing to resolve
- }
- // Check if format exists
- format, ok := outputConfig["format"].(map[string]interface{})
- if !ok {
- return nil // No format, nothing to resolve
- }
- // Check if schemaRef exists
- schemaRef, hasSchemaRef := format["schemaRef"].(string)
- _, hasSchema := format["schema"]
- // If both schema and schemaRef exist, that's an error
- if hasSchema && hasSchemaRef {
- return fmt.Errorf("output_config.format cannot have both 'schema' and 'schemaRef' - use one or the other")
- }
- // If no schemaRef, nothing to resolve
- if !hasSchemaRef {
- return nil
- }
- // Resolve schemaRef from registry
- schema, err := e.workflow.Registry.GetSchema(schemaRef)
- if err != nil {
- return fmt.Errorf("failed to resolve schemaRef %q: %w", schemaRef, err)
- }
- // Replace schemaRef with the actual schema
- format["schema"] = schema
- delete(format, "schemaRef")
- return nil
- }
// getOutputFormatType reports the requested output format type.
//
// It looks first at params["response_format"]["type"] (OpenAI style) and then
// at params["output_config"]["format"]["type"] (Anthropic style). The first
// of these that exists as a string is returned, even when it is empty.
// The result is "" when neither location holds a string type.
func getOutputFormatType(params map[string]interface{}) string {
	// typeField extracts a string "type" entry from a value that is expected
	// to be a map; found is false when the shape does not match.
	typeField := func(container interface{}) (value string, found bool) {
		asMap, isMap := container.(map[string]interface{})
		if !isMap {
			return "", false
		}
		value, found = asMap["type"].(string)
		return value, found
	}

	if formatType, found := typeField(params["response_format"]); found {
		return formatType
	}

	var format interface{}
	if cfg, isMap := params["output_config"].(map[string]interface{}); isMap {
		format = cfg["format"]
	}
	if formatType, found := typeField(format); found {
		return formatType
	}
	return ""
}
- // isJSONObjectOutput checks if the params request unschema'd json_object output.
- // Supports both response_format (OpenAI) and output_config (Anthropic) styles.
- func isJSONObjectOutput(params map[string]interface{}) bool {
- return getOutputFormatType(params) == "json_object"
- }
// stripMarkdownCodeFence removes a Markdown code fence wrapped around s.
//
// The input is first trimmed of surrounding whitespace. If the trimmed text
// starts with "```json" or "```" and contains a later "```", the text between
// the opening fence and the last "```" is returned, trimmed of whitespace.
// In every other case (no opening fence, or no closing fence) the
// whitespace-trimmed input is returned as is.
func stripMarkdownCodeFence(s string) string {
	const fence = "```"

	trimmed := strings.TrimSpace(s)
	if !strings.HasPrefix(trimmed, fence) {
		return trimmed
	}

	// Drop the opening fence together with an immediately following "json" tag.
	body := strings.TrimPrefix(trimmed[len(fence):], "json")

	closing := strings.LastIndex(body, fence)
	if closing < 0 {
		return trimmed // opening fence without a closing one: leave as is
	}
	return strings.TrimSpace(body[:closing])
}
- // unwrapJSONObject strips markdown fencing and JSON-parses the content when the
- // output format is json_object. Returns content unchanged for other formats or
- // when content is already a non-string value (adapter already parsed it).
- func unwrapJSONObject(content interface{}, params map[string]interface{}) interface{} {
- if !isJSONObjectOutput(params) {
- return content
- }
- s, ok := content.(string)
- if !ok {
- return content
- }
- s = stripMarkdownCodeFence(s)
- var parsed interface{}
- if err := json.Unmarshal([]byte(s), &parsed); err == nil {
- return parsed
- }
- return s // not valid JSON even after stripping — return stripped string
- }
- // isStructuredOutput checks if the params specify structured output (json_schema)
- // Supports both response_format (OpenAI) and output_config (Anthropic) styles
- func isStructuredOutput(params map[string]interface{}) bool {
- return getOutputFormatType(params) == "json_schema"
- }
- // executeLLMStep executes an LLM_* step
- func (e *Engine) executeLLMStep(ctx ContextAccessor, step *Step) error {
- // Get base context for accessing fields
- baseCtx := ctx.GetBaseContext()
- // Evaluate input parameters (with deep evaluation for nested structures)
- evaluator := NewExpressionEvaluator(ctx)
- params := make(map[string]interface{})
- for key, valueExpr := range step.In {
- val, err := evaluator.EvaluateDeep(valueExpr)
- if err != nil {
- return fmt.Errorf("failed to evaluate input parameter %s: %w", key, err)
- }
- params[key] = val
- }
- // Resolve docs and inject into system prompt
- if err := e.injectDocs(baseCtx, params); err != nil {
- return fmt.Errorf("failed to inject docs: %w", err)
- }
- // Resolve schemaRef in output_config (v3.9+)
- if err := e.resolveSchemaRef(params); err != nil {
- return fmt.Errorf("failed to resolve schemaRef: %w", err)
- }
- // Streaming is controlled by in.stream (passed in step.In / params).
- isStreaming := false
- if streamVal, ok := params["stream"].(bool); ok && streamVal {
- isStreaming = true
- }
- stream := make(chan string, 10)
- var streamDone chan struct{} // closed when the streaming goroutine finishes
- // The engine owns the stream lifecycle: it always closes stream after Call
- // returns. Adapters must NOT close stream themselves; they only send to it.
- var closeOnce sync.Once
- closeStream := func() { closeOnce.Do(func() { close(stream) }) }
- if isStreaming {
- streamDone = make(chan struct{})
- go func() {
- defer close(streamDone)
- stepID := step.ID
- for chunk := range stream {
- // Emit llm_token RunEvent (fire-and-forget, spec 3.12 §13.4)
- e.emitRunEvent(baseCtx, RunEventLLMToken, &stepID, map[string]interface{}{
- "delta": chunk,
- })
- }
- }()
- } else {
- closeStream() // Close immediately if not streaming
- }
- // v3.16+: Resolve per-node LLM adapter based on step.Model
- llmAdapter := baseCtx.LLMAdapter // default
- if step.Model != "" && baseCtx.LLMAdapterRegistry != nil {
- resolved, modelOverride, resolveErr := baseCtx.LLMAdapterRegistry.Resolve(step.Model)
- if resolveErr != nil {
- closeStream()
- return fmt.Errorf("LLM provider resolution failed for step %s: %w", step.ID, resolveErr)
- }
- llmAdapter = resolved
- if modelOverride != "" {
- params["model"] = modelOverride
- }
- }
- // Measure latency (adapter-agnostic timing)
- callStart := time.Now()
- // Call LLM
- result, err := llmAdapter.Call(baseCtx.Ctx, params, stream)
- latencyMs := time.Since(callStart).Milliseconds()
- // Always close stream after Call returns so the goroutine can finish,
- // even if the adapter did not close it. No-op if adapter already closed it.
- closeStream()
- // Wait for the streaming goroutine to drain all remaining chunks.
- if streamDone != nil {
- <-streamDone
- }
- if err != nil {
- if e.isV310OrLater() {
- // v3.10: Set partial _meta with whatever info we have
- ctx.SetLocalVar("_meta", buildMetaFromError(err, latencyMs))
- }
- return fmt.Errorf("LLM call failed: %w", err)
- }
- // Emit llm_done RunEvent (always on success, spec 3.12 §13.4)
- // Order: step_start → llm_token(×N, stream only) → llm_done → step_done
- {
- stepID := step.ID
- llmDonePayload := map[string]interface{}{
- "latency_ms": latencyMs,
- }
- if fr, ok := result["finish_reason"].(string); ok {
- llmDonePayload["finish_reason"] = fr
- }
- if m, ok := result["model"].(string); ok {
- llmDonePayload["model"] = m
- }
- if usage, ok := result["usage"].(map[string]interface{}); ok {
- normalized := map[string]interface{}{}
- if v, ok := usage["prompt_tokens"]; ok {
- normalized["input_tokens"] = v
- }
- if v, ok := usage["completion_tokens"]; ok {
- normalized["output_tokens"] = v
- }
- if v, ok := usage["total_tokens"]; ok {
- normalized["total_tokens"] = v
- }
- llmDonePayload["usage"] = normalized
- }
- e.emitRunEvent(baseCtx, RunEventLLMDone, &stepID, llmDonePayload)
- }
- // Version-dependent result handling
- if e.isV310OrLater() {
- // === v3.10 semantics: _result = content only, _meta = metadata ===
- e.applyV310LLMResult(ctx, result, params, latencyMs)
- } else {
- // === v3.6-v3.9 semantics (unchanged) ===
- if isStructuredOutput(params) {
- // For structured output, _result is the parsed content directly
- if content, ok := result["content"]; ok {
- ctx.SetLocalVar("_result", content)
- } else {
- ctx.SetLocalVar("_result", result)
- }
- } else {
- // For non-structured output, _result is the full result map (v3.6 behavior)
- ctx.SetLocalVar("_result", result)
- }
- }
- // Apply output mapping
- if len(step.Out) > 0 {
- if err := e.applyOutputMapping(ctx, step.Out); err != nil {
- return fmt.Errorf("failed to apply output mapping: %w", err)
- }
- }
- // Clear ephemeral local variables
- ctx.DeleteLocalVar("_result")
- if e.isV310OrLater() {
- ctx.DeleteLocalVar("_meta")
- }
- return nil
- }
- // applyV310LLMResult sets _result and _meta for v3.10 semantics
- func (e *Engine) applyV310LLMResult(ctx ContextAccessor, result map[string]interface{}, params map[string]interface{}, latencyMs int64) {
- // _result = content body only (string for text, parsed object for json_schema/json_object)
- if content, ok := result["content"]; ok {
- ctx.SetLocalVar("_result", unwrapJSONObject(content, params))
- } else {
- ctx.SetLocalVar("_result", nil)
- }
- // _meta = call metadata
- meta := map[string]interface{}{
- "latency_ms": latencyMs,
- }
- if model, ok := result["model"].(string); ok {
- meta["model"] = model
- meta["model_resolved"] = model
- meta["provider"] = inferProvider(model)
- }
- if finishReason, ok := result["finish_reason"].(string); ok {
- meta["finish_reason"] = finishReason
- }
- if responseID, ok := result["response_id"].(string); ok && responseID != "" {
- meta["response_id"] = responseID
- }
- // Build normalized usage
- if usage, ok := result["usage"].(map[string]interface{}); ok {
- normalizedUsage := map[string]interface{}{
- "raw": usage,
- }
- if pt, ok := usage["prompt_tokens"]; ok {
- normalizedUsage["input_tokens"] = pt
- }
- if ct, ok := usage["completion_tokens"]; ok {
- normalizedUsage["output_tokens"] = ct
- }
- if tt, ok := usage["total_tokens"]; ok {
- normalizedUsage["total_tokens"] = tt
- }
- meta["usage"] = normalizedUsage
- }
- ctx.SetLocalVar("_meta", meta)
- }
// inferProvider guesses the LLM provider from a model name.
//
// Matching is case-insensitive and the first rule that applies wins:
//   - starts with "claude" or contains "anthropic" -> "anthropic"
//   - starts with "gpt", "o1" or "o3"               -> "openai"
//   - contains "gemini"                             -> "google"
//   - contains "mistral"                            -> "mistral"
//
// Anything else yields "unknown".
func inferProvider(model string) string {
	name := strings.ToLower(model)

	if strings.HasPrefix(name, "claude") || strings.Contains(name, "anthropic") {
		return "anthropic"
	}

	for _, prefix := range [...]string{"gpt", "o1", "o3"} {
		if strings.HasPrefix(name, prefix) {
			return "openai"
		}
	}

	bySubstring := [...]struct{ needle, provider string }{
		{"gemini", "google"},
		{"mistral", "mistral"},
	}
	for _, rule := range bySubstring {
		if strings.Contains(name, rule.needle) {
			return rule.provider
		}
	}
	return "unknown"
}
- // buildMetaFromError extracts partial metadata from an LLM error
- func buildMetaFromError(err error, latencyMs int64) map[string]interface{} {
- meta := map[string]interface{}{
- "latency_ms": latencyMs,
- }
- var llmErr *LLMError
- if errors.As(err, &llmErr) {
- if llmErr.Model != "" {
- meta["model"] = llmErr.Model
- meta["provider"] = inferProvider(llmErr.Model)
- }
- if llmErr.Provider != "" {
- meta["provider"] = llmErr.Provider
- }
- if llmErr.RequestID != "" {
- meta["request_id"] = llmErr.RequestID
- }
- }
- return meta
- }
- // injectDocs resolves doc IDs from the "docs" param and appends their content
- // to the last system message. If no system message exists, one is created.
- func (e *Engine) injectDocs(baseCtx *ExecutionContext, params map[string]interface{}) error {
- docIDs, ok := params["docs"].([]interface{})
- if !ok || len(docIDs) == 0 {
- return nil
- }
- if baseCtx.DocAdapter == nil {
- return fmt.Errorf("docs referenced but no DocAdapter configured")
- }
- // Resolve each doc ID
- var docContents []string
- for _, raw := range docIDs {
- docID := fmt.Sprintf("%v", raw)
- // Validate doc is declared in registry
- if !e.workflow.Registry.HasDoc(docID) {
- return fmt.Errorf("doc %q not declared in registry.docs", docID)
- }
- content, err := baseCtx.DocAdapter.Get(baseCtx.Ctx, docID)
- if err != nil {
- return fmt.Errorf("failed to resolve doc %s: %w", docID, err)
- }
- desc := e.workflow.Registry.Docs[docID]
- docContents = append(docContents, fmt.Sprintf("[Doc %s: %s]\n%s", docID, desc, content))
- }
- if len(docContents) == 0 {
- return nil
- }
- docBlock := strings.Join(docContents, "\n\n")
- // Find the last system message and append doc content
- messages, ok := params["messages"].([]interface{})
- if !ok {
- return nil
- }
- lastSystemIdx := -1
- for i, m := range messages {
- if msg, ok := m.(map[string]interface{}); ok {
- if role, _ := msg["role"].(string); role == "system" {
- lastSystemIdx = i
- }
- }
- }
- if lastSystemIdx >= 0 {
- msg := messages[lastSystemIdx].(map[string]interface{})
- existing, _ := msg["content"].(string)
- msg["content"] = existing + "\n\n" + docBlock
- } else {
- // No system message — prepend one
- sysMsg := map[string]interface{}{
- "role": "system",
- "content": docBlock,
- }
- params["messages"] = append([]interface{}{sysMsg}, messages...)
- }
- // Remove docs from params so it's not passed to the LLM adapter
- delete(params, "docs")
- return nil
- }
- // executeAPIStep executes an API_* step
- func (e *Engine) executeAPIStep(ctx ContextAccessor, step *Step) error {
- // Extract API ID from step ID (API_xxx)
- apiID := step.ID[4:] // Remove "API_" prefix
- // Get base context for accessing fields
- baseCtx := ctx.GetBaseContext()
- // Get API definition from registry
- apiDef, err := e.workflow.Registry.GetAPIDefinition(apiID)
- if err != nil {
- return fmt.Errorf("API definition not found: %w", err)
- }
- // Evaluate input parameters (with deep evaluation for nested structures)
- evaluator := NewExpressionEvaluator(ctx)
- params := make(map[string]interface{})
- for key, valueExpr := range step.In {
- val, err := evaluator.EvaluateDeep(valueExpr)
- if err != nil {
- return fmt.Errorf("failed to evaluate input parameter %s: %w", key, err)
- }
- params[key] = val
- }
- // Resolve auth token if specified
- if apiDef.Auth != "" {
- var authToken interface{}
- var err error
- if strings.HasPrefix(apiDef.Auth, "=") {
- authToken, err = evaluator.EvaluateValue(apiDef.Auth)
- } else {
- authToken, err = evaluator.Evaluate(apiDef.Auth)
- }
- if err != nil {
- return fmt.Errorf("failed to resolve auth token: %w", err)
- }
- if authToken != nil {
- if tokenStr, ok := authToken.(string); ok {
- params["authToken"] = tokenStr
- }
- }
- }
- if baseCtx.APIAdapter == nil {
- return fmt.Errorf("API step %s: no APIAdapter configured", step.ID)
- }
- // Call API
- result, err := baseCtx.APIAdapter.Call(baseCtx.Ctx, apiDef, params)
- if err != nil {
- return fmt.Errorf("API call failed: %w", err)
- }
- // Store result in local variable (use ctx accessor for proper scoping in parallel mode)
- ctx.SetLocalVar("_result", result)
- // Apply output mapping
- if len(step.Out) > 0 {
- if err := e.applyOutputMapping(ctx, step.Out); err != nil {
- return fmt.Errorf("failed to apply output mapping: %w", err)
- }
- }
- // Clear _result
- ctx.DeleteLocalVar("_result")
- return nil
- }
- // executeSetStep executes a Set_* step
- func (e *Engine) executeSetStep(ctx ContextAccessor, step *Step) error {
- evaluator := NewExpressionEvaluator(ctx)
- // Evaluate value expression
- value, err := evaluator.EvaluateValue(step.Value)
- if err != nil {
- return fmt.Errorf("failed to evaluate value: %w", err)
- }
- // Set variable
- if err := evaluator.SetVariable(step.Target, value); err != nil {
- return fmt.Errorf("failed to set variable: %w", err)
- }
- return nil
- }
- // executeWriteStep executes a Write_* step
- func (e *Engine) executeWriteStep(ctx ContextAccessor, step *Step) error {
- evaluator := NewExpressionEvaluator(ctx)
- // Get base context for accessing fields
- baseCtx := ctx.GetBaseContext()
- // Evaluate target path
- targetPath, err := evaluator.EvaluateValue(step.Target)
- if err != nil {
- return fmt.Errorf("failed to evaluate target path: %w", err)
- }
- targetPathStr, ok := targetPath.(string)
- if !ok {
- return fmt.Errorf("target path must be a string, got: %T", targetPath)
- }
- // Resolve {var} interpolations (e.g. ".tmp/{_iterDir}/item.txt")
- targetPathStr, err = e.interpolateFilePath(ctx, targetPathStr)
- if err != nil {
- return fmt.Errorf("failed to interpolate target path: %w", err)
- }
- // Resolve .tmp/ prefix to run-isolated path before artifact check
- targetPathStr = e.resolveTmpPath(ctx, targetPathStr)
- // Check if path is allowed (check against logical/resolved path)
- if !e.workflow.Registry.IsArtifactPathAllowed(targetPathStr) {
- return fmt.Errorf("write path not allowed: %s", targetPathStr)
- }
- // Evaluate value
- value, err := evaluator.EvaluateValue(step.Value)
- if err != nil {
- return fmt.Errorf("failed to evaluate value: %w", err)
- }
- // Convert value to bytes
- var content []byte
- switch v := value.(type) {
- case string:
- content = []byte(v)
- case []byte:
- content = v
- default:
- // Serialize non-string values as JSON
- jsonBytes, err := json.Marshal(v)
- if err != nil {
- return fmt.Errorf("failed to marshal value to JSON: %w", err)
- }
- content = jsonBytes
- }
- // Determine write mode
- mode := WriteModeOverwrite
- if step.Mode != "" {
- mode = WriteMode(step.Mode)
- }
- if baseCtx.FileAdapter == nil {
- return fmt.Errorf("write step %s: no FileAdapter configured", step.ID)
- }
- // Write file
- if err := baseCtx.FileAdapter.Write(baseCtx.Ctx, targetPathStr, content, mode); err != nil {
- return fmt.Errorf("failed to write file: %w", err)
- }
- // Store artifact reference
- baseCtx.Artifacts[targetPathStr] = targetPathStr
- // Emit file_done RunEvent (spec 3.13 §13.3)
- {
- stepID := step.ID
- e.emitRunEvent(baseCtx, RunEventFileDone, &stepID, map[string]interface{}{
- "path": strings.TrimPrefix(targetPathStr, "/"),
- "size_bytes": len(content),
- })
- }
- return nil
- }
- // executeBranchStep executes a Branch_* step
- func (e *Engine) executeBranchStep(ctx ContextAccessor, step *Step) error {
- evaluator := NewExpressionEvaluator(ctx)
- // Evaluate cases to find the first matching one
- var selectedStepID string
- for _, c := range step.Cases {
- if len(c) != 2 {
- return fmt.Errorf("invalid case format: expected [expression, stepId], got %d elements", len(c))
- }
- expression := c[0]
- stepID := c[1]
- if expression == "ELSE" {
- selectedStepID = stepID
- break
- }
- var result interface{}
- var err error
- if strings.HasPrefix(expression, "=") {
- result, err = evaluator.EvaluateValue(expression)
- } else {
- result, err = evaluator.Evaluate(expression)
- }
- if err != nil {
- return fmt.Errorf("failed to evaluate branch condition: %w", err)
- }
- if toBool(result) {
- selectedStepID = stepID
- break
- }
- }
- if selectedStepID == "" {
- // No matching case and no ELSE: skip all branch bodies, proceed to Branch_*.next.
- // Spec §10.10: "若无命中且无 ELSE,跳过所有分支子链,继续 Branch_*.next"
- return nil
- }
- // Execute the selected branch
- branchStep := e.findStepByID(selectedStepID)
- if branchStep == nil {
- return fmt.Errorf("branch step not found: %s", selectedStepID)
- }
- return e.executeStep(ctx, branchStep)
- }
- // executeLoopStep executes a Loop_* step, dispatching to while or source mode.
- func (e *Engine) executeLoopStep(ctx ContextAccessor, step *Step) error {
- // v3.16+: while mode (condition-based loop)
- if step.While != "" {
- return e.executeWhileLoop(ctx, step)
- }
- return e.executeSourceLoop(ctx, step)
- }
- // executeSourceLoop executes a Loop_* step with a source array.
- func (e *Engine) executeSourceLoop(ctx ContextAccessor, step *Step) error {
- evaluator := NewExpressionEvaluator(ctx)
- // Get loop source from step.Source property.
- sourceStr, ok := step.Source.(string)
- if !ok || sourceStr == "" {
- return fmt.Errorf("loop source not specified or not a string")
- }
- // Evaluate source using new expression convention (supports = prefix)
- // For backward compatibility: if no = prefix, treat as direct variable reference
- sourceExpr := sourceStr
- if !strings.HasPrefix(sourceExpr, "=") {
- // Add = prefix for backward compatibility with old format ($items)
- sourceExpr = "=" + sourceExpr
- }
- source, err := evaluator.EvaluateValue(sourceExpr)
- if err != nil {
- return fmt.Errorf("failed to evaluate loop source: %w", err)
- }
- // Convert source to slice
- sourceSlice := reflect.ValueOf(source)
- if sourceSlice.Kind() != reflect.Slice && sourceSlice.Kind() != reflect.Array {
- return fmt.Errorf("loop source must be an array, got: %T", source)
- }
- // Get loop mode
- mode := LoopMode(step.Mode)
- if mode != LoopModeParallel && mode != LoopModeSerial {
- return fmt.Errorf("invalid loop mode: %s", step.Mode)
- }
- // Execute loop iterations
- length := sourceSlice.Len()
- // v3.16+: cap at min(len(source), maxIterations) if maxIterations is set
- if step.MaxIterations != nil && *step.MaxIterations < length {
- length = *step.MaxIterations
- }
- if mode == LoopModeSerial {
- // Serial execution
- for i := 0; i < length; i++ {
- item := sourceSlice.Index(i).Interface()
- if err := e.executeLoopIteration(ctx, step, item, i); err != nil {
- // v3.16+: BREAK exits the loop cleanly (not an error)
- if IsBreakError(err) {
- return nil
- }
- return fmt.Errorf("loop iteration %d failed: %w", i, err)
- }
- }
- } else {
- // Parallel execution with proper synchronization
- if err := e.executeLoopParallel(ctx, step, sourceSlice, length); err != nil {
- return err
- }
- }
- return nil
- }
- // executeWhileLoop executes a Loop_* step with a while condition (v3.16+).
- // While mode is always serial. _item is not available; _index and _iterDir are.
- func (e *Engine) executeWhileLoop(ctx ContextAccessor, step *Step) error {
- maxIter := *step.MaxIterations // guaranteed non-nil by validation
- for i := 0; i < maxIter; i++ {
- // Evaluate while condition BEFORE each iteration (including iteration 0)
- evaluator := NewExpressionEvaluator(ctx)
- whileExpr := step.While
- var condResult interface{}
- var err error
- if strings.HasPrefix(whileExpr, "=") {
- condResult, err = evaluator.EvaluateValue(whileExpr)
- } else {
- condResult, err = evaluator.Evaluate(whileExpr)
- }
- if err != nil {
- return fmt.Errorf("failed to evaluate while condition: %w", err)
- }
- if !toBool(condResult) {
- break // Condition false, exit loop
- }
- // Set loop local variables: _index and _iterDir (no _item for while loops)
- ctx.SetLocalVar("_index", i)
- ctx.SetLocalVar("_iterDir", fmt.Sprintf("%s_%d", step.ID, i))
- // Execute children
- var childErr error
- for _, childID := range step.Children {
- child := e.findStepByID(childID)
- if child == nil {
- childErr = fmt.Errorf("loop child step not found: %s", childID)
- break
- }
- if err := e.executeStep(ctx, child); err != nil {
- childErr = err
- break
- }
- }
- // Cleanup locals
- ctx.DeleteLocalVar("_index")
- ctx.DeleteLocalVar("_iterDir")
- if childErr != nil {
- // v3.16+: BREAK exits the loop cleanly
- if IsBreakError(childErr) {
- return nil
- }
- return fmt.Errorf("while loop iteration %d failed: %w", i, childErr)
- }
- }
- return nil
- }
- // executeLoopParallel executes loop iterations in parallel
- func (e *Engine) executeLoopParallel(ctx ContextAccessor, loopStep *Step, sourceSlice reflect.Value, length int) error {
- // Get base context for accessing fields
- baseCtx := ctx.GetBaseContext()
- // Wrap execution context with thread-safe accessor
- safeCtx := NewSafeExecutionContext(baseCtx)
- // Get parallel executor
- executor := e.getParallelExecutor()
- // v3.16+: For BREAK support in parallel, use a shared atomic flag.
- // When any iteration triggers BREAK, the flag is set. Subsequent iterations
- // check the flag before starting and skip. Already-running iterations complete normally.
- var breakRequested int32 // atomic: 0=no, 1=yes
- // Create branches for each iteration
- branches := make([]ParallelBranch, length)
- for i := 0; i < length; i++ {
- item := sourceSlice.Index(i).Interface()
- index := i
- branches[i] = ParallelBranch{
- ID: fmt.Sprintf("%s[%d]", loopStep.ID, index),
- Fn: func(branchCtx context.Context) error {
- // v3.16+: Check if BREAK was already requested by another iteration
- if atomic.LoadInt32(&breakRequested) != 0 {
- return nil // Skip this iteration
- }
- // Create child context with isolated local vars
- childCtx := NewChildExecutionContext(safeCtx)
- childCtx.SetLocalVar("_item", item)
- childCtx.SetLocalVar("_index", index)
- // v3.14+: inject _iterDir for per-iteration .tmp/ isolation
- childCtx.SetLocalVar("_iterDir", fmt.Sprintf("%s_%d", loopStep.ID, index))
- // Execute children in child context
- for _, childID := range loopStep.Children {
- // Check cancellation before each child
- select {
- case <-branchCtx.Done():
- return branchCtx.Err()
- default:
- }
- child := e.findStepByID(childID)
- if child == nil {
- return fmt.Errorf("loop child step not found: %s", childID)
- }
- // Execute with child context - works because executeStep accepts ContextAccessor
- if err := e.executeStep(childCtx, child); err != nil {
- // v3.16+: BREAK signals loop exit — set flag and return cleanly
- if IsBreakError(err) {
- atomic.StoreInt32(&breakRequested, 1)
- return nil // This iteration ends, others complete naturally
- }
- return fmt.Errorf("iteration %d: %w", index, err)
- }
- }
- return nil
- },
- }
- }
- // Execute all branches in parallel
- if err := executor.Execute(baseCtx.Ctx, branches, e.getErrorStrategy()); err != nil {
- return fmt.Errorf("parallel loop execution failed: %w", err)
- }
- return nil
- }
- // executeLoopIteration executes a single loop iteration
- func (e *Engine) executeLoopIteration(ctx ContextAccessor, loopStep *Step, item interface{}, index int) error {
- // Set loop local variables (use ctx accessor for proper scoping)
- ctx.SetLocalVar("_item", item)
- ctx.SetLocalVar("_index", index)
- // v3.14+: inject _iterDir for per-iteration .tmp/ isolation
- ctx.SetLocalVar("_iterDir", fmt.Sprintf("%s_%d", loopStep.ID, index))
- defer ctx.DeleteLocalVar("_item")
- defer ctx.DeleteLocalVar("_index")
- defer ctx.DeleteLocalVar("_iterDir")
- // Execute children
- for _, childID := range loopStep.Children {
- child := e.findStepByID(childID)
- if child == nil {
- return fmt.Errorf("loop child step not found: %s", childID)
- }
- if err := e.executeStep(ctx, child); err != nil {
- return err
- }
- }
- return nil
- }
- // applyOutputMapping applies the output mapping from _result to global variables and files
- // Keys starting with $ write to variables, keys starting with / write to files.
- func (e *Engine) applyOutputMapping(ctx ContextAccessor, outMapping StepOutput) error {
- evaluator := NewExpressionEvaluator(ctx)
- // Get base context for file adapter and RunEvent emission
- baseCtx := ctx.GetBaseContext()
- for target, valueExpr := range outMapping {
- // Evaluate the value expression
- value, err := evaluator.EvaluateValue(valueExpr)
- if err != nil {
- return fmt.Errorf("failed to evaluate output expression for %s: %w", target, err)
- }
- // Determine if this is a file write or variable write
- if strings.HasPrefix(target, "/") {
- // File write - interpolate path variables
- filePath, err := e.interpolateFilePath(ctx, target)
- if err != nil {
- return fmt.Errorf("failed to interpolate file path %s: %w", target, err)
- }
- // Convert value to bytes
- var content []byte
- switch v := value.(type) {
- case string:
- content = []byte(v)
- case []byte:
- content = v
- default:
- // Serialize non-string values as JSON
- jsonBytes, err := json.Marshal(v)
- if err != nil {
- return fmt.Errorf("failed to marshal value to JSON: %w", err)
- }
- content = jsonBytes
- }
- if baseCtx.FileAdapter == nil {
- return fmt.Errorf("output mapping: no FileAdapter configured for file target %s", target)
- }
- // Write file
- if err := baseCtx.FileAdapter.Write(baseCtx.Ctx, filePath, content, WriteModeOverwrite); err != nil {
- return fmt.Errorf("failed to write output file %s: %w", filePath, err)
- }
- // Store artifact reference
- baseCtx.Artifacts[filePath] = filePath
- // Emit file_done RunEvent (spec 3.13 §13.3)
- // Order: llm_done → file_done(×N) → step_done
- {
- stepID := baseCtx.CurrentStepID
- e.emitRunEvent(baseCtx, RunEventFileDone, &stepID, map[string]interface{}{
- "path": strings.TrimPrefix(filePath, "/"),
- "size_bytes": len(content),
- })
- }
- } else if strings.HasPrefix(target, "$") {
- // Variable write - may include path with _index interpolation
- resolvedTarget, err := e.interpolateVarPath(ctx, target)
- if err != nil {
- return fmt.Errorf("failed to interpolate var path %s: %w", target, err)
- }
- // Set the variable
- if err := evaluator.SetVariable(resolvedTarget, value); err != nil {
- return fmt.Errorf("failed to set output variable %s: %w", resolvedTarget, err)
- }
- } else {
- return fmt.Errorf("invalid output target %s: must start with $ (variable) or / (file)", target)
- }
- }
- return nil
- }
- // interpolateFilePath interpolates variables in file paths using {var} syntax
- // Example: "/vsc/{_item.path}" with _item.path = "files/fileA.ts" -> "/vsc/files/fileA.ts"
- // Each {expr} is evaluated exactly once; substituted values are not re-scanned.
- func (e *Engine) interpolateFilePath(ctx ContextAccessor, path string) (string, error) {
- evaluator := NewExpressionEvaluator(ctx)
- var sb strings.Builder
- remaining := path
- for {
- start := strings.Index(remaining, "{")
- if start == -1 {
- sb.WriteString(remaining)
- break
- }
- end := strings.Index(remaining[start:], "}")
- if end == -1 {
- sb.WriteString(remaining)
- break
- }
- end += start
- // Write the literal prefix before the expression
- sb.WriteString(remaining[:start])
- // Evaluate the expression inside { }
- expr := remaining[start+1 : end]
- val, err := evaluator.Evaluate(expr)
- if err != nil {
- return "", fmt.Errorf("failed to evaluate path expression %s: %w", expr, err)
- }
- sb.WriteString(fmt.Sprintf("%v", val))
- // Advance past the closing brace; substituted value is not re-scanned
- remaining = remaining[end+1:]
- }
- return sb.String(), nil
- }
- // interpolateVarPath interpolates _index in variable paths
- // Example: "$generated[_index].name" with _index = 2 -> "$generated[2].name"
- func (e *Engine) interpolateVarPath(ctx ContextAccessor, path string) (string, error) {
- // Check if path contains _index
- if !strings.Contains(path, "_index") {
- return path, nil
- }
- // Get _index value
- indexVal, ok := ctx.GetLocalVar("_index")
- if !ok {
- return path, nil // No _index, return as-is
- }
- // Replace _index with actual value
- return strings.ReplaceAll(path, "_index", fmt.Sprintf("%v", indexVal)), nil
- }
- // resolveTmpPath rewrites .tmp/xxx paths to .tmp/{runID}/xxx for run-level isolation (v3.14+).
- // Paths that do not start with ".tmp/" are returned unchanged.
- func (e *Engine) resolveTmpPath(ctx ContextAccessor, path string) string {
- if !strings.HasPrefix(path, ".tmp/") {
- return path
- }
- baseCtx := ctx.GetBaseContext()
- runID := baseCtx.WorkflowID
- rest := strings.TrimPrefix(path, ".tmp/")
- return ".tmp/" + runID + "/" + rest
- }
- // resolveStepFilePath evaluates a file path expression and applies {var} interpolation
- // and .tmp/ run-isolation in one step. Used by Download_* and Unzip_*.
- func (e *Engine) resolveStepFilePath(ctx ContextAccessor, raw string) (string, error) {
- evaluator := NewExpressionEvaluator(ctx)
- // Step 1: handle = expressions ("=$var", "=\"literal\"", etc.)
- val, err := evaluator.EvaluateValue(raw)
- if err != nil {
- return "", fmt.Errorf("failed to evaluate path expression: %w", err)
- }
- pathStr, ok := val.(string)
- if !ok {
- return "", fmt.Errorf("path expression must yield a string, got %T", val)
- }
- // Step 2: resolve {var} interpolations (e.g. ".tmp/{_iterDir}/bundle.zip")
- pathStr, err = e.interpolateFilePath(ctx, pathStr)
- if err != nil {
- return "", fmt.Errorf("failed to interpolate path: %w", err)
- }
- // Step 3: rewrite .tmp/ prefix for run isolation
- return e.resolveTmpPath(ctx, pathStr), nil
- }
// routePathByExt picks the target directory for a file based on its extension.
//
// The extension of name (as reported by filepath.Ext, lower-cased, including the
// leading dot) is looked up in routeByExt. On a hit the mapped directory is returned;
// otherwise defaultDir is returned. defaultDir may be empty, which callers can treat
// as "discard / skip".
func routePathByExt(name string, routeByExt map[string]string, defaultDir string) string {
	target, matched := routeByExt[strings.ToLower(filepath.Ext(name))]
	if !matched {
		return defaultDir
	}
	return target
}
// zipSlipSafe reports whether a zip entry name is safe to extract, i.e. whether it is a
// relative path that cannot escape the extraction root.
//
// The check does not depend on the host operating system:
//   - both "/" and "\" are treated as path separators;
//   - names that are rooted ("/x", "\x", "\\host\share") are rejected;
//   - names that start with a drive specifier ("C:\x", "C:x") are rejected;
//   - names are rejected when, walking their components left to right, a ".."
//     component would climb above the starting directory ("../x", "a/../../x").
//
// Names that merely begin with dots ("..foo/bar", "...") are ordinary file names and
// are accepted, as are ".." components that stay inside the root ("a/../b").
func zipSlipSafe(entryName string) bool {
	// Normalise separators so the same rules apply on every platform.
	name := strings.ReplaceAll(entryName, "\\", "/")

	if strings.HasPrefix(name, "/") {
		return false
	}
	// Windows drive specifier, absolute or drive-relative.
	if len(name) >= 2 && name[1] == ':' {
		if c := name[0]; (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') {
			return false
		}
	}
	// Whatever else the host platform itself regards as absolute.
	if filepath.IsAbs(entryName) {
		return false
	}

	// Track how deep below the root each component leaves us; going negative means
	// the path has escaped.
	depth := 0
	for _, part := range strings.Split(name, "/") {
		switch part {
		case "", ".":
			// No movement.
		case "..":
			depth--
			if depth < 0 {
				return false
			}
		default:
			depth++
		}
	}
	return true
}
- // executeDownloadStep executes a Download_* step (v3.14+).
- // It downloads a single file from an external URL and writes it to the artifact space,
- // either to an explicit target path or to a directory chosen by routeByExt.
- func (e *Engine) executeDownloadStep(ctx ContextAccessor, step *Step) error {
- baseCtx := ctx.GetBaseContext()
- evaluator := NewExpressionEvaluator(ctx)
- if baseCtx.FileAdapter == nil {
- return fmt.Errorf("download step %s: no FileAdapter configured", step.ID)
- }
- // --- Resolve source ---
- var downloadURL string
- var extraHeaders map[string]string
- sourceVal, err := evaluator.EvaluateValue(step.Source)
- if err != nil {
- return fmt.Errorf("download step %s: failed to evaluate source: %w", step.ID, err)
- }
- switch sv := sourceVal.(type) {
- case string:
- downloadURL = sv
- case map[string]interface{}:
- // Object form: {url, headers, auth, timeout, checksum}
- urlVal, ok := sv["url"]
- if !ok {
- return fmt.Errorf("download step %s: source object missing 'url' field", step.ID)
- }
- downloadURL, ok = urlVal.(string)
- if !ok {
- return fmt.Errorf("download step %s: source.url must be a string", step.ID)
- }
- if hdrs, ok := sv["headers"]; ok {
- if hdrMap, ok := hdrs.(map[string]interface{}); ok {
- extraHeaders = make(map[string]string, len(hdrMap))
- for k, v := range hdrMap {
- extraHeaders[k] = fmt.Sprintf("%v", v)
- }
- }
- }
- default:
- return fmt.Errorf("download step %s: source must be a URL string or object, got %T", step.ID, sourceVal)
- }
- if downloadURL == "" {
- return fmt.Errorf("download step %s: download URL is empty", step.ID)
- }
- // --- HTTP download (streaming into memory buffer) ---
- httpCtx, cancel := context.WithTimeout(baseCtx.Ctx, 5*time.Minute)
- defer cancel()
- req, err := http.NewRequestWithContext(httpCtx, http.MethodGet, downloadURL, nil)
- if err != nil {
- return fmt.Errorf("download step %s: failed to build HTTP request: %w", step.ID, err)
- }
- for k, v := range extraHeaders {
- req.Header.Set(k, v)
- }
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return fmt.Errorf("download step %s: HTTP request failed: %w", step.ID, err)
- }
- defer resp.Body.Close()
- if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- return fmt.Errorf("download step %s: server returned HTTP %d", step.ID, resp.StatusCode)
- }
- content, err := io.ReadAll(resp.Body)
- if err != nil {
- return fmt.Errorf("download step %s: failed to read response body: %w", step.ID, err)
- }
- // --- Determine target path(s) ---
- // writtenPath captures the resolved artifact path for _result output.
- var writtenPath string
- writeFile := func(targetPath string, data []byte) error {
- // Resolve {var} interpolation and .tmp/ isolation
- resolved, err := e.resolveStepFilePath(ctx, targetPath)
- if err != nil {
- return err
- }
- if !e.workflow.Registry.IsArtifactPathAllowed(resolved) {
- return fmt.Errorf("download target path not allowed: %s", resolved)
- }
- if err := baseCtx.FileAdapter.Write(baseCtx.Ctx, resolved, data, WriteModeOverwrite); err != nil {
- return fmt.Errorf("failed to write download to %s: %w", resolved, err)
- }
- writtenPath = resolved
- baseCtx.Artifacts[resolved] = resolved
- stepID := step.ID
- e.emitRunEvent(baseCtx, RunEventFileDone, &stepID, map[string]interface{}{
- "path": resolved,
- "size_bytes": len(data),
- })
- return nil
- }
- var writeErr error
- if step.Target != "" {
- // Single explicit target
- writeErr = writeFile(step.Target, content)
- } else if len(step.RouteByExt) > 0 || step.DefaultDir != "" {
- // Route by extension: derive filename from URL
- urlPath := downloadURL
- if idx := strings.Index(urlPath, "?"); idx >= 0 {
- urlPath = urlPath[:idx]
- }
- filename := filepath.Base(urlPath)
- if filename == "" || filename == "." {
- filename = "download"
- }
- dir := routePathByExt(filename, step.RouteByExt, step.DefaultDir)
- if dir != "" {
- targetPath := strings.TrimSuffix(dir, "/") + "/" + filename
- writeErr = writeFile(targetPath, content)
- }
- // dir == "": no matching extension and no defaultDir → skip (not an error)
- } else {
- return fmt.Errorf("download step %s: must specify either 'target' or 'routeByExt'", step.ID)
- }
- if writeErr != nil {
- return writeErr
- }
- // Set _result and apply out mapping (same pattern as Service_*/Component_* steps)
- ctx.SetLocalVar("_result", map[string]interface{}{
- "path": writtenPath,
- })
- if len(step.Out) > 0 {
- if err := e.applyOutputMapping(ctx, step.Out); err != nil {
- ctx.DeleteLocalVar("_result")
- return fmt.Errorf("download step %s: failed to apply output mapping: %w", step.ID, err)
- }
- }
- ctx.DeleteLocalVar("_result")
- return nil
- }
- // executeUnzipStep executes an Unzip_* step (v3.14+).
- // It reads a zip archive from the artifact space, extracts each entry, and writes
- // the contents to directories determined by routeByExt (mandatory) / defaultDir (optional).
- // zip-slip entries (path traversal) are rejected.
- func (e *Engine) executeUnzipStep(ctx ContextAccessor, step *Step) error {
- baseCtx := ctx.GetBaseContext()
- evaluator := NewExpressionEvaluator(ctx)
- if baseCtx.FileAdapter == nil {
- return fmt.Errorf("unzip step %s: no FileAdapter configured", step.ID)
- }
- // --- Resolve source path ---
- sourceVal, err := evaluator.EvaluateValue(step.Source)
- if err != nil {
- return fmt.Errorf("unzip step %s: failed to evaluate source: %w", step.ID, err)
- }
- sourceStr, ok := sourceVal.(string)
- if !ok {
- return fmt.Errorf("unzip step %s: source must be a string path expression, got %T", step.ID, sourceVal)
- }
- sourcePath, err := e.resolveStepFilePath(ctx, sourceStr)
- if err != nil {
- return fmt.Errorf("unzip step %s: failed to resolve source path: %w", step.ID, err)
- }
- // --- Read the zip file ---
- zipBytes, err := baseCtx.FileAdapter.Read(baseCtx.Ctx, sourcePath)
- if err != nil {
- return fmt.Errorf("unzip step %s: failed to read zip file %s: %w", step.ID, sourcePath, err)
- }
- // --- Open zip reader ---
- zr, err := zip.NewReader(bytes.NewReader(zipBytes), int64(len(zipBytes)))
- if err != nil {
- return fmt.Errorf("unzip step %s: failed to open zip archive: %w", step.ID, err)
- }
- // --- Determine overwrite mode ---
- overwrite := true
- if step.Overwrite != nil {
- overwrite = *step.Overwrite
- }
- writeMode := WriteModeOverwrite
- if !overwrite {
- writeMode = WriteModeFailIfExists
- }
- // --- Extract entries ---
- var extractedPaths []string
- for _, entry := range zr.File {
- if entry.FileInfo().IsDir() {
- continue // directories are created implicitly
- }
- entryName := entry.Name
- // zip-slip protection
- if !zipSlipSafe(entryName) {
- return fmt.Errorf("unzip step %s: unsafe entry path rejected: %s", step.ID, entryName)
- }
- // Route by extension
- dir := routePathByExt(entryName, step.RouteByExt, step.DefaultDir)
- if dir == "" {
- continue // no matching rule and no defaultDir: skip
- }
- baseName := filepath.Base(entryName)
- targetPath := strings.TrimSuffix(dir, "/") + "/" + baseName
- // Resolve .tmp/ isolation
- resolvedPath := e.resolveTmpPath(ctx, targetPath)
- if !e.workflow.Registry.IsArtifactPathAllowed(resolvedPath) {
- return fmt.Errorf("unzip step %s: target path not allowed: %s", step.ID, resolvedPath)
- }
- // Read entry content
- rc, err := entry.Open()
- if err != nil {
- return fmt.Errorf("unzip step %s: failed to open entry %s: %w", step.ID, entryName, err)
- }
- entryBytes, err := io.ReadAll(rc)
- rc.Close()
- if err != nil {
- return fmt.Errorf("unzip step %s: failed to read entry %s: %w", step.ID, entryName, err)
- }
- // Write to FileAdapter
- if err := baseCtx.FileAdapter.Write(baseCtx.Ctx, resolvedPath, entryBytes, writeMode); err != nil {
- return fmt.Errorf("unzip step %s: failed to write %s: %w", step.ID, resolvedPath, err)
- }
- baseCtx.Artifacts[resolvedPath] = resolvedPath
- stepID := step.ID
- e.emitRunEvent(baseCtx, RunEventFileDone, &stepID, map[string]interface{}{
- "path": resolvedPath,
- "size_bytes": len(entryBytes),
- })
- extractedPaths = append(extractedPaths, resolvedPath)
- }
- // Set _result to extraction summary, then apply out mapping (same pattern as other steps)
- ctx.SetLocalVar("_result", map[string]interface{}{
- "count": len(extractedPaths),
- "files": extractedPaths,
- })
- if len(step.Out) > 0 {
- if err := e.applyOutputMapping(ctx, step.Out); err != nil {
- ctx.DeleteLocalVar("_result")
- return fmt.Errorf("unzip step %s: failed to apply output mapping: %w", step.ID, err)
- }
- }
- ctx.DeleteLocalVar("_result")
- return nil
- }
- // executePauseStep implements the Pause_* node (v3.15+).
- // It blocks the workflow goroutine until the workflow is resumed via Engine.Resume,
- // times out (if timeout is configured), or the context is cancelled.
- //
- // Unlike other step executors, executePauseStep is responsible for emitting step_done
- // and routing to step.Next after a successful resume (similar to how Stop_* handles its
- // own terminal routing). The caller (executeStep) returns early on nil to skip the
- // standard post-switch step_done / next logic.
- func (e *Engine) executePauseStep(ctx ContextAccessor, step *Step, stepStartTime time.Time, stepTypePattern string) error {
- baseCtx := ctx.GetBaseContext()
- stepID := step.ID
- // Generate a unique wait token for this pause instance.
- // The token is stored and published as a SHA-256 hex digest so that the
- // persisted "waitToken (hash)" matches the spec §11.3.1 requirement.
- // Callers obtain the token from the pause_start RunEvent and must supply
- // the same value in ResumeRequest.Token.
- rawToken := fmt.Sprintf("wt_%s_%s_%d", baseCtx.WorkflowID, step.ID, time.Now().UnixNano())
- h := sha256.Sum256([]byte(rawToken))
- token := hex.EncodeToString(h[:])
- // Initialise PauseState and publish it so Resume() can find it.
- state := &PauseState{
- ch: make(chan resumeSignal, 1),
- token: token,
- nodeID: step.ID,
- seenRequestIDs: make(map[string]bool),
- }
- baseCtx.pauseMu.Lock()
- baseCtx.PauseState = state
- baseCtx.pauseMu.Unlock()
- // Calculate optional expiry timestamp for the pause_start payload.
- var expireAtStr string
- if step.Timeout != nil {
- expireAt := time.Now().Add(time.Duration(step.Timeout.Sec) * time.Second)
- expireAtStr = expireAt.UTC().Format("2006-01-02T15:04:05.000Z07:00")
- }
- // Set workflow status to paused.
- ctx.SetStatus(StatusPaused)
- // Emit pause_start.
- pausePayload := map[string]interface{}{
- "nodeId": stepID,
- "waitToken": token,
- "resumeResultTarget": step.ResumeResultTarget,
- }
- if step.Reason != "" {
- pausePayload["reason"] = step.Reason
- }
- if expireAtStr != "" {
- pausePayload["expireAt"] = expireAtStr
- }
- e.emitRunEvent(baseCtx, RunEventPauseStart, &stepID, pausePayload)
- // Block waiting for resume signal, timeout, or context cancellation.
- var sig resumeSignal
- var timedOut bool
- if step.Timeout != nil {
- timer := time.NewTimer(time.Duration(step.Timeout.Sec) * time.Second)
- defer timer.Stop()
- select {
- case sig = <-state.ch:
- // Resume signal received.
- case <-timer.C:
- timedOut = true
- case <-baseCtx.Ctx.Done():
- ctx.SetStatus(StatusFailed)
- return baseCtx.Ctx.Err()
- }
- } else {
- select {
- case sig = <-state.ch:
- // Resume signal received.
- case <-baseCtx.Ctx.Done():
- ctx.SetStatus(StatusFailed)
- return baseCtx.Ctx.Err()
- }
- }
- if timedOut {
- // Emit pause_timeout.
- e.emitRunEvent(baseCtx, RunEventPauseTimeout, &stepID, map[string]interface{}{
- "nodeId": stepID,
- "expiredAt": time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"),
- "timeoutAction": step.Timeout.On,
- })
- // Resume execution at the timeout handler.
- // Status is set back to running; the timeout step (typically Stop_* or an
- // error-handler chain) is responsible for further status transitions.
- ctx.SetStatus(StatusRunning)
- timeoutStep := e.findStepByID(step.Timeout.On)
- if timeoutStep == nil {
- return fmt.Errorf("Pause_* step %s: timeout.on step %q not found", step.ID, step.Timeout.On)
- }
- return e.executeStep(ctx, timeoutStep)
- }
- // --- Resume path ---
- // Write the resume payload to resumeResultTarget in $vars.
- evaluator := NewExpressionEvaluator(ctx)
- if err := evaluator.SetVariable(step.ResumeResultTarget, sig.Payload); err != nil {
- return fmt.Errorf("Pause_* step %s: failed to write resume payload to %q: %w", step.ID, step.ResumeResultTarget, err)
- }
- // Emit pause_resumed.
- e.emitRunEvent(baseCtx, RunEventPauseResumed, &stepID, map[string]interface{}{
- "nodeId": stepID,
- "requestId": sig.RequestID,
- "resumedAt": time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"),
- })
- // Restore running status.
- ctx.SetStatus(StatusRunning)
- // Note: Pause_* does NOT support the `print` field (spec §5.1: 不适用 for Pause_*).
- // step_print is intentionally omitted here.
- // Emit step_done for the Pause_* node (successful resume path only).
- e.emitRunEvent(baseCtx, RunEventStepDone, &stepID, map[string]interface{}{
- "step_type": stepTypePattern,
- "duration_ms": time.Since(stepStartTime).Milliseconds(),
- })
- // Continue to the next step.
- if step.Next == "" {
- return fmt.Errorf("Pause_* step %s is missing required 'next' field", step.ID)
- }
- if step.Next == "RETURN" {
- return nil
- }
- nextStep := e.findStepByID(step.Next)
- if nextStep == nil {
- return fmt.Errorf("next step not found: %s (referenced by Pause_* step %s)", step.Next, step.ID)
- }
- return e.executeStep(ctx, nextStep)
- }
|