package workflow

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// Engine is the main workflow execution engine.
type Engine struct {
	// eventSequence is the atomic counter for RunEvent sequence numbers.
	// It is deliberately the first field: the 64-bit sync/atomic functions
	// require 64-bit alignment, which on 32-bit platforms Go only guarantees
	// for the first word of an allocated struct.
	eventSequence uint64

	workflow         *Workflow
	parallelExecutor *ParallelExecutor
	errorStrategy    ParallelErrorStrategy

	stopOnce   sync.Once // Captures the first Stop_* node ID
	stopNodeID string    // ID of the first Stop_* node that triggered stop
}

// NewEngine creates a new workflow engine with default options.
func NewEngine(workflow *Workflow) (*Engine, error) {
	return NewEngineWithOptions(workflow, DefaultEngineOptions)
}

// NewEngineWithOptions creates a new workflow engine with custom options.
// It returns an error if workflow is nil or fails Validate.
func NewEngineWithOptions(workflow *Workflow, opts EngineOptions) (*Engine, error) {
	if workflow == nil {
		return nil, fmt.Errorf("workflow cannot be nil")
	}
	if err := workflow.Validate(); err != nil {
		return nil, fmt.Errorf("invalid workflow: %w", err)
	}
	return &Engine{
		workflow:         workflow,
		parallelExecutor: NewParallelExecutor(opts.MaxConcurrency),
		errorStrategy:    opts.ParallelErrorStrategy,
	}, nil
}

// Execute starts workflow execution with the given context and initial variables.
//
// Keys in initialVars prefixed with "$" become variables. Bare keys that match a
// registry param declaration become params; OBJECT params supplied as JSON
// strings are decoded. Any other bare key is stored as a "$"-prefixed variable.
// Registry param defaults fill in params the caller did not supply.
//
// The workflow runs on its own goroutine. Events are delivered on the returned
// ExecutionResult.RunEventStream, which is closed when the run ends.
func (e *Engine) Execute(ctx context.Context, initialVars map[string]interface{}, adapters *Adapters) (*ExecutionResult, error) {
	return e.startRun(ctx, initialVars, adapters, nil)
}

// ExecuteWithRunParams starts workflow execution with run-level control parameters (v3.15+, spec §2.2).
// RunParams allow the caller to specify a workspace scope, a selective node list, an
// execution mode, and business-level input params without polluting the workflow's own registry.
//
// RunParams.Params (spec §1.7 runParams.params) are merged into the effective initial variables.
// They take precedence over initialVars for the same key, mirroring how the online platform
// passes declared registry params to the engine separately from internal engine state.
// RunParams.Params keys are bare names (e.g. "prompt"), so Execute's param routing places
// declared ones in execCtx.Params[name], accessible via =name expressions (no $ prefix).
//
// The RunParams are attached to the execution context before the workflow goroutine
// starts, so every step observes them.
func (e *Engine) ExecuteWithRunParams(ctx context.Context, initialVars map[string]interface{}, adapters *Adapters, runParams RunParams) (*ExecutionResult, error) {
	merged := make(map[string]interface{}, len(initialVars)+len(runParams.Params))
	for k, v := range initialVars {
		merged[k] = v
	}
	// RunParams.Params win over initialVars for the same key.
	for k, v := range runParams.Params {
		merged[k] = v
	}
	return e.startRun(ctx, merged, adapters, &runParams)
}

// startRun builds the execution context, attaches runParams (nil for plain Execute),
// and only then launches the workflow goroutine.
func (e *Engine) startRun(ctx context.Context, initialVars map[string]interface{}, adapters *Adapters, runParams *RunParams) (*ExecutionResult, error) {
	execCtx, err := e.prepareExecutionContext(ctx, initialVars, adapters)
	if err != nil {
		return nil, err
	}
	// Must happen before `go executeWorkflow`: assigning after the goroutine has
	// started is a data race with any step/adapter that inspects RunParams.
	if runParams != nil {
		execCtx.RunParams = runParams
	}

	go e.executeWorkflow(execCtx)

	return &ExecutionResult{
		Context:        execCtx,
		RunEventStream: execCtx.RunEventStream,
	}, nil
}

// prepareExecutionContext validates the arguments and builds a fully initialised
// ExecutionContext (params, variables, type tables, defaults) without starting it.
func (e *Engine) prepareExecutionContext(ctx context.Context, initialVars map[string]interface{}, adapters *Adapters) (*ExecutionContext, error) {
	if ctx == nil {
		return nil, fmt.Errorf("context cannot be nil")
	}
	if adapters == nil {
		return nil, fmt.Errorf("adapters cannot be nil")
	}

	execCtx := &ExecutionContext{
		Ctx:                ctx,
		WorkflowID:         generateWorkflowID(),
		Params:             make(map[string]interface{}),
		Variables:          make(map[string]interface{}),
		SystemVars:         make(map[string]interface{}),
		LocalVars:          make(map[string]interface{}),
		Artifacts:          make(map[string]string),
		Status:             StatusRunning,
		StartTime:          time.Now(),
		RunEventStream:     make(chan RunEvent, 500),
		ServiceAdapter:     adapters.Service,
		APIAdapter:         adapters.API,
		ComponentAdapter:   adapters.Component,
		LLMAdapter:         adapters.LLM,
		LLMAdapterRegistry: adapters.LLMAdapterRegistry,
		FileAdapter:        adapters.File,
		DocAdapter:         adapters.Doc,
		ParamTypes:         make(map[string]string),
		VarTypes:           make(map[string]string),
	}

	// Fetch the param declarations once; they feed both the type table here and
	// the defaults pass below. A lookup error leaves the type table empty.
	paramDecls, paramDeclErr := e.workflow.Registry.GetParamDeclarations()
	if paramDeclErr == nil {
		for name, decl := range paramDecls {
			execCtx.ParamTypes[name] = decl.Type
		}
	}
	if varDecls, err := e.workflow.Registry.GetVariableDeclarations(); err == nil {
		for name, decl := range varDecls {
			execCtx.VarTypes[name] = decl.Type
		}
	}

	// Route initialVars: "$"-prefixed keys are variables, declared bare keys are
	// params, and undeclared bare keys become "$"-prefixed variables.
	for k, v := range initialVars {
		if strings.HasPrefix(k, "$") {
			execCtx.Variables[k] = v
			continue
		}
		paramType, declared := execCtx.ParamTypes[k]
		if !declared {
			execCtx.Variables["$"+k] = v
			continue
		}
		// OBJECT params may arrive as JSON text; decode them into a map.
		if paramType == "OBJECT" {
			if str, isString := v.(string); isString {
				var obj map[string]interface{}
				if err := json.Unmarshal([]byte(str), &obj); err != nil {
					return nil, fmt.Errorf("failed to convert parameter %s to OBJECT: %w", k, err)
				}
				v = obj
			}
		}
		execCtx.Params[k] = v
	}

	// Apply registry param defaults for parameters not supplied by the caller (spec §3.2).
	// This runs after initialVars processing so explicit caller values always take precedence.
	if paramDeclErr == nil {
		for name, decl := range paramDecls {
			if _, alreadySet := execCtx.Params[name]; alreadySet || decl.Default == nil {
				continue
			}
			// Defaults that fail to coerce (or coerce to nil) are skipped on purpose.
			if defaultVal, err := CoerceParamDefault(decl); err == nil && defaultVal != nil {
				execCtx.Params[name] = defaultVal
			}
		}
	}

	return execCtx, nil
}

// executeWorkflow executes the workflow from its entry nodes and emits the
// terminal event. It closes execCtx.RunEventStream when it returns.
func (e *Engine) executeWorkflow(execCtx *ExecutionContext) {
	defer close(execCtx.RunEventStream)

	e.emitRunEvent(execCtx, RunEventWorkflowStart, nil, map[string]interface{}{
		"params": execCtx.Params,
	})

	// Find true entry nodes (spec §1.4C: unreferenced nodes, NOT always steps[0]).
	entryIDs := findEntryNodeIDs(e.workflow.Steps)
	if len(entryIDs) == 0 {
		e.fail(execCtx, fmt.Errorf("no entry nodes found in workflow"))
		return
	}

	// executeChildren runs a single entry serially (the common case) and
	// multiple entries in parallel (spec §1.4C).
	if err := e.executeChildren(execCtx, entryIDs); err != nil {
		e.fail(execCtx, err)
		return
	}

	// Terminal event: emit workflow_done AFTER all branches have finished.
	switch execCtx.Status {
	case StatusRunning:
		e.complete(execCtx)
	case StatusStopped:
		// Deferred from stop: emit workflow_done now that every parallel
		// branch has returned, so all step events precede the terminal event.
		e.emitRunEvent(execCtx, RunEventWorkflowDone, nil, map[string]interface{}{
			"stop_id":     e.stopNodeID,
			"duration_ms": time.Since(execCtx.StartTime).Milliseconds(),
		})
	}
}

// executeStep executes a single step, emits its lifecycle RunEvents, runs its
// children (except for Loop_*/Branch_*, which handle their own), and then
// follows its next pointer.
func (e *Engine) executeStep(ctx ContextAccessor, step *Step) error {
	baseCtx := ctx.GetBaseContext()

	// Short-circuit if this branch has been stopped or the workflow is paused.
	// For ChildExecutionContext (parallel branches), IsStopped() checks the
	// branch-local flag so sibling branches are NOT short-circuited.
	// For non-parallel paths, IsStopped() checks the global Status.
	// Pause is always a global check — all branches should pause.
	if ctx.IsStopped() || baseCtx.Status == StatusPaused {
		return nil
	}

	ctx.SetCurrentStepID(step.ID)

	fileTargets := e.collectStepFileTargets(ctx, step)

	stepType := e.getStepType(step.ID)
	stepTypePattern := string(stepType) + "_*"
	stepID := step.ID

	if step.If != "" {
		holds, err := stepConditionHolds(ctx, step.If)
		if err != nil {
			return fmt.Errorf("failed to evaluate if condition: %w", err)
		}
		if !holds {
			// Emit step_skipped RunEvent (spec 3.12 §13.3); skip this step and
			// its children and continue with next.
			e.emitRunEvent(baseCtx, RunEventStepSkipped, &stepID, map[string]interface{}{
				"step_type": stepTypePattern,
				"reason":    "if_false",
			})
			return e.followStepNext(ctx, step.Next)
		}
	}

	// Emit step_start RunEvent (condition passed, about to execute).
	e.emitRunEvent(baseCtx, RunEventStepStart, &stepID, map[string]interface{}{
		"step_type": stepTypePattern,
	})

	// Emit file_start RunEvents for all file targets (spec 3.13 §13.3).
	// Batch-emitted immediately after step_start, before any llm_token.
	for _, target := range fileTargets {
		e.emitRunEvent(baseCtx, RunEventFileStart, &stepID, map[string]interface{}{
			"path": strings.TrimPrefix(target, "/"),
		})
	}

	stepStartTime := time.Now()
	var err error
	switch stepType {
	case StepTypeService:
		err = e.executeServiceStep(ctx, step)
	case StepTypeAPI:
		err = e.executeAPIStep(ctx, step)
	case StepTypeComponent:
		err = e.executeComponentStep(ctx, step)
	case StepTypeLLM:
		err = e.executeLLMStep(ctx, step)
	case StepTypeSet:
		err = e.executeSetStep(ctx, step)
	case StepTypeWrite:
		err = e.executeWriteStep(ctx, step)
	case StepTypeDownload:
		err = e.executeDownloadStep(ctx, step)
	case StepTypeUnzip:
		err = e.executeUnzipStep(ctx, step)
	case StepTypePause:
		// Pause_* has special routing semantics: executePauseStep blocks until
		// the workflow is resumed or times out, then handles step_done and next
		// routing internally. A non-nil error (e.g. context cancellation) falls
		// through to the shared failure handling below, which also lets a BREAK
		// signal propagate instead of being reported as step_error.
		if err = e.executePauseStep(ctx, step, stepStartTime, stepTypePattern); err == nil {
			return nil
		}
	case StepTypeBranch:
		err = e.executeBranchStep(ctx, step)
	case StepTypeLoop:
		err = e.executeLoopStep(ctx, step)
	case StepTypeStop:
		e.stopAtNode(ctx, step.ID)
		return nil
	case StepTypeNoop:
		// Noop step does nothing, just executes children.
	default:
		return fmt.Errorf("unknown step type: %s", step.ID)
	}

	if err != nil {
		return e.handleStepFailure(ctx, step, stepTypePattern, stepStartTime, err)
	}

	// Emit step_print RunEvent (spec 3.13 §5.2.12).
	// Order: file_done(×N) → step_print(0..1) → step_done.
	// Only on successful execution; Stop_* nodes returned above and never print.
	if step.Print != "" {
		e.emitStepPrintEvent(ctx, baseCtx, &stepID, step.Print)
	}

	// Emit step_done RunEvent (spec 3.12 §13.4).
	e.emitRunEvent(baseCtx, RunEventStepDone, &stepID, map[string]interface{}{
		"step_type":   stepTypePattern,
		"duration_ms": time.Since(stepStartTime).Milliseconds(),
	})

	// Execute children (parallel branches).
	// Skip for Loop_* and Branch_*, which handle their own children/branches internally.
	if len(step.Children) > 0 && stepType != StepTypeLoop && stepType != StepTypeBranch {
		if err := e.executeChildren(ctx, step.Children); err != nil {
			return err
		}
	}

	// No next step means this branch is complete; main workflow completion is
	// handled by executeWorkflow.
	return e.followStepNext(ctx, step.Next)
}

// collectStepFileTargets returns the file paths a step will write, used for
// file_start RunEvents. Out keys starting with "/" have their {expr} templates
// expanded (falling back to the raw key on error). A Write_* step's Target is
// evaluated as an expression, falling back to the raw string when evaluation
// fails or does not yield a string.
func (e *Engine) collectStepFileTargets(ctx ContextAccessor, step *Step) []string {
	var targets []string
	for target := range step.Out {
		if !strings.HasPrefix(target, "/") {
			continue
		}
		if expanded, err := e.interpolateFilePath(ctx, target); err == nil {
			targets = append(targets, expanded)
		} else {
			targets = append(targets, target)
		}
	}
	if strings.HasPrefix(step.ID, "Write_") && step.Target != "" {
		resolved := step.Target
		if val, err := NewExpressionEvaluator(ctx).EvaluateValue(step.Target); err == nil {
			if s, ok := val.(string); ok {
				resolved = s
			}
		}
		targets = append(targets, resolved)
	}
	return targets
}

// stepConditionHolds evaluates an if-condition. Expressions starting with "="
// go through EvaluateValue; all others through Evaluate. The result is
// converted with toBool.
func stepConditionHolds(ctx ContextAccessor, expr string) (bool, error) {
	evaluator := NewExpressionEvaluator(ctx)
	var (
		result interface{}
		err    error
	)
	if strings.HasPrefix(expr, "=") {
		result, err = evaluator.EvaluateValue(expr)
	} else {
		result, err = evaluator.Evaluate(expr)
	}
	if err != nil {
		return false, err
	}
	return toBool(result), nil
}

// followStepNext routes to a step's next target. "" and "RETURN" end the
// current execution path. "BREAK" returns ErrBreak (v3.16+: exit the enclosing
// loop). Any other value must name an existing step. Both the executed and the
// if-skipped paths use this, so a dangling next is always reported.
func (e *Engine) followStepNext(ctx ContextAccessor, next string) error {
	switch next {
	case "", "RETURN":
		return nil
	case "BREAK":
		return ErrBreak
	}
	nextStep := e.findStepByID(next)
	if nextStep == nil {
		return fmt.Errorf("next step not found: %s", next)
	}
	return e.executeStep(ctx, nextStep)
}

// handleStepFailure processes a non-nil step error. BREAK signals propagate
// unchanged (control flow, not an error). Otherwise a step_error RunEvent is
// emitted (spec 3.12 §13.4). For v3.10+ workflows with onError, control jumps
// to the handler with _error set; _error and _meta are cleared afterwards.
func (e *Engine) handleStepFailure(ctx ContextAccessor, step *Step, stepTypePattern string, stepStartTime time.Time, err error) error {
	if IsBreakError(err) {
		return err
	}

	stepID := step.ID
	e.emitRunEvent(ctx.GetBaseContext(), RunEventStepError, &stepID, map[string]interface{}{
		"step_type":   stepTypePattern,
		"error":       buildErrorMap(err),
		"duration_ms": time.Since(stepStartTime).Milliseconds(),
	})

	if step.OnError == "" || !e.isV310OrLater() {
		return err
	}

	// _error is built from the error (structured if LLMError, generic otherwise).
	ctx.SetLocalVar("_error", buildErrorMap(err))
	defer func() {
		ctx.DeleteLocalVar("_error")
		ctx.DeleteLocalVar("_meta")
	}()

	errorStep := e.findStepByID(step.OnError)
	if errorStep == nil {
		return fmt.Errorf("onError step not found: %s (referenced by %s)", step.OnError, step.ID)
	}
	return e.executeStep(ctx, errorStep)
}

// emitStepPrintEvent evaluates a step's print expression and emits it as a
// step_print RunEvent. Strings are sent as-is; other values are JSON-encoded,
// falling back to %v. Evaluation errors suppress the event (best effort).
func (e *Engine) emitStepPrintEvent(ctx ContextAccessor, baseCtx *ExecutionContext, stepID *string, expr string) {
	printVal, err := NewExpressionEvaluator(ctx).EvaluateValue(expr)
	if err != nil {
		return
	}
	message, isString := printVal.(string)
	if !isString {
		if b, jerr := json.Marshal(printVal); jerr == nil {
			message = string(b)
		} else {
			message = fmt.Sprintf("%v", printVal)
		}
	}
	e.emitRunEvent(baseCtx, RunEventStepPrint, stepID, map[string]interface{}{
		"message": message,
	})
}

// getStepType extracts the step type from the ID prefix ("" if unknown).
func (e *Engine) getStepType(stepID string) StepType {
	switch {
	case strings.HasPrefix(stepID, "Service_"):
		return StepTypeService
	case strings.HasPrefix(stepID, "API_"):
		return StepTypeAPI
	case strings.HasPrefix(stepID, "Component_"):
		return StepTypeComponent
	case strings.HasPrefix(stepID, "LLM_"):
		return StepTypeLLM
	case strings.HasPrefix(stepID, "Set_"):
		return StepTypeSet
	case strings.HasPrefix(stepID, "Write_"):
		return StepTypeWrite
	case strings.HasPrefix(stepID, "Download_"):
		return StepTypeDownload
	case strings.HasPrefix(stepID, "Unzip_"):
		return StepTypeUnzip
	case strings.HasPrefix(stepID, "Pause_"):
		return StepTypePause
	case strings.HasPrefix(stepID, "Branch_"):
		return StepTypeBranch
	case strings.HasPrefix(stepID, "Loop_"):
		return StepTypeLoop
	case strings.HasPrefix(stepID, "Stop_"):
		return StepTypeStop
	case strings.HasPrefix(stepID, "Noop_"):
		return StepTypeNoop
	}
	return ""
}

// findStepByID finds a step by its ID, returning nil if absent.
func (e *Engine) findStepByID(stepID string) *Step {
	for i := range e.workflow.Steps {
		if e.workflow.Steps[i].ID == stepID {
			return &e.workflow.Steps[i]
		}
	}
	return nil
}

// stop stops the workflow, recording the context's current step ID as the
// stop node. Kept for callers that do not know the Stop_* node ID; executeStep
// uses stopAtNode with the explicit ID.
func (e *Engine) stop(ctx ContextAccessor) {
	e.stopAtNode(ctx, "")
}

// stopAtNode stops the workflow (called when a Stop_* node executes).
// Multiple parallel branches may each reach their own Stop_* node; stopOnce
// ensures only the first node ID is recorded. nodeID is passed explicitly
// because the shared CurrentStepID can be overwritten by sibling branches. An
// empty nodeID falls back to CurrentStepID.
func (e *Engine) stopAtNode(ctx ContextAccessor, nodeID string) {
	ctx.SetStatus(StatusStopped)

	// The actual workflow_done event is emitted by executeWorkflow AFTER all
	// parallel branches have finished, so the SSE reader sees every step event
	// before the terminal event.
	e.stopOnce.Do(func() {
		if nodeID == "" {
			nodeID = ctx.GetBaseContext().CurrentStepID
		}
		e.stopNodeID = nodeID
	})
}

// complete marks the workflow as completed (reached end naturally, no Stop_* node).
func (e *Engine) complete(ctx ContextAccessor) {
	ctx.SetStatus(StatusCompleted)
	baseCtx := ctx.GetBaseContext()
	// Emit workflow_done so clients receive the same signal whether the workflow
	// terminated via Stop_* or ran off the end of the last step.
	e.emitRunEvent(baseCtx, RunEventWorkflowDone, nil, map[string]interface{}{
		"duration_ms": time.Since(baseCtx.StartTime).Milliseconds(),
	})
}

// fail marks the workflow failed and emits workflow_failed (spec 3.12 §13.4).
func (e *Engine) fail(ctx ContextAccessor, err error) {
	ctx.SetStatus(StatusFailed)
	baseCtx := ctx.GetBaseContext()
	e.emitRunEvent(baseCtx, RunEventWorkflowFailed, nil, map[string]interface{}{
		"failed_step_id": baseCtx.CurrentStepID,
		"error":          buildErrorMap(err),
		"duration_ms":    time.Since(baseCtx.StartTime).Milliseconds(),
	})
}

// Resume sends a resume signal to a workflow that is suspended at a Pause_* node (v3.15+).
// It is idempotent: a duplicate call with the same RequestID is silently ignored.
// Returns an error if the workflow is not currently paused, the runId or token is
// invalid, or the resume signal has already been delivered.
func (e *Engine) Resume(execCtx *ExecutionContext, req ResumeRequest) error {
	execCtx.pauseMu.Lock()
	state := execCtx.PauseState
	execCtx.pauseMu.Unlock()

	if state == nil {
		// Emit pause_rejected for state_error (spec §11.5.4: non-paused state → rejected).
		e.emitRunEvent(execCtx, RunEventPauseRejected, nil, map[string]interface{}{
			"requestId":  req.RequestID,
			"reasonCode": "state_error",
		})
		return fmt.Errorf("workflow is not in paused state")
	}

	// Validate RunID if provided (spec §11.4.1: runId must identify the correct run).
	if req.RunID != "" && req.RunID != execCtx.WorkflowID {
		return fmt.Errorf("resume runId %q does not match workflow run %q", req.RunID, execCtx.WorkflowID)
	}

	state.mu.Lock()
	defer state.mu.Unlock()

	// Validate token before any idempotency check.
	if req.Token != state.token {
		nodeID := state.nodeID
		e.emitRunEvent(execCtx, RunEventPauseRejected, &nodeID, map[string]interface{}{
			"nodeId":     state.nodeID,
			"requestId":  req.RequestID,
			"reasonCode": "invalid_token",
		})
		return fmt.Errorf("invalid resume token")
	}

	// Idempotency: duplicate RequestID → silent no-op.
	if req.RequestID != "" && state.seenRequestIDs[req.RequestID] {
		return nil
	}

	// Reject if already resumed by a different request.
	if state.resumed {
		nodeID := state.nodeID
		e.emitRunEvent(execCtx, RunEventPauseRejected, &nodeID, map[string]interface{}{
			"nodeId":     state.nodeID,
			"requestId":  req.RequestID,
			"reasonCode": "already_resumed",
		})
		return fmt.Errorf("workflow has already been resumed")
	}

	// Record request ID for idempotency before sending.
	if req.RequestID != "" {
		state.seenRequestIDs[req.RequestID] = true
	}
	state.resumed = true
	// The resumed flag guarantees at most one send per pause. The channel is
	// presumably buffered (size 1) so this does not block — confirm at its creation.
	state.ch <- resumeSignal{Payload: req.Payload, RequestID: req.RequestID}
	return nil
}

// snapshotLocalVars copies known local variables from a ContextAccessor
// so they can be propagated to child contexts (e.g., _item/_index from loops).
func snapshotLocalVars(ctx ContextAccessor) map[string]interface{} {
	snapshot := make(map[string]interface{})
	for _, key := range []string{"_item", "_index", "_result", "_meta", "_error"} {
		if val, ok := ctx.GetLocalVar(key); ok {
			snapshot[key] = val
		}
	}
	return snapshot
}

// executeChildren executes the given steps. A single step runs directly on
// parentCtx; multiple steps run in parallel, each in its own child context.
func (e *Engine) executeChildren(parentCtx ContextAccessor, childIDs []string) error {
	if len(childIDs) == 0 {
		return nil
	}
	baseCtx := parentCtx.GetBaseContext()

	// Optimization: single child doesn't need parallelism.
	if len(childIDs) == 1 {
		child := e.findStepByID(childIDs[0])
		if child == nil {
			return fmt.Errorf("child step not found: %s", childIDs[0])
		}
		return e.executeStep(parentCtx, child)
	}

	safeCtx := NewSafeExecutionContext(baseCtx)
	executor := e.getParallelExecutor()

	// Snapshot parent's local vars so child branches inherit them (e.g., _item/_index from loops).
	parentLocalVars := snapshotLocalVars(parentCtx)

	branches := make([]ParallelBranch, len(childIDs))
	for i, childID := range childIDs {
		id := childID
		branches[i] = ParallelBranch{
			ID: id,
			Fn: func(ctx context.Context) error {
				// Each branch gets isolated local vars seeded from the parent snapshot.
				childCtx := NewChildExecutionContext(safeCtx)
				for k, v := range parentLocalVars {
					childCtx.LocalVars[k] = v
				}

				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}

				child := e.findStepByID(id)
				if child == nil {
					return fmt.Errorf("child step not found: %s", id)
				}
				if err := e.executeStep(childCtx, child); err != nil {
					return fmt.Errorf("child %s: %w", id, err)
				}
				return nil
			},
		}
	}

	if err := executor.Execute(baseCtx.Ctx, branches, e.getErrorStrategy()); err != nil {
		return fmt.Errorf("parallel children execution failed: %w", err)
	}
	return nil
}

// getParallelExecutor returns the parallel executor.
func (e *Engine) getParallelExecutor() *ParallelExecutor {
	return e.parallelExecutor
}

// getErrorStrategy returns the configured error strategy.
func (e *Engine) getErrorStrategy() ParallelErrorStrategy {
	return e.errorStrategy
}

// isV310OrLater returns true if the workflow version is 3.10 or later.
func (e *Engine) isV310OrLater() bool {
	return workflowVersionAtLeast(e.workflow.Version, 3, 10)
}

// workflowVersionAtLeast reports whether a "MAJOR.MINOR" version string is
// numerically >= major.minor. Malformed versions report false. Comparison is
// numeric, so "3.10" > "3.9".
func workflowVersionAtLeast(version string, major, minor int) bool {
	parts := strings.SplitN(version, ".", 2)
	if len(parts) != 2 {
		return false
	}
	gotMajor, err := strconv.Atoi(parts[0])
	if err != nil {
		return false
	}
	gotMinor, err := strconv.Atoi(parts[1])
	if err != nil {
		return false
	}
	if gotMajor != major {
		return gotMajor > major
	}
	return gotMinor >= minor
}

// buildErrorMap converts an error into a map suitable for the _error local
// variable: LLMError values use their own ToMap, everything else becomes a
// generic internal_error.
func buildErrorMap(err error) map[string]interface{} {
	var llmErr *LLMError
	if errors.As(err, &llmErr) {
		return llmErr.ToMap()
	}
	return map[string]interface{}{
		"type":    "internal_error",
		"code":    "INTERNAL_ERROR",
		"message": err.Error(),
	}
}

// nextEventSequence returns the next event sequence number (starting at 1).
func (e *Engine) nextEventSequence() uint64 {
	return atomic.AddUint64(&e.eventSequence, 1)
}

// emitRunEvent emits a structured RunEvent to the run_events stream (spec 3.12 Chapter 13).
// Uses a non-blocking send (fire-and-forget): events are dropped if the channel is full or closed. // The send is wrapped in a recover to guard against "send on closed channel" panics that can // occur when Resume emits a pause_rejected event after the workflow goroutine has already closed // the stream (e.g. the workflow completed between the paused check and the emit call). func (e *Engine) emitRunEvent(ctx *ExecutionContext, eventType RunEventType, stepID *string, payload map[string]interface{}) { if ctx.RunEventStream == nil { return } event := RunEvent{ RunID: ctx.WorkflowID, Seq: e.nextEventSequence(), Ts: time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"), Type: eventType, StepID: stepID, Payload: payload, } // Recover from "send on closed channel" panic (can happen when Resume is called // after the workflow goroutine has closed RunEventStream). defer func() { recover() }() //nolint:errcheck select { case ctx.RunEventStream <- event: default: // fire-and-forget: drop silently if channel is full } } // Adapters holds all the adapters needed for workflow execution type Adapters struct { Service ServiceAdapter API APIAdapter Component ComponentAdapter LLM LLMAdapter LLMAdapterRegistry *LLMAdapterRegistry // v3.16+: multi-provider registry (nil for pre-3.16) File FileAdapter Doc DocAdapter } // ExecutionResult holds the result of workflow execution type ExecutionResult struct { Context *ExecutionContext RunEventStream <-chan RunEvent // v3.12+: structured run_events stream (spec Chapter 13) } // Validate validates the workflow structure func (w *Workflow) Validate() error { if w.Version != "3.6" && w.Version != "3.7" && w.Version != "3.8" && w.Version != "3.9" && w.Version != "3.10" && w.Version != "3.12" && w.Version != "3.13" && w.Version != "3.14" && w.Version != "3.15" && w.Version != "3.16" { return fmt.Errorf("unsupported workflow version: %s (expected 3.6, 3.7, 3.8, 3.9, 3.10, 3.12, 3.13, 3.14, 3.15, or 3.16)", w.Version) } if w.Name == "" { 
return fmt.Errorf("workflow name is required") } if len(w.Steps) == 0 { return fmt.Errorf("workflow must have at least one step") } // Validate registry if err := w.Registry.ValidateRegistry(); err != nil { return err } // IDE scenario validation (v3.9+): forbid Service_* nodes if w.WorkflowType == WorkflowTypeIDE { // Check that no Service_* nodes exist in steps for _, step := range w.Steps { if strings.HasPrefix(step.ID, "Service_") { return fmt.Errorf("IDE workflow (WorkflowType: IDE) cannot contain Service_* nodes: %s", step.ID) } } // Check that registry.services is empty if len(w.Registry.Services) > 0 { return fmt.Errorf("IDE workflow (WorkflowType: IDE) must have empty registry.services") } } // ── Phase 1: Build stepID set, check uniqueness and basic field constraints ────────── stepIDs := make(map[string]bool) for _, step := range w.Steps { if stepIDs[step.ID] { return fmt.Errorf("duplicate step ID: %s", step.ID) } // v3.16+: BREAK and RETURN are reserved keywords, cannot be used as stepId if step.ID == "BREAK" { return fmt.Errorf("'BREAK' is a reserved keyword and cannot be used as a step ID") } if step.ID == "RETURN" { return fmt.Errorf("'RETURN' is a reserved keyword and cannot be used as a step ID") } stepIDs[step.ID] = true // Stop_* must NOT have next or children (spec §2.3.4) if strings.HasPrefix(step.ID, "Stop_") { if step.Next != "" { return fmt.Errorf("step %s: Stop_* steps cannot have a 'next' field (spec §2.3.4)", step.ID) } if len(step.Children) > 0 { return fmt.Errorf("step %s: Stop_* steps cannot have 'children' (spec §2.3.4)", step.ID) } } else { // All other steps must declare a 'next' field if step.Next == "" { return fmt.Errorf("step %s must have a 'next' field (use 'RETURN' for child/loop steps)", step.ID) } } } // ── Phase 2: Reference integrity (spec §2.3.3) ──────────────────────────────────── // Every next / children / cases / timeout.on / onError value must resolve to an // existing step ID (or the special sentinels "RETURN" / 
"BREAK"). for _, step := range w.Steps { check := func(fieldName, target string) error { if target == "" || target == "RETURN" || target == "BREAK" { return nil } if !stepIDs[target] { return fmt.Errorf("step %s: %s references non-existent step %q", step.ID, fieldName, target) } return nil } if err := check("next", step.Next); err != nil { return err } for i, childID := range step.Children { if err := check(fmt.Sprintf("children[%d]", i), childID); err != nil { return err } } for i, c := range step.Cases { if len(c) == 2 { if err := check(fmt.Sprintf("cases[%d]", i), c[1]); err != nil { return err } } } if step.Timeout != nil { if err := check("timeout.on", step.Timeout.On); err != nil { return err } } if err := check("onError", step.OnError); err != nil { return err } } // ── Phase 2b: BREAK scope validation (v3.16+) ───────────────────────────────────── // BREAK is only valid inside a Loop_* children subtree. // Build the set of stepIDs that are transitively reachable from any Loop_* children entry. loopChildren := make(map[string]bool) for _, step := range w.Steps { if !strings.HasPrefix(step.ID, "Loop_") || len(step.Children) == 0 { continue } // BFS from this Loop_*'s children entries visited := make(map[string]bool) bfsQueue := make([]string, len(step.Children)) copy(bfsQueue, step.Children) for _, id := range step.Children { visited[id] = true } for len(bfsQueue) > 0 { cur := bfsQueue[0] bfsQueue = bfsQueue[1:] loopChildren[cur] = true // Find the step and add its successors (skip RETURN/BREAK which terminate) for _, s := range w.Steps { if s.ID != cur { continue } succs := []string{s.Next, s.OnError} succs = append(succs, s.Children...) 
for _, c := range s.Cases { if len(c) == 2 { succs = append(succs, c[1]) } } if s.Timeout != nil { succs = append(succs, s.Timeout.On) } for _, sid := range succs { if sid == "" || sid == "RETURN" || sid == "BREAK" || visited[sid] { continue } visited[sid] = true bfsQueue = append(bfsQueue, sid) } break } } } // Verify every step with next:"BREAK" is inside a Loop_* children subtree for _, step := range w.Steps { if step.Next == "BREAK" && !loopChildren[step.ID] { return fmt.Errorf("step %s: 'BREAK' is only valid inside a Loop_* children subtree", step.ID) } // Also check Branch cases that target BREAK-reachable steps for _, c := range step.Cases { if len(c) == 2 && c[1] == "BREAK" && !loopChildren[step.ID] { return fmt.Errorf("step %s: BREAK target in cases is only valid inside a Loop_* children subtree", step.ID) } } } // ── Phase 3: Entry node validation (spec §2.3.1) ────────────────────────────────── entryNodeIDs := findEntryNodeIDs(w.Steps) if len(entryNodeIDs) == 0 { return fmt.Errorf("workflow graph has no entry node: every step is referenced by another step, creating a cycle with no starting point") } // ── Phase 4: Reachability (spec §2.3.2) ─────────────────────────────────────────── // All steps must be reachable from at least one entry node via BFS. reachable := make(map[string]bool) queue := make([]string, len(entryNodeIDs)) copy(queue, entryNodeIDs) for _, id := range entryNodeIDs { reachable[id] = true } for len(queue) > 0 { curr := queue[0] queue = queue[1:] // Find step (linear scan is fine at validation time) for _, step := range w.Steps { if step.ID != curr { continue } successors := []string{step.Next, step.OnError} successors = append(successors, step.Children...) 
for _, c := range step.Cases { if len(c) == 2 { successors = append(successors, c[1]) } } if step.Timeout != nil { successors = append(successors, step.Timeout.On) } for _, s := range successors { if s != "" && s != "RETURN" && s != "BREAK" && !reachable[s] { reachable[s] = true queue = append(queue, s) } } break } } for _, step := range w.Steps { if !reachable[step.ID] { return fmt.Errorf("step %s is unreachable from any entry node (spec §2.3.2)", step.ID) } } // Validate Download_* and Unzip_* field constraints (v3.14+) for _, step := range w.Steps { if strings.HasPrefix(step.ID, "Download_") { // source is required if step.Source == nil { return fmt.Errorf("step %s: 'source' is required for Download_* steps", step.ID) } // target and routeByExt/defaultDir are mutually exclusive; at least one is required hasTarget := step.Target != "" hasRoute := len(step.RouteByExt) > 0 || step.DefaultDir != "" if hasTarget && hasRoute { return fmt.Errorf("step %s: 'target' and 'routeByExt'/'defaultDir' are mutually exclusive for Download_* steps", step.ID) } if !hasTarget && !hasRoute { return fmt.Errorf("step %s: Download_* steps must specify either 'target' or 'routeByExt'/'defaultDir'", step.ID) } } if strings.HasPrefix(step.ID, "Unzip_") { // source is required if step.Source == nil { return fmt.Errorf("step %s: 'source' is required for Unzip_* steps", step.ID) } // routeByExt is required (may be empty map {} combined with defaultDir, but the field must be declared) if step.RouteByExt == nil { return fmt.Errorf("step %s: 'routeByExt' is required for Unzip_* steps (use {} with 'defaultDir' to route all files to one directory)", step.ID) } } if strings.HasPrefix(step.ID, "Pause_") { // resumeResultTarget is required (spec §5.1, §11.2.1) if step.ResumeResultTarget == "" { return fmt.Errorf("step %s: 'resumeResultTarget' is required for Pause_* steps", step.ID) } // children is not applicable for Pause_* (spec §5.1: 不适用) if len(step.Children) > 0 { return fmt.Errorf("step %s: 
'children' is not supported on Pause_* steps (spec §5.1)", step.ID) } // If timeout is provided, sec must be > 0 and on must be non-empty (spec §11.2.2) if step.Timeout != nil { if step.Timeout.Sec <= 0 { return fmt.Errorf("step %s: timeout.sec must be > 0 for Pause_* steps", step.ID) } if step.Timeout.On == "" { return fmt.Errorf("step %s: timeout.on is required when timeout is configured for Pause_* steps", step.ID) } } } // v3.16+: Loop_* field constraints if strings.HasPrefix(step.ID, "Loop_") { hasWhile := step.While != "" hasSource := step.Source != nil // source is interface{} — also check string form if sourceStr, ok := step.Source.(string); ok && sourceStr == "" { hasSource = false } // while and source both present → ERROR if hasWhile && hasSource { return fmt.Errorf("step %s: 'while' and 'source' are mutually exclusive on Loop_* steps", step.ID) } // while without maxIterations → ERROR if hasWhile && step.MaxIterations == nil { return fmt.Errorf("step %s: 'maxIterations' is required when 'while' is used on Loop_* steps", step.ID) } // maxIterations < 1 → ERROR if step.MaxIterations != nil && *step.MaxIterations < 1 { return fmt.Errorf("step %s: 'maxIterations' must be >= 1, got %d", step.ID, *step.MaxIterations) } // while + mode:"parallel" → ERROR if hasWhile && step.Mode == "parallel" { return fmt.Errorf("step %s: 'while' mode requires mode 'serial', got 'parallel'", step.ID) } } // v3.16+: LLM_* model field validation if strings.HasPrefix(step.ID, "LLM_") && step.Model != "" { // model contains "/" but provider or modelId part is empty → ERROR if strings.Contains(step.Model, "/") { parts := strings.SplitN(step.Model, "/", 2) if parts[0] == "" || parts[1] == "" { return fmt.Errorf("step %s: 'model' with '/' must have non-empty provider and modelId (got %q)", step.ID, step.Model) } } } } return nil } // generateWorkflowID generates a unique workflow execution ID func generateWorkflowID() string { return fmt.Sprintf("wf_%d", time.Now().UnixNano()) } // 
findEntryNodeIDs returns the IDs of all "entry" nodes in a step list. // An entry node is one whose ID does not appear in any other step's // next / children / cases / timeout.on / onError fields. // Spec §1.4C: the engine auto-identifies entry nodes; it MUST NOT always use steps[0]. func findEntryNodeIDs(steps []Step) []string { referenced := make(map[string]bool) for _, step := range steps { if step.Next != "" && step.Next != "RETURN" && step.Next != "BREAK" { referenced[step.Next] = true } for _, childID := range step.Children { referenced[childID] = true } for _, c := range step.Cases { if len(c) == 2 { referenced[c[1]] = true } } if step.Timeout != nil && step.Timeout.On != "" { referenced[step.Timeout.On] = true } if step.OnError != "" { referenced[step.OnError] = true } } var entries []string for _, step := range steps { if !referenced[step.ID] { entries = append(entries, step.ID) } } return entries }