| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131 |
- package workflow
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)
- // Engine is the main workflow execution engine
- type Engine struct {
- workflow *Workflow
- parallelExecutor *ParallelExecutor
- errorStrategy ParallelErrorStrategy
- eventSequence uint64 // Atomic counter for event sequence numbers
- stopOnce sync.Once // Captures the first Stop_* node ID
- stopNodeID string // ID of the first Stop_* node that triggered stop
- }
- // NewEngine creates a new workflow engine with default options
- func NewEngine(workflow *Workflow) (*Engine, error) {
- return NewEngineWithOptions(workflow, DefaultEngineOptions)
- }
- // NewEngineWithOptions creates a new workflow engine with custom options
- func NewEngineWithOptions(workflow *Workflow, opts EngineOptions) (*Engine, error) {
- // Validate workflow is not nil
- if workflow == nil {
- return nil, fmt.Errorf("workflow cannot be nil")
- }
- // Validate workflow structure
- if err := workflow.Validate(); err != nil {
- return nil, fmt.Errorf("invalid workflow: %w", err)
- }
- return &Engine{
- workflow: workflow,
- parallelExecutor: NewParallelExecutor(opts.MaxConcurrency),
- errorStrategy: opts.ParallelErrorStrategy,
- eventSequence: 0,
- }, nil
- }
- // Execute starts workflow execution with the given context and initial variables
- func (e *Engine) Execute(ctx context.Context, initialVars map[string]interface{}, adapters *Adapters) (*ExecutionResult, error) {
- // Validate required parameters
- if ctx == nil {
- return nil, fmt.Errorf("context cannot be nil")
- }
- if adapters == nil {
- return nil, fmt.Errorf("adapters cannot be nil")
- }
- // Create execution context
- execCtx := &ExecutionContext{
- Ctx: ctx,
- WorkflowID: generateWorkflowID(),
- Params: make(map[string]interface{}),
- Variables: make(map[string]interface{}),
- SystemVars: make(map[string]interface{}),
- LocalVars: make(map[string]interface{}),
- Artifacts: make(map[string]string),
- Status: StatusRunning,
- StartTime: time.Now(),
- RunEventStream: make(chan RunEvent, 500),
- ServiceAdapter: adapters.Service,
- APIAdapter: adapters.API,
- ComponentAdapter: adapters.Component,
- LLMAdapter: adapters.LLM,
- LLMAdapterRegistry: adapters.LLMAdapterRegistry,
- FileAdapter: adapters.File,
- DocAdapter: adapters.Doc,
- }
- // Initialize parameter types from registry
- execCtx.ParamTypes = make(map[string]string)
- if paramDecls, err := e.workflow.Registry.GetParamDeclarations(); err == nil {
- for name, decl := range paramDecls {
- execCtx.ParamTypes[name] = decl.Type
- }
- }
- // Initialize variable types from registry
- execCtx.VarTypes = make(map[string]string)
- if varDecls, err := e.workflow.Registry.GetVariableDeclarations(); err == nil {
- for name, decl := range varDecls {
- execCtx.VarTypes[name] = decl.Type
- }
- }
- // Initialize parameters and variables from initialVars
- // Parameters are those defined in registry.Params (no $ prefix)
- // Variables are those with $ prefix
- for k, v := range initialVars {
- if strings.HasPrefix(k, "$") {
- execCtx.Variables[k] = v
- } else {
- // Check if this is a declared parameter
- if paramType, ok := execCtx.ParamTypes[k]; ok {
- // Apply type conversion for OBJECT params
- convertedValue := v
- if paramType == "OBJECT" {
- if str, isString := v.(string); isString {
- var obj map[string]interface{}
- if err := json.Unmarshal([]byte(str), &obj); err != nil {
- return nil, fmt.Errorf("failed to convert parameter %s to OBJECT: %w", k, err)
- }
- convertedValue = obj
- }
- }
- execCtx.Params[k] = convertedValue
- } else {
- // Not a declared parameter, treat as variable with $ prefix
- execCtx.Variables["$"+k] = v
- }
- }
- }
- // Apply registry param defaults for parameters not supplied by the caller (spec §3.2).
- // This runs after initialVars processing so explicit caller values always take precedence.
- if paramDecls, err := e.workflow.Registry.GetParamDeclarations(); err == nil {
- for name, decl := range paramDecls {
- if _, alreadySet := execCtx.Params[name]; !alreadySet && decl.Default != nil {
- if defaultVal, coerceErr := CoerceParamDefault(decl); coerceErr == nil && defaultVal != nil {
- execCtx.Params[name] = defaultVal
- }
- }
- }
- }
- // Execute workflow
- go e.executeWorkflow(execCtx)
- // Return result
- return &ExecutionResult{
- Context: execCtx,
- RunEventStream: execCtx.RunEventStream,
- }, nil
- }
- // ExecuteWithRunParams starts workflow execution with run-level control parameters (v3.15+, spec §2.2).
- // RunParams allow the caller to specify a workspace scope, a selective node list, an
- // execution mode, and business-level input params without polluting the workflow's own registry.
- //
- // RunParams.Params (spec §1.7 runParams.params) are merged into the effective initial variables.
- // They take precedence over initialVars for the same key, mirroring how the online platform
- // passes declared registry params to the engine separately from internal engine state.
- func (e *Engine) ExecuteWithRunParams(ctx context.Context, initialVars map[string]interface{}, adapters *Adapters, runParams RunParams) (*ExecutionResult, error) {
- // Merge RunParams.Params + initialVars → RunParams.Params wins (they are the caller-declared params).
- // RunParams.Params keys are bare names (e.g. "prompt") per the spec §1.7.
- // They are passed through unchanged so Execute()'s param-routing logic places them in
- // execCtx.Params[name], accessible via =name expressions (no $ prefix).
- merged := make(map[string]interface{}, len(initialVars)+len(runParams.Params))
- for k, v := range initialVars {
- merged[k] = v
- }
- for k, v := range runParams.Params {
- merged[k] = v
- }
- result, err := e.Execute(ctx, merged, adapters)
- if err != nil {
- return nil, err
- }
- // Attach RunParams to the execution context so downstream steps / adapters can inspect them.
- result.Context.RunParams = &runParams
- return result, nil
- }
- // executeWorkflow executes the workflow from the beginning
- func (e *Engine) executeWorkflow(execCtx *ExecutionContext) {
- defer close(execCtx.RunEventStream)
- // Emit workflow_start RunEvent
- e.emitRunEvent(execCtx, RunEventWorkflowStart, nil, map[string]interface{}{
- "params": execCtx.Params,
- })
- // Find true entry nodes (spec §1.4C: unreferenced nodes, NOT always steps[0]).
- entryIDs := findEntryNodeIDs(e.workflow.Steps)
- if len(entryIDs) == 0 {
- e.fail(execCtx, fmt.Errorf("no entry nodes found in workflow"))
- return
- }
- var execErr error
- if len(entryIDs) == 1 {
- // Single entry: execute serially (common case)
- entry := e.findStepByID(entryIDs[0])
- execErr = e.executeStep(execCtx, entry)
- } else {
- // Multiple entry nodes: execute in parallel (spec §1.4C)
- execErr = e.executeChildren(execCtx, entryIDs)
- }
- if execErr != nil {
- e.fail(execCtx, execErr)
- return
- }
- // Terminal event: emit workflow_done AFTER all branches have finished.
- switch execCtx.Status {
- case StatusRunning:
- e.complete(execCtx)
- case StatusStopped:
- // Deferred from e.stop() — emit workflow_done now that every parallel
- // branch has returned, so all step events precede the terminal event.
- e.emitRunEvent(execCtx, RunEventWorkflowDone, nil, map[string]interface{}{
- "stop_id": e.stopNodeID,
- "duration_ms": time.Since(execCtx.StartTime).Milliseconds(),
- })
- }
- }
- // executeStep executes a single step
- func (e *Engine) executeStep(ctx ContextAccessor, step *Step) error {
- // Get base context
- baseCtx := ctx.GetBaseContext()
- // Short-circuit if this branch has been stopped or the workflow is paused.
- // For ChildExecutionContext (parallel branches), IsStopped() checks the
- // branch-local flag so sibling branches are NOT short-circuited.
- // For non-parallel paths, IsStopped() checks the global Status.
- // Pause is always a global check — all branches should pause.
- if ctx.IsStopped() || baseCtx.Status == StatusPaused {
- return nil
- }
- // Set current step ID using thread-safe method
- ctx.SetCurrentStepID(step.ID)
- // Collect file write targets for the step.start event and file_start RunEvents.
- // Expand {expr} templates in out-key paths (e.g. "/{_item.filePath}") so that
- // file_start carries the actual resolved path, not the raw template string.
- var fileTargets []string
- for target := range step.Out {
- if strings.HasPrefix(target, "/") {
- if expanded, err := e.interpolateFilePath(ctx, target); err == nil {
- fileTargets = append(fileTargets, expanded)
- } else {
- fileTargets = append(fileTargets, target)
- }
- }
- }
- if strings.HasPrefix(step.ID, "Write_") && step.Target != "" {
- // Try to evaluate the target path expression; fall back to the raw string
- evaluator := NewExpressionEvaluator(ctx)
- if targetVal, err := evaluator.EvaluateValue(step.Target); err == nil {
- if targetStr, ok := targetVal.(string); ok {
- fileTargets = append(fileTargets, targetStr)
- } else {
- fileTargets = append(fileTargets, step.Target)
- }
- } else {
- fileTargets = append(fileTargets, step.Target)
- }
- }
- // Check if condition
- stepType := e.getStepType(step.ID)
- stepTypePattern := string(stepType) + "_*"
- if step.If != "" {
- evaluator := NewExpressionEvaluator(ctx)
- var result interface{}
- var err error
- if strings.HasPrefix(step.If, "=") {
- result, err = evaluator.EvaluateValue(step.If)
- } else {
- result, err = evaluator.Evaluate(step.If)
- }
- if err != nil {
- return fmt.Errorf("failed to evaluate if condition: %w", err)
- }
- if !toBool(result) {
- // Emit step_skipped RunEvent (spec 3.12 §13.3)
- stepID := step.ID
- e.emitRunEvent(baseCtx, RunEventStepSkipped, &stepID, map[string]interface{}{
- "step_type": stepTypePattern,
- "reason": "if_false",
- })
- // Skip this step and its children, move to next
- if step.Next != "" {
- // Handle special next values
- if step.Next == "RETURN" {
- return nil
- }
- if step.Next == "BREAK" {
- return ErrBreak
- }
- nextStep := e.findStepByID(step.Next)
- if nextStep != nil {
- return e.executeStep(ctx, nextStep)
- }
- }
- // No next - this branch is complete
- return nil
- }
- }
- // Emit step_start RunEvent (condition passed, about to execute)
- stepID := step.ID
- e.emitRunEvent(baseCtx, RunEventStepStart, &stepID, map[string]interface{}{
- "step_type": stepTypePattern,
- })
- // Emit file_start RunEvents for all file targets (spec 3.13 §13.3)
- // Batch-emitted immediately after step_start, before any llm_token
- for _, target := range fileTargets {
- path := strings.TrimPrefix(target, "/")
- e.emitRunEvent(baseCtx, RunEventFileStart, &stepID, map[string]interface{}{
- "path": path,
- })
- }
- stepStartTime := time.Now()
- var err error
- switch stepType {
- case StepTypeService:
- err = e.executeServiceStep(ctx, step)
- case StepTypeAPI:
- err = e.executeAPIStep(ctx, step)
- case StepTypeComponent:
- err = e.executeComponentStep(ctx, step)
- case StepTypeLLM:
- err = e.executeLLMStep(ctx, step)
- case StepTypeSet:
- err = e.executeSetStep(ctx, step)
- case StepTypeWrite:
- err = e.executeWriteStep(ctx, step)
- case StepTypeDownload:
- err = e.executeDownloadStep(ctx, step)
- case StepTypeUnzip:
- err = e.executeUnzipStep(ctx, step)
- case StepTypePause:
- // Pause_* has special routing semantics: executePauseStep blocks until
- // the workflow is resumed or times out, then handles step_done and next
- // routing internally (similar to how Stop_* handles workflow termination).
- pauseErr := e.executePauseStep(ctx, step, stepStartTime, stepTypePattern)
- if pauseErr == nil {
- return nil // executePauseStep handled step_done and next routing
- }
- // Non-nil error (e.g. context cancellation): emit step_error and handle onError
- e.emitRunEvent(baseCtx, RunEventStepError, &stepID, map[string]interface{}{
- "step_type": stepTypePattern,
- "error": buildErrorMap(pauseErr),
- "duration_ms": time.Since(stepStartTime).Milliseconds(),
- })
- if step.OnError != "" && e.isV310OrLater() {
- ctx.SetLocalVar("_error", buildErrorMap(pauseErr))
- errorStep := e.findStepByID(step.OnError)
- if errorStep == nil {
- ctx.DeleteLocalVar("_error")
- ctx.DeleteLocalVar("_meta")
- return fmt.Errorf("onError step not found: %s (referenced by %s)", step.OnError, step.ID)
- }
- onErrResult := e.executeStep(ctx, errorStep)
- ctx.DeleteLocalVar("_error")
- ctx.DeleteLocalVar("_meta")
- return onErrResult
- }
- return pauseErr
- case StepTypeBranch:
- err = e.executeBranchStep(ctx, step)
- case StepTypeLoop:
- err = e.executeLoopStep(ctx, step)
- case StepTypeStop:
- e.stop(ctx)
- return nil
- case StepTypeNoop:
- // Noop step does nothing, just executes children
- err = nil
- default:
- return fmt.Errorf("unknown step type: %s", step.ID)
- }
- if err != nil {
- // v3.16+: BREAK is a control flow signal, not an error — propagate without step_error
- if IsBreakError(err) {
- return err
- }
- // Emit step_error RunEvent (spec 3.12 §13.4)
- e.emitRunEvent(baseCtx, RunEventStepError, &stepID, map[string]interface{}{
- "step_type": stepTypePattern,
- "error": buildErrorMap(err),
- "duration_ms": time.Since(stepStartTime).Milliseconds(),
- })
- // v3.10+: If step has onError handler, jump there instead of failing
- if step.OnError != "" && e.isV310OrLater() {
- // _error is built from the error (structured if LLMError, generic otherwise)
- ctx.SetLocalVar("_error", buildErrorMap(err))
- // Execute the onError handler step
- errorStep := e.findStepByID(step.OnError)
- if errorStep == nil {
- ctx.DeleteLocalVar("_error")
- ctx.DeleteLocalVar("_meta")
- return fmt.Errorf("onError step not found: %s (referenced by %s)", step.OnError, step.ID)
- }
- onErrResult := e.executeStep(ctx, errorStep)
- // Clean up after onError handler completes
- ctx.DeleteLocalVar("_error")
- ctx.DeleteLocalVar("_meta")
- return onErrResult
- }
- return err
- }
- // Emit step_print RunEvent (spec 3.13 §5.2.12)
- // Order: file_done(×N) → step_print(0..1) → step_done
- // Only on successful execution; Stop_* nodes cannot have print
- if step.Print != "" && stepType != StepTypeStop {
- evaluator := NewExpressionEvaluator(ctx)
- printVal, err := evaluator.EvaluateValue(step.Print)
- if err == nil {
- var message string
- switch v := printVal.(type) {
- case string:
- message = v
- default:
- if b, jerr := json.Marshal(v); jerr == nil {
- message = string(b)
- } else {
- message = fmt.Sprintf("%v", v)
- }
- }
- e.emitRunEvent(baseCtx, RunEventStepPrint, &stepID, map[string]interface{}{
- "message": message,
- })
- }
- }
- // Emit step_done RunEvent (spec 3.12 §13.4)
- e.emitRunEvent(baseCtx, RunEventStepDone, &stepID, map[string]interface{}{
- "step_type": stepTypePattern,
- "duration_ms": time.Since(stepStartTime).Milliseconds(),
- })
- // Execute children (parallel branches)
- // Skip for Loop_* and Branch_* which handle their own children/branches internally
- if len(step.Children) > 0 && stepType != StepTypeLoop && stepType != StepTypeBranch {
- if err := e.executeChildren(ctx, step.Children); err != nil {
- return err
- }
- }
- // Move to next step
- if step.Next != "" {
- // Handle special next values
- if step.Next == "RETURN" {
- // Return from current execution path (used in children/loops)
- return nil
- }
- if step.Next == "BREAK" {
- // v3.16+: Exit the entire enclosing loop
- return ErrBreak
- }
- nextStep := e.findStepByID(step.Next)
- if nextStep != nil {
- return e.executeStep(ctx, nextStep)
- }
- return fmt.Errorf("next step not found: %s", step.Next)
- }
- // No next step - this branch is complete
- // Children workflows ending here are considered complete
- // Main workflow completion is handled by executeWorkflow
- return nil
- }
- // getStepType extracts the step type from the ID prefix
- func (e *Engine) getStepType(stepID string) StepType {
- if strings.HasPrefix(stepID, "Service_") {
- return StepTypeService
- } else if strings.HasPrefix(stepID, "API_") {
- return StepTypeAPI
- } else if strings.HasPrefix(stepID, "Component_") {
- return StepTypeComponent
- } else if strings.HasPrefix(stepID, "LLM_") {
- return StepTypeLLM
- } else if strings.HasPrefix(stepID, "Set_") {
- return StepTypeSet
- } else if strings.HasPrefix(stepID, "Write_") {
- return StepTypeWrite
- } else if strings.HasPrefix(stepID, "Download_") {
- return StepTypeDownload
- } else if strings.HasPrefix(stepID, "Unzip_") {
- return StepTypeUnzip
- } else if strings.HasPrefix(stepID, "Pause_") {
- return StepTypePause
- } else if strings.HasPrefix(stepID, "Branch_") {
- return StepTypeBranch
- } else if strings.HasPrefix(stepID, "Loop_") {
- return StepTypeLoop
- } else if strings.HasPrefix(stepID, "Stop_") {
- return StepTypeStop
- } else if strings.HasPrefix(stepID, "Noop_") {
- return StepTypeNoop
- }
- return ""
- }
- // findStepByID finds a step by its ID
- func (e *Engine) findStepByID(stepID string) *Step {
- for i := range e.workflow.Steps {
- if e.workflow.Steps[i].ID == stepID {
- return &e.workflow.Steps[i]
- }
- }
- return nil
- }
- // stop stops the workflow (called when a Stop_* node executes).
- // Multiple parallel branches may each reach their own Stop_* node;
- // stopOnce ensures workflow_done is emitted exactly once.
- func (e *Engine) stop(ctx ContextAccessor) {
- // Use thread-safe method to set status
- ctx.SetStatus(StatusStopped)
- baseCtx := ctx.GetBaseContext()
- // Capture the first Stop_* node ID. The actual workflow_done event is emitted
- // by executeWorkflow AFTER all parallel branches have finished, so the SSE
- // reader sees every step event before the terminal event.
- e.stopOnce.Do(func() {
- e.stopNodeID = baseCtx.CurrentStepID
- })
- }
- // complete marks the workflow as completed (reached end naturally, no Stop_* node)
- func (e *Engine) complete(ctx ContextAccessor) {
- ctx.SetStatus(StatusCompleted)
- baseCtx := ctx.GetBaseContext()
- // Emit workflow_done so clients receive the same signal whether the workflow
- // terminated via Stop_* or ran off the end of the last step.
- e.emitRunEvent(baseCtx, RunEventWorkflowDone, nil, map[string]interface{}{
- "duration_ms": time.Since(baseCtx.StartTime).Milliseconds(),
- })
- }
- // fail fails the workflow
- func (e *Engine) fail(ctx ContextAccessor, err error) {
- // Use thread-safe method to set status
- ctx.SetStatus(StatusFailed)
- baseCtx := ctx.GetBaseContext()
- // Emit workflow_failed RunEvent (spec 3.12 §13.4)
- e.emitRunEvent(baseCtx, RunEventWorkflowFailed, nil, map[string]interface{}{
- "failed_step_id": baseCtx.CurrentStepID,
- "error": buildErrorMap(err),
- "duration_ms": time.Since(baseCtx.StartTime).Milliseconds(),
- })
- }
- // Resume sends a resume signal to a workflow that is suspended at a Pause_* node (v3.15+).
- // It is idempotent: a duplicate call with the same RequestID is silently ignored.
- // Returns an error if the workflow is not currently paused, the token is invalid,
- // or the resume signal has already been delivered.
- func (e *Engine) Resume(execCtx *ExecutionContext, req ResumeRequest) error {
- execCtx.pauseMu.Lock()
- state := execCtx.PauseState
- execCtx.pauseMu.Unlock()
- if state == nil {
- // Emit pause_rejected for state_error (spec §11.5.4: non-paused state → rejected)
- e.emitRunEvent(execCtx, RunEventPauseRejected, nil, map[string]interface{}{
- "requestId": req.RequestID,
- "reasonCode": "state_error",
- })
- return fmt.Errorf("workflow is not in paused state")
- }
- // Validate RunID if provided (spec §11.4.1: runId must identify the correct run)
- if req.RunID != "" && req.RunID != execCtx.WorkflowID {
- return fmt.Errorf("resume runId %q does not match workflow run %q", req.RunID, execCtx.WorkflowID)
- }
- state.mu.Lock()
- defer state.mu.Unlock()
- // Validate token before any idempotency check
- if req.Token != state.token {
- nodeID := state.nodeID
- e.emitRunEvent(execCtx, RunEventPauseRejected, &nodeID, map[string]interface{}{
- "nodeId": state.nodeID,
- "requestId": req.RequestID,
- "reasonCode": "invalid_token",
- })
- return fmt.Errorf("invalid resume token")
- }
- // Idempotency: duplicate RequestID → silent no-op
- if req.RequestID != "" && state.seenRequestIDs[req.RequestID] {
- return nil
- }
- // Reject if already resumed by a different request
- if state.resumed {
- nodeID := state.nodeID
- e.emitRunEvent(execCtx, RunEventPauseRejected, &nodeID, map[string]interface{}{
- "nodeId": state.nodeID,
- "requestId": req.RequestID,
- "reasonCode": "already_resumed",
- })
- return fmt.Errorf("workflow has already been resumed")
- }
- // Record request ID for idempotency before sending
- if req.RequestID != "" {
- state.seenRequestIDs[req.RequestID] = true
- }
- state.resumed = true
- // Send signal (buffered channel size 1, non-blocking)
- state.ch <- resumeSignal{Payload: req.Payload, RequestID: req.RequestID}
- return nil
- }
- // snapshotLocalVars copies known local variables from a ContextAccessor
- // so they can be propagated to child contexts (e.g., _item/_index from loops)
- func snapshotLocalVars(ctx ContextAccessor) map[string]interface{} {
- snapshot := make(map[string]interface{})
- for _, key := range []string{"_item", "_index", "_result", "_meta", "_error"} {
- if val, ok := ctx.GetLocalVar(key); ok {
- snapshot[key] = val
- }
- }
- return snapshot
- }
- // executeChildren executes child steps in parallel
- func (e *Engine) executeChildren(parentCtx ContextAccessor, childIDs []string) error {
- if len(childIDs) == 0 {
- return nil
- }
- baseCtx := parentCtx.GetBaseContext()
- // Optimization: single child doesn't need parallelism
- if len(childIDs) == 1 {
- child := e.findStepByID(childIDs[0])
- if child == nil {
- return fmt.Errorf("child step not found: %s", childIDs[0])
- }
- return e.executeStep(parentCtx, child)
- }
- // Parallel execution for multiple children
- safeCtx := NewSafeExecutionContext(baseCtx)
- executor := e.getParallelExecutor()
- // Snapshot parent's local vars so child branches inherit them (e.g., _item/_index from loops)
- parentLocalVars := snapshotLocalVars(parentCtx)
- // Create branches for each child
- branches := make([]ParallelBranch, len(childIDs))
- for i, childID := range childIDs {
- id := childID
- branches[i] = ParallelBranch{
- ID: id,
- Fn: func(ctx context.Context) error {
- // Create child context with isolated local vars
- childCtx := NewChildExecutionContext(safeCtx)
- // Propagate parent's local vars (e.g., _item, _index from enclosing loop)
- for k, v := range parentLocalVars {
- childCtx.LocalVars[k] = v
- }
- // Check cancellation
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
- child := e.findStepByID(id)
- if child == nil {
- return fmt.Errorf("child step not found: %s", id)
- }
- // Execute with child context
- if err := e.executeStep(childCtx, child); err != nil {
- return fmt.Errorf("child %s: %w", id, err)
- }
- return nil
- },
- }
- }
- // Execute all branches in parallel
- if err := executor.Execute(baseCtx.Ctx, branches, e.getErrorStrategy()); err != nil {
- return fmt.Errorf("parallel children execution failed: %w", err)
- }
- return nil
- }
- // getParallelExecutor returns the parallel executor
- func (e *Engine) getParallelExecutor() *ParallelExecutor {
- return e.parallelExecutor
- }
- // getErrorStrategy returns the configured error strategy
- func (e *Engine) getErrorStrategy() ParallelErrorStrategy {
- return e.errorStrategy
- }
- // isV310OrLater returns true if the workflow version is 3.10 or later
- func (e *Engine) isV310OrLater() bool {
- v := e.workflow.Version
- return v == "3.10" || v == "3.12" || v == "3.13" || v == "3.14" || v == "3.15" || v == "3.16"
- }
- // buildErrorMap converts an error into a map suitable for _error local variable
- func buildErrorMap(err error) map[string]interface{} {
- var llmErr *LLMError
- if errors.As(err, &llmErr) {
- return llmErr.ToMap()
- }
- // Generic error fallback
- return map[string]interface{}{
- "type": "internal_error",
- "code": "INTERNAL_ERROR",
- "message": err.Error(),
- }
- }
- // nextEventSequence returns the next event sequence number
- func (e *Engine) nextEventSequence() uint64 {
- return atomic.AddUint64(&e.eventSequence, 1)
- }
- // emitRunEvent emits a structured RunEvent to the run_events stream (spec 3.12 Chapter 13).
- // Uses a non-blocking send (fire-and-forget): events are dropped if the channel is full or closed.
- // The send is wrapped in a recover to guard against "send on closed channel" panics that can
- // occur when Resume emits a pause_rejected event after the workflow goroutine has already closed
- // the stream (e.g. the workflow completed between the paused check and the emit call).
- func (e *Engine) emitRunEvent(ctx *ExecutionContext, eventType RunEventType, stepID *string, payload map[string]interface{}) {
- if ctx.RunEventStream == nil {
- return
- }
- event := RunEvent{
- RunID: ctx.WorkflowID,
- Seq: e.nextEventSequence(),
- Ts: time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"),
- Type: eventType,
- StepID: stepID,
- Payload: payload,
- }
- // Recover from "send on closed channel" panic (can happen when Resume is called
- // after the workflow goroutine has closed RunEventStream).
- defer func() { recover() }() //nolint:errcheck
- select {
- case ctx.RunEventStream <- event:
- default:
- // fire-and-forget: drop silently if channel is full
- }
- }
- // Adapters holds all the adapters needed for workflow execution
- type Adapters struct {
- Service ServiceAdapter
- API APIAdapter
- Component ComponentAdapter
- LLM LLMAdapter
- LLMAdapterRegistry *LLMAdapterRegistry // v3.16+: multi-provider registry (nil for pre-3.16)
- File FileAdapter
- Doc DocAdapter
- }
- // ExecutionResult holds the result of workflow execution
- type ExecutionResult struct {
- Context *ExecutionContext
- RunEventStream <-chan RunEvent // v3.12+: structured run_events stream (spec Chapter 13)
- }
- // Validate validates the workflow structure
- func (w *Workflow) Validate() error {
- if w.Version != "3.6" && w.Version != "3.7" && w.Version != "3.8" && w.Version != "3.9" && w.Version != "3.10" && w.Version != "3.12" && w.Version != "3.13" && w.Version != "3.14" && w.Version != "3.15" && w.Version != "3.16" {
- return fmt.Errorf("unsupported workflow version: %s (expected 3.6, 3.7, 3.8, 3.9, 3.10, 3.12, 3.13, 3.14, 3.15, or 3.16)", w.Version)
- }
- if w.Name == "" {
- return fmt.Errorf("workflow name is required")
- }
- if len(w.Steps) == 0 {
- return fmt.Errorf("workflow must have at least one step")
- }
- // Validate registry
- if err := w.Registry.ValidateRegistry(); err != nil {
- return err
- }
- // IDE scenario validation (v3.9+): forbid Service_* nodes
- if w.WorkflowType == WorkflowTypeIDE {
- // Check that no Service_* nodes exist in steps
- for _, step := range w.Steps {
- if strings.HasPrefix(step.ID, "Service_") {
- return fmt.Errorf("IDE workflow (WorkflowType: IDE) cannot contain Service_* nodes: %s", step.ID)
- }
- }
- // Check that registry.services is empty
- if len(w.Registry.Services) > 0 {
- return fmt.Errorf("IDE workflow (WorkflowType: IDE) must have empty registry.services")
- }
- }
- // ── Phase 1: Build stepID set, check uniqueness and basic field constraints ──────────
- stepIDs := make(map[string]bool)
- for _, step := range w.Steps {
- if stepIDs[step.ID] {
- return fmt.Errorf("duplicate step ID: %s", step.ID)
- }
- // v3.16+: BREAK and RETURN are reserved keywords, cannot be used as stepId
- if step.ID == "BREAK" {
- return fmt.Errorf("'BREAK' is a reserved keyword and cannot be used as a step ID")
- }
- if step.ID == "RETURN" {
- return fmt.Errorf("'RETURN' is a reserved keyword and cannot be used as a step ID")
- }
- stepIDs[step.ID] = true
- // Stop_* must NOT have next or children (spec §2.3.4)
- if strings.HasPrefix(step.ID, "Stop_") {
- if step.Next != "" {
- return fmt.Errorf("step %s: Stop_* steps cannot have a 'next' field (spec §2.3.4)", step.ID)
- }
- if len(step.Children) > 0 {
- return fmt.Errorf("step %s: Stop_* steps cannot have 'children' (spec §2.3.4)", step.ID)
- }
- } else {
- // All other steps must declare a 'next' field
- if step.Next == "" {
- return fmt.Errorf("step %s must have a 'next' field (use 'RETURN' for child/loop steps)", step.ID)
- }
- }
- }
- // ── Phase 2: Reference integrity (spec §2.3.3) ────────────────────────────────────
- // Every next / children / cases / timeout.on / onError value must resolve to an
- // existing step ID (or the special sentinels "RETURN" / "BREAK").
- for _, step := range w.Steps {
- check := func(fieldName, target string) error {
- if target == "" || target == "RETURN" || target == "BREAK" {
- return nil
- }
- if !stepIDs[target] {
- return fmt.Errorf("step %s: %s references non-existent step %q", step.ID, fieldName, target)
- }
- return nil
- }
- if err := check("next", step.Next); err != nil {
- return err
- }
- for i, childID := range step.Children {
- if err := check(fmt.Sprintf("children[%d]", i), childID); err != nil {
- return err
- }
- }
- for i, c := range step.Cases {
- if len(c) == 2 {
- if err := check(fmt.Sprintf("cases[%d]", i), c[1]); err != nil {
- return err
- }
- }
- }
- if step.Timeout != nil {
- if err := check("timeout.on", step.Timeout.On); err != nil {
- return err
- }
- }
- if err := check("onError", step.OnError); err != nil {
- return err
- }
- }
- // ── Phase 2b: BREAK scope validation (v3.16+) ─────────────────────────────────────
- // BREAK is only valid inside a Loop_* children subtree.
- // Build the set of stepIDs that are transitively reachable from any Loop_* children entry.
- loopChildren := make(map[string]bool)
- for _, step := range w.Steps {
- if !strings.HasPrefix(step.ID, "Loop_") || len(step.Children) == 0 {
- continue
- }
- // BFS from this Loop_*'s children entries
- visited := make(map[string]bool)
- bfsQueue := make([]string, len(step.Children))
- copy(bfsQueue, step.Children)
- for _, id := range step.Children {
- visited[id] = true
- }
- for len(bfsQueue) > 0 {
- cur := bfsQueue[0]
- bfsQueue = bfsQueue[1:]
- loopChildren[cur] = true
- // Find the step and add its successors (skip RETURN/BREAK which terminate)
- for _, s := range w.Steps {
- if s.ID != cur {
- continue
- }
- succs := []string{s.Next, s.OnError}
- succs = append(succs, s.Children...)
- for _, c := range s.Cases {
- if len(c) == 2 {
- succs = append(succs, c[1])
- }
- }
- if s.Timeout != nil {
- succs = append(succs, s.Timeout.On)
- }
- for _, sid := range succs {
- if sid == "" || sid == "RETURN" || sid == "BREAK" || visited[sid] {
- continue
- }
- visited[sid] = true
- bfsQueue = append(bfsQueue, sid)
- }
- break
- }
- }
- }
- // Verify every step with next:"BREAK" is inside a Loop_* children subtree
- for _, step := range w.Steps {
- if step.Next == "BREAK" && !loopChildren[step.ID] {
- return fmt.Errorf("step %s: 'BREAK' is only valid inside a Loop_* children subtree", step.ID)
- }
- // Also check Branch cases that target BREAK-reachable steps
- for _, c := range step.Cases {
- if len(c) == 2 && c[1] == "BREAK" && !loopChildren[step.ID] {
- return fmt.Errorf("step %s: BREAK target in cases is only valid inside a Loop_* children subtree", step.ID)
- }
- }
- }
- // ── Phase 3: Entry node validation (spec §2.3.1) ──────────────────────────────────
- entryNodeIDs := findEntryNodeIDs(w.Steps)
- if len(entryNodeIDs) == 0 {
- return fmt.Errorf("workflow graph has no entry node: every step is referenced by another step, creating a cycle with no starting point")
- }
- // ── Phase 4: Reachability (spec §2.3.2) ───────────────────────────────────────────
- // All steps must be reachable from at least one entry node via BFS.
- reachable := make(map[string]bool)
- queue := make([]string, len(entryNodeIDs))
- copy(queue, entryNodeIDs)
- for _, id := range entryNodeIDs {
- reachable[id] = true
- }
- for len(queue) > 0 {
- curr := queue[0]
- queue = queue[1:]
- // Find step (linear scan is fine at validation time)
- for _, step := range w.Steps {
- if step.ID != curr {
- continue
- }
- successors := []string{step.Next, step.OnError}
- successors = append(successors, step.Children...)
- for _, c := range step.Cases {
- if len(c) == 2 {
- successors = append(successors, c[1])
- }
- }
- if step.Timeout != nil {
- successors = append(successors, step.Timeout.On)
- }
- for _, s := range successors {
- if s != "" && s != "RETURN" && s != "BREAK" && !reachable[s] {
- reachable[s] = true
- queue = append(queue, s)
- }
- }
- break
- }
- }
- for _, step := range w.Steps {
- if !reachable[step.ID] {
- return fmt.Errorf("step %s is unreachable from any entry node (spec §2.3.2)", step.ID)
- }
- }
- // Validate Download_* and Unzip_* field constraints (v3.14+)
- for _, step := range w.Steps {
- if strings.HasPrefix(step.ID, "Download_") {
- // source is required
- if step.Source == nil {
- return fmt.Errorf("step %s: 'source' is required for Download_* steps", step.ID)
- }
- // target and routeByExt/defaultDir are mutually exclusive; at least one is required
- hasTarget := step.Target != ""
- hasRoute := len(step.RouteByExt) > 0 || step.DefaultDir != ""
- if hasTarget && hasRoute {
- return fmt.Errorf("step %s: 'target' and 'routeByExt'/'defaultDir' are mutually exclusive for Download_* steps", step.ID)
- }
- if !hasTarget && !hasRoute {
- return fmt.Errorf("step %s: Download_* steps must specify either 'target' or 'routeByExt'/'defaultDir'", step.ID)
- }
- }
- if strings.HasPrefix(step.ID, "Unzip_") {
- // source is required
- if step.Source == nil {
- return fmt.Errorf("step %s: 'source' is required for Unzip_* steps", step.ID)
- }
- // routeByExt is required (may be empty map {} combined with defaultDir, but the field must be declared)
- if step.RouteByExt == nil {
- return fmt.Errorf("step %s: 'routeByExt' is required for Unzip_* steps (use {} with 'defaultDir' to route all files to one directory)", step.ID)
- }
- }
- if strings.HasPrefix(step.ID, "Pause_") {
- // resumeResultTarget is required (spec §5.1, §11.2.1)
- if step.ResumeResultTarget == "" {
- return fmt.Errorf("step %s: 'resumeResultTarget' is required for Pause_* steps", step.ID)
- }
- // children is not applicable for Pause_* (spec §5.1: 不适用)
- if len(step.Children) > 0 {
- return fmt.Errorf("step %s: 'children' is not supported on Pause_* steps (spec §5.1)", step.ID)
- }
- // If timeout is provided, sec must be > 0 and on must be non-empty (spec §11.2.2)
- if step.Timeout != nil {
- if step.Timeout.Sec <= 0 {
- return fmt.Errorf("step %s: timeout.sec must be > 0 for Pause_* steps", step.ID)
- }
- if step.Timeout.On == "" {
- return fmt.Errorf("step %s: timeout.on is required when timeout is configured for Pause_* steps", step.ID)
- }
- }
- }
- // v3.16+: Loop_* field constraints
- if strings.HasPrefix(step.ID, "Loop_") {
- hasWhile := step.While != ""
- hasSource := step.Source != nil
- // source is interface{} — also check string form
- if sourceStr, ok := step.Source.(string); ok && sourceStr == "" {
- hasSource = false
- }
- // while and source both present → ERROR
- if hasWhile && hasSource {
- return fmt.Errorf("step %s: 'while' and 'source' are mutually exclusive on Loop_* steps", step.ID)
- }
- // while without maxIterations → ERROR
- if hasWhile && step.MaxIterations == nil {
- return fmt.Errorf("step %s: 'maxIterations' is required when 'while' is used on Loop_* steps", step.ID)
- }
- // maxIterations < 1 → ERROR
- if step.MaxIterations != nil && *step.MaxIterations < 1 {
- return fmt.Errorf("step %s: 'maxIterations' must be >= 1, got %d", step.ID, *step.MaxIterations)
- }
- // while + mode:"parallel" → ERROR
- if hasWhile && step.Mode == "parallel" {
- return fmt.Errorf("step %s: 'while' mode requires mode 'serial', got 'parallel'", step.ID)
- }
- }
- // v3.16+: LLM_* model field validation
- if strings.HasPrefix(step.ID, "LLM_") && step.Model != "" {
- // model contains "/" but provider or modelId part is empty → ERROR
- if strings.Contains(step.Model, "/") {
- parts := strings.SplitN(step.Model, "/", 2)
- if parts[0] == "" || parts[1] == "" {
- return fmt.Errorf("step %s: 'model' with '/' must have non-empty provider and modelId (got %q)", step.ID, step.Model)
- }
- }
- }
- }
- return nil
- }
// generateWorkflowID returns a workflow execution ID of the form "wf_<n>",
// where <n> is the current wall-clock time in nanoseconds since the Unix epoch.
//
// NOTE(review): the ID is only as unique as the clock reading. Two calls that
// observe the same UnixNano value (possible with coarse timer resolution or
// concurrent callers) produce identical IDs. Guaranteeing uniqueness would need
// e.g. an atomic counter.
func generateWorkflowID() string {
	stamp := time.Now().UnixNano()
	return fmt.Sprintf("wf_%d", stamp)
}
- // findEntryNodeIDs returns the IDs of all "entry" nodes in a step list.
- // An entry node is one whose ID does not appear in any other step's
- // next / children / cases / timeout.on / onError fields.
- // Spec §1.4C: the engine auto-identifies entry nodes; it MUST NOT always use steps[0].
- func findEntryNodeIDs(steps []Step) []string {
- referenced := make(map[string]bool)
- for _, step := range steps {
- if step.Next != "" && step.Next != "RETURN" && step.Next != "BREAK" {
- referenced[step.Next] = true
- }
- for _, childID := range step.Children {
- referenced[childID] = true
- }
- for _, c := range step.Cases {
- if len(c) == 2 {
- referenced[c[1]] = true
- }
- }
- if step.Timeout != nil && step.Timeout.On != "" {
- referenced[step.Timeout.On] = true
- }
- if step.OnError != "" {
- referenced[step.OnError] = true
- }
- }
- var entries []string
- for _, step := range steps {
- if !referenced[step.ID] {
- entries = append(entries, step.ID)
- }
- }
- return entries
- }
|