| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747 |
- // Package workflow implements a v3.x workflow orchestration engine (current baseline: v3.16)
- // that supports multi-step output, API calls, structured output (v3.7+), output_config passthrough (v3.7+),
- // schema reuse with schemaRef (v3.9+), unified LLM output semantics with _result/_meta/_error (v3.10+),
- // stream as top-level LLM_* node attribute (v3.12+), run_events structured event stream (v3.12+),
- // file_start/file_done run_events for file write lifecycle tracking (v3.13+),
- // Download_*/Unzip_* nodes, .tmp/ path isolation, _iterDir, Write prepend (v3.14+),
- // Pause_* nodes with resume protocol for explicit workflow suspension (v3.15+),
- // Loop_* while-mode with maxIterations, BREAK keyword, and multi-provider LLM with model field (v3.16+).
- package workflow
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "sync"
- "time"
- )
// WorkflowType names the kind of a workflow, which also corresponds to the
// location it lives in.
type WorkflowType string

// Known WorkflowType values.
const (
	// WorkflowTypeIDE is an IDE agent flow (Process/); no Service_* nodes allowed.
	WorkflowTypeIDE WorkflowType = "IDE"
	// WorkflowTypeBusiness is a business workflow (Workflows/); it can call services.
	WorkflowTypeBusiness WorkflowType = "Business"
	// WorkflowTypeApproval is an approval workflow (Workflows/); it can call services.
	WorkflowTypeApproval WorkflowType = "Approval"
	// WorkflowTypeLocal is a local workflow (LocalWorkflows/) for developer self-use.
	WorkflowTypeLocal WorkflowType = "Local"
)
- // Workflow represents the top-level workflow structure.
- type Workflow struct {
- Version string `json:"version"` // Current baseline: "3.13" (older v3.x versions remain compatibility-supported)
- Name string `json:"name"` // Workflow name
- Registry Registry `json:"registry"` // External resources and global boundaries
- Steps []Step `json:"steps"` // Node list (workflow body)
- WorkflowType WorkflowType `json:"workflowType,omitempty"` // Optional: IDE, Business, Approval, Local
- }
- // Step represents a workflow step/node with common properties
- type Step struct {
- ID string `json:"id"` // Unique identifier (with type prefix like Service_xxx, LLM_xxx)
- If string `json:"if,omitempty"` // Execution condition expression
- In StepInput `json:"in,omitempty"` // Input parameters
- Out StepOutput `json:"out,omitempty"` // Output mapping to global vars and files
- Target string `json:"target,omitempty"` // Write target (for Set_*, Write_*, Download_*)
- Value string `json:"value,omitempty"` // Write value expression (for Set_*, Write_*)
- Source interface{} `json:"source,omitempty"` // Data source: expression string (Loop_*), URL/object (Download_*), zip path (Unzip_*)
- RouteByExt map[string]string `json:"routeByExt,omitempty"` // v3.14+: extension→dir routing (Download_*, Unzip_*)
- DefaultDir string `json:"defaultDir,omitempty"` // v3.14+: fallback dir when routeByExt has no match
- Overwrite *bool `json:"overwrite,omitempty"` // v3.14+: overwrite flag for Unzip_* (default true)
- Children []string `json:"children,omitempty"` // Parallel child branch entry points
- Next string `json:"next,omitempty"` // Serial successor node
- OnError string `json:"onError,omitempty"` // v3.10+: error handler step ID (jump here instead of failing)
- Cases [][]string `json:"cases,omitempty"` // Conditional branches (for Branch_*) - [expression, stepId]
- Mode string `json:"mode,omitempty"` // Execution mode (for Loop_*, Write_*)
- Print string `json:"print,omitempty"` // v3.13+: expression emitted as step_print RunEvent after successful execution
- Reason string `json:"reason,omitempty"` // v3.15+: Pause_* display reason for frontend/notifications
- ResumeResultTarget string `json:"resumeResultTarget,omitempty"` // v3.15+: Pause_* target $vars path for resume payload (required for Pause_*)
- Timeout *PauseTimeout `json:"timeout,omitempty"` // v3.15+: Pause_* optional timeout configuration
- While string `json:"while,omitempty"` // v3.16+: Loop_* while-expression (mutually exclusive with source)
- MaxIterations *int `json:"maxIterations,omitempty"` // v3.16+: Loop_* iteration cap (required with while, optional with source)
- Model string `json:"model,omitempty"` // v3.16+: LLM_* provider/model selector (e.g. "openai", "openai/gpt-4.1")
- Meta Metadata `json:"meta,omitempty"` // Metadata for display/debugging
- }
// RunParams holds the run-level execution control parameters (v3.15+, spec §2.2).
//
// They are separate from the workflow's registry.params, which are design-time
// parameters of the workflow itself: RunParams describe how the platform
// dispatches and scopes one particular execution.
type RunParams struct {
	// Params are the business input params (declared in registry.params, spec §1.7).
	Params map[string]interface{} `json:"params,omitempty"`
	// WorkspaceID is the workspace scope for this execution.
	WorkspaceID string `json:"workspaceId,omitempty"`
	// Nodes is the subset of step IDs to selectively execute.
	Nodes []string `json:"nodes,omitempty"`
	// Mode is the execution mode: create / patch / regenerate / validate.
	Mode string `json:"mode,omitempty"`
}
// PauseTimeout is the optional timeout configuration of a Pause_* node (v3.15+).
type PauseTimeout struct {
	// Sec is the timeout duration in seconds (must be > 0).
	Sec int `json:"sec"`
	// On is the step ID to jump to on timeout (typically Stop_* or an error handler).
	On string `json:"on"`
}
// DownloadSource is the object form of the source field on a Download_* node.
// A source given as a plain string is treated as a URL instead.
type DownloadSource struct {
	// URL is the download URL (required).
	URL string `json:"url"`
	// Headers are optional extra HTTP request headers.
	Headers map[string]string `json:"headers,omitempty"`
	// Auth is an optional auth credential reference.
	Auth string `json:"auth,omitempty"`
	// Timeout is an optional timeout in seconds.
	Timeout int `json:"timeout,omitempty"`
	// Checksum is an optional expected checksum (sha256:hex).
	Checksum string `json:"checksum,omitempty"`
}
// StepInput is the set of input parameters of a step, keyed by parameter name.
type StepInput map[string]interface{}
// StepOutput is the output mapping of a step: each key is a target and each
// value is the expression (typically based on _result) assigned to it.
type StepOutput map[string]string

// UnmarshalJSON decodes the "out" field of a step.
//
// Two encodings are accepted:
//   - the full object form, e.g. {"$var": "=_result"}
//   - the string shorthand "$var", which expands to {"$var": "=_result"}
//
// Any other JSON value yields an error that quotes the raw input.
func (s *StepOutput) UnmarshalJSON(data []byte) error {
	// The object form is attempted first; JSON null also decodes here and
	// leaves the receiver as a nil map.
	var mapping map[string]string
	if json.Unmarshal(data, &mapping) == nil {
		*s = StepOutput(mapping)
		return nil
	}

	// Fall back to the shorthand: a bare target name.
	var target string
	if json.Unmarshal(data, &target) != nil {
		return fmt.Errorf("out field must be a string or object, got: %s", string(data))
	}
	*s = StepOutput{target: "=_result"}
	return nil
}
// Metadata is free-form display/debugging information attached to a step.
type Metadata map[string]interface{}
- // ExecutionContext holds the runtime state during workflow execution
- type ExecutionContext struct {
- Ctx context.Context // Go context for cancellation
- WorkflowID string // Workflow execution instance ID
- Params map[string]interface{} // Input parameters (read-only)
- ParamTypes map[string]string // Parameter type declarations from registry
- Variables map[string]interface{} // Global variables ($vars)
- VarTypes map[string]string // Variable type declarations from registry
- SystemVars map[string]interface{} // System variables (SYSVAR.xxx)
- LocalVars map[string]interface{} // Local variables (_item, _index, _result)
- Artifacts map[string]string // Temporary file references
- CurrentStepID string // Current executing step
- Status ExecutionStatus // Current execution status
- StartTime time.Time // Execution start time
- RunEventStream chan RunEvent // v3.12+: structured run_events stream (spec Chapter 13)
- ServiceAdapter ServiceAdapter // Adapter for calling services
- APIAdapter APIAdapter // Adapter for calling third-party APIs
- ComponentAdapter ComponentAdapter // Adapter for calling components
- LLMAdapter LLMAdapter // Adapter for calling LLMs
- LLMAdapterRegistry *LLMAdapterRegistry // v3.16+: multi-provider LLM adapter registry (nil for pre-3.16)
- FileAdapter FileAdapter // Adapter for file operations
- DocAdapter DocAdapter // Adapter for resolving semantic documents
- PauseState *PauseState // v3.15+: current pause state (non-nil only when Status == StatusPaused)
- pauseMu sync.Mutex // v3.15+: protects PauseState pointer field
- RunParams *RunParams // v3.15+: optional run-level control parameters (spec §2.2)
- }
// ExecutionStatus is the lifecycle state of a workflow execution.
type ExecutionStatus string

// Known ExecutionStatus values.
const (
	StatusRunning   ExecutionStatus = "running"
	StatusCompleted ExecutionStatus = "completed"
	StatusFailed    ExecutionStatus = "failed"
	StatusStopped   ExecutionStatus = "stopped"
	// StatusPaused (v3.15+) means the workflow is suspended at a Pause_* node,
	// awaiting resume.
	StatusPaused ExecutionStatus = "paused"
)
// RunEventType is the type tag of a run_events event (spec 3.12 Chapter 13).
type RunEventType string

// Known RunEventType values.
const (
	// Workflow-level events.
	RunEventWorkflowStart     RunEventType = "workflow_start"
	RunEventWorkflowDone      RunEventType = "workflow_done"
	RunEventWorkflowFailed    RunEventType = "workflow_failed"
	RunEventWorkflowCancelled RunEventType = "workflow_cancelled"

	// Step-level events.
	RunEventStepStart   RunEventType = "step_start"
	RunEventStepDone    RunEventType = "step_done"
	RunEventStepError   RunEventType = "step_error"
	RunEventStepSkipped RunEventType = "step_skipped"

	// LLM, file and print events.
	RunEventLLMToken  RunEventType = "llm_token"
	RunEventLLMDone   RunEventType = "llm_done"
	RunEventFileStart RunEventType = "file_start"
	RunEventFileDone  RunEventType = "file_done"
	RunEventStepPrint RunEventType = "step_print"

	// RunEventPauseStart (v3.15+): workflow entered paused state at a Pause_* node.
	RunEventPauseStart RunEventType = "pause_start"
	// RunEventPauseResumed (v3.15+): resume succeeded, payload written,
	// execution continuing.
	RunEventPauseResumed RunEventType = "pause_resumed"
	// RunEventPauseTimeout (v3.15+): pause timed out, jumping to timeout.on.
	RunEventPauseTimeout RunEventType = "pause_timeout"
	// RunEventPauseRejected (v3.15+): resume request rejected (invalid token,
	// wrong state, etc.).
	RunEventPauseRejected RunEventType = "pause_rejected"
)
- // RunEvent is a structured event emitted to the run_events stream (spec 3.12 Chapter 13).
- // Each event is independently parseable and carries a monotonic sequence number.
- type RunEvent struct {
- RunID string `json:"run_id"` // Workflow run instance ID
- Seq uint64 `json:"seq"` // Monotonically increasing sequence number (from 1)
- Ts string `json:"ts"` // ISO 8601 timestamp, millisecond precision
- Type RunEventType `json:"type"` // Event type (see RunEventType constants)
- StepID *string `json:"step_id"` // Step ID, or null for workflow-level events
- Payload map[string]interface{} `json:"payload"` // Event-specific payload (see spec 13.4)
- }
// ServiceResult is what a service call returns.
type ServiceResult struct {
	// Data is the business result.
	Data map[string]interface{} `json:"data"`
}
// StepType is the kind of a workflow step.
type StepType string

// Known StepType values.
const (
	StepTypeService   StepType = "Service"
	StepTypeAPI       StepType = "API"
	StepTypeComponent StepType = "Component"
	StepTypeLLM       StepType = "LLM"
	StepTypeSet       StepType = "Set"
	StepTypeWrite     StepType = "Write"
	// StepTypeDownload (v3.14+) stream-downloads a file from an external URL.
	StepTypeDownload StepType = "Download"
	// StepTypeUnzip (v3.14+) extracts a zip archive and routes entries by extension.
	StepTypeUnzip StepType = "Unzip"
	// StepTypePause (v3.15+) suspends workflow execution pending external resume.
	StepTypePause  StepType = "Pause"
	StepTypeBranch StepType = "Branch"
	StepTypeLoop   StepType = "Loop"
	StepTypeStop   StepType = "Stop"
	// StepTypeNoop does nothing; useful for grouping children.
	StepTypeNoop StepType = "Noop"
)
// WriteMode is the strategy used when writing a file.
type WriteMode string

// Known WriteMode values.
const (
	WriteModeOverwrite    WriteMode = "overwrite"
	WriteModeFailIfExists WriteMode = "failIfExists"
	WriteModeAppend       WriteMode = "append"
	// WriteModePrepend (v3.14+) prepends content to the beginning of the file.
	WriteModePrepend WriteMode = "prepend"
)
// LoopMode is the strategy used to execute loop iterations.
type LoopMode string

// Known LoopMode values.
const (
	LoopModeParallel LoopMode = "parallel"
	LoopModeSerial   LoopMode = "serial"
)
- // GetSystemVar retrieves a system variable by name
- func (ctx *ExecutionContext) GetSystemVar(name string) (interface{}, bool) {
- val, ok := ctx.SystemVars[name]
- return val, ok
- }
- // SetSystemVar sets a system variable by name
- func (ctx *ExecutionContext) SetSystemVar(name string, value interface{}) {
- if ctx.SystemVars == nil {
- ctx.SystemVars = make(map[string]interface{})
- }
- ctx.SystemVars[name] = value
- }
- // GetParam retrieves a parameter (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) GetParam(key string) (interface{}, bool) {
- val, ok := ctx.Params[key]
- return val, ok
- }
- // GetVariable retrieves a global variable (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) GetVariable(key string) (interface{}, bool) {
- val, ok := ctx.Variables[key]
- return val, ok
- }
- // SetVariable sets a global variable (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) SetVariable(key string, value interface{}) {
- if ctx.Variables == nil {
- ctx.Variables = make(map[string]interface{})
- }
- ctx.Variables[key] = value
- }
- // GetLocalVar retrieves a local variable (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) GetLocalVar(key string) (interface{}, bool) {
- val, ok := ctx.LocalVars[key]
- return val, ok
- }
- // SetLocalVar sets a local variable (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) SetLocalVar(key string, value interface{}) {
- if ctx.LocalVars == nil {
- ctx.LocalVars = make(map[string]interface{})
- }
- ctx.LocalVars[key] = value
- }
- // DeleteLocalVar deletes a local variable (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) DeleteLocalVar(key string) {
- delete(ctx.LocalVars, key)
- }
- // GetArtifact retrieves an artifact (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) GetArtifact(key string) (string, bool) {
- val, ok := ctx.Artifacts[key]
- return val, ok
- }
- // SetArtifact sets an artifact (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) SetArtifact(key string, value string) {
- if ctx.Artifacts == nil {
- ctx.Artifacts = make(map[string]string)
- }
- ctx.Artifacts[key] = value
- }
- // SetArrayIndex sets an array index (non-thread-safe, for backward compatibility)
- func (ctx *ExecutionContext) SetArrayIndex(key string, index int, value interface{}) {
- root, ok := ctx.Variables[key]
- if !ok {
- slice := make([]interface{}, index+1)
- slice[index] = value
- ctx.Variables[key] = slice
- return
- }
- slice, ok := root.([]interface{})
- if !ok {
- slice = make([]interface{}, index+1)
- slice[index] = value
- ctx.Variables[key] = slice
- return
- }
- for len(slice) <= index {
- slice = append(slice, nil)
- }
- slice[index] = value
- ctx.Variables[key] = slice
- }
- // GetBaseContext returns itself for ExecutionContext
- func (ctx *ExecutionContext) GetBaseContext() *ExecutionContext {
- return ctx
- }
- // SetCurrentStepID sets the current step ID (non-thread-safe)
- func (ctx *ExecutionContext) SetCurrentStepID(stepID string) {
- ctx.CurrentStepID = stepID
- }
- // SetStatus sets the execution status (non-thread-safe)
- func (ctx *ExecutionContext) SetStatus(status ExecutionStatus) {
- ctx.Status = status
- }
- // IsStopped returns true if the execution status is stopped
- func (ctx *ExecutionContext) IsStopped() bool {
- return ctx.Status == StatusStopped
- }
- // SafeExecutionContext wraps ExecutionContext with thread-safe access
- type SafeExecutionContext struct {
- *ExecutionContext
- varMutex sync.RWMutex // Protects Variables map
- localMutex sync.Mutex // Protects LocalVars map
- artifactMutex sync.Mutex // Protects Artifacts map
- sysMutex sync.RWMutex // Protects SystemVars map
- statusMutex sync.Mutex // Protects Status, CurrentStepID
- }
- // NewSafeExecutionContext creates a thread-safe wrapper around ExecutionContext
- func NewSafeExecutionContext(ctx *ExecutionContext) *SafeExecutionContext {
- return &SafeExecutionContext{
- ExecutionContext: ctx,
- }
- }
- // SwapLocalVars temporarily swaps LocalVars with proper locking
- // Returns a cleanup function that must be called to restore original LocalVars
- func (s *SafeExecutionContext) SwapLocalVars(newLocalVars map[string]interface{}) func() {
- s.localMutex.Lock()
- oldLocalVars := s.LocalVars
- s.LocalVars = newLocalVars
- s.localMutex.Unlock()
- // Return cleanup function
- return func() {
- s.localMutex.Lock()
- s.LocalVars = oldLocalVars
- s.localMutex.Unlock()
- }
- }
- // GetParam retrieves a parameter with read lock
- func (s *SafeExecutionContext) GetParam(key string) (interface{}, bool) {
- s.varMutex.RLock()
- defer s.varMutex.RUnlock()
- val, ok := s.Params[key]
- return val, ok
- }
- // GetVariable retrieves a global variable with read lock
- func (s *SafeExecutionContext) GetVariable(key string) (interface{}, bool) {
- s.varMutex.RLock()
- defer s.varMutex.RUnlock()
- val, ok := s.Variables[key]
- return val, ok
- }
- // SetVariable sets a global variable with write lock
- func (s *SafeExecutionContext) SetVariable(key string, value interface{}) {
- s.varMutex.Lock()
- defer s.varMutex.Unlock()
- if s.Variables == nil {
- s.Variables = make(map[string]interface{})
- }
- s.Variables[key] = value
- }
- // GetLocalVar retrieves a local variable with lock
- func (s *SafeExecutionContext) GetLocalVar(key string) (interface{}, bool) {
- s.localMutex.Lock()
- defer s.localMutex.Unlock()
- val, ok := s.LocalVars[key]
- return val, ok
- }
- // SetLocalVar sets a local variable with lock
- func (s *SafeExecutionContext) SetLocalVar(key string, value interface{}) {
- s.localMutex.Lock()
- defer s.localMutex.Unlock()
- if s.LocalVars == nil {
- s.LocalVars = make(map[string]interface{})
- }
- s.LocalVars[key] = value
- }
- // DeleteLocalVar deletes a local variable with lock
- func (s *SafeExecutionContext) DeleteLocalVar(key string) {
- s.localMutex.Lock()
- defer s.localMutex.Unlock()
- delete(s.LocalVars, key)
- }
- // GetArtifact retrieves an artifact reference with lock
- func (s *SafeExecutionContext) GetArtifact(key string) (string, bool) {
- s.artifactMutex.Lock()
- defer s.artifactMutex.Unlock()
- val, ok := s.Artifacts[key]
- return val, ok
- }
- // SetArtifact sets an artifact reference with lock
- func (s *SafeExecutionContext) SetArtifact(key string, value string) {
- s.artifactMutex.Lock()
- defer s.artifactMutex.Unlock()
- if s.Artifacts == nil {
- s.Artifacts = make(map[string]string)
- }
- s.Artifacts[key] = value
- }
- // GetStatus retrieves the execution status with lock
- func (s *SafeExecutionContext) GetStatus() ExecutionStatus {
- s.statusMutex.Lock()
- defer s.statusMutex.Unlock()
- return s.Status
- }
- // SetStatus sets the execution status with lock
- func (s *SafeExecutionContext) SetStatus(status ExecutionStatus) {
- s.statusMutex.Lock()
- defer s.statusMutex.Unlock()
- s.Status = status
- }
- // IsStopped returns true if the execution status is stopped (thread-safe)
- func (s *SafeExecutionContext) IsStopped() bool {
- s.statusMutex.Lock()
- defer s.statusMutex.Unlock()
- return s.Status == StatusStopped
- }
- // GetCurrentStepID retrieves the current step ID with lock
- func (s *SafeExecutionContext) GetCurrentStepID() string {
- s.statusMutex.Lock()
- defer s.statusMutex.Unlock()
- return s.CurrentStepID
- }
- // SetCurrentStepID sets the current step ID with lock
- func (s *SafeExecutionContext) SetCurrentStepID(id string) {
- s.statusMutex.Lock()
- defer s.statusMutex.Unlock()
- s.CurrentStepID = id
- }
- // GetSystemVar retrieves a system variable with read lock
- func (s *SafeExecutionContext) GetSystemVar(name string) (interface{}, bool) {
- s.sysMutex.RLock()
- defer s.sysMutex.RUnlock()
- val, ok := s.SystemVars[name]
- return val, ok
- }
- // SetSystemVar sets a system variable with write lock
- func (s *SafeExecutionContext) SetSystemVar(name string, value interface{}) {
- s.sysMutex.Lock()
- defer s.sysMutex.Unlock()
- if s.SystemVars == nil {
- s.SystemVars = make(map[string]interface{})
- }
- s.SystemVars[name] = value
- }
- // SetArrayIndex atomically sets an array index with auto-grow
- func (s *SafeExecutionContext) SetArrayIndex(key string, index int, value interface{}) {
- s.varMutex.Lock()
- defer s.varMutex.Unlock()
- root, ok := s.Variables[key]
- if !ok {
- // Create new array
- slice := make([]interface{}, index+1)
- slice[index] = value
- s.Variables[key] = slice
- return
- }
- slice, ok := root.([]interface{})
- if !ok {
- // Not an array, create new array
- slice = make([]interface{}, index+1)
- slice[index] = value
- s.Variables[key] = slice
- return
- }
- // Grow slice if necessary
- for len(slice) <= index {
- slice = append(slice, nil)
- }
- slice[index] = value
- s.Variables[key] = slice
- }
- // GetBaseContext returns the underlying ExecutionContext
- func (s *SafeExecutionContext) GetBaseContext() *ExecutionContext {
- return s.ExecutionContext
- }
- // ChildExecutionContext provides isolated local variables for parallel branches
- type ChildExecutionContext struct {
- Parent *SafeExecutionContext
- LocalVars map[string]interface{} // Isolated local scope
- localStopped bool // true after this branch hits Stop_*
- }
- // NewChildExecutionContext creates a child context with isolated local variables
- func NewChildExecutionContext(parent *SafeExecutionContext) *ChildExecutionContext {
- return &ChildExecutionContext{
- Parent: parent,
- LocalVars: make(map[string]interface{}),
- }
- }
- // GetParam delegates to parent (shared params)
- func (c *ChildExecutionContext) GetParam(key string) (interface{}, bool) {
- return c.Parent.GetParam(key)
- }
- // GetVariable delegates to parent (shared globals)
- func (c *ChildExecutionContext) GetVariable(key string) (interface{}, bool) {
- return c.Parent.GetVariable(key)
- }
- // SetVariable delegates to parent (shared globals)
- func (c *ChildExecutionContext) SetVariable(key string, value interface{}) {
- c.Parent.SetVariable(key, value)
- }
- // GetLocalVar uses isolated local scope (no locking needed, single goroutine)
- func (c *ChildExecutionContext) GetLocalVar(key string) (interface{}, bool) {
- val, ok := c.LocalVars[key]
- return val, ok
- }
- // SetLocalVar uses isolated local scope (no locking needed, single goroutine)
- func (c *ChildExecutionContext) SetLocalVar(key string, value interface{}) {
- c.LocalVars[key] = value
- }
- // DeleteLocalVar deletes from isolated local scope
- func (c *ChildExecutionContext) DeleteLocalVar(key string) {
- delete(c.LocalVars, key)
- }
- // GetArtifact delegates to parent (shared artifacts)
- func (c *ChildExecutionContext) GetArtifact(key string) (string, bool) {
- return c.Parent.GetArtifact(key)
- }
- // SetArtifact delegates to parent (shared artifacts)
- func (c *ChildExecutionContext) SetArtifact(key string, value string) {
- c.Parent.SetArtifact(key, value)
- }
- // GetSystemVar delegates to parent (shared system vars)
- func (c *ChildExecutionContext) GetSystemVar(name string) (interface{}, bool) {
- return c.Parent.GetSystemVar(name)
- }
- // SetSystemVar delegates to parent (shared system vars)
- func (c *ChildExecutionContext) SetSystemVar(name string, value interface{}) {
- c.Parent.SetSystemVar(name, value)
- }
- // SetArrayIndex delegates to parent (shared global arrays)
- func (c *ChildExecutionContext) SetArrayIndex(key string, index int, value interface{}) {
- c.Parent.SetArrayIndex(key, index, value)
- }
- // GetBaseContext returns the parent's underlying ExecutionContext
- func (c *ChildExecutionContext) GetBaseContext() *ExecutionContext {
- return c.Parent.ExecutionContext
- }
- // SetCurrentStepID delegates to parent (thread-safe)
- func (c *ChildExecutionContext) SetCurrentStepID(stepID string) {
- c.Parent.SetCurrentStepID(stepID)
- }
- // SetStatus delegates to parent (thread-safe) and tracks local stopped flag
- func (c *ChildExecutionContext) SetStatus(status ExecutionStatus) {
- if status == StatusStopped {
- c.localStopped = true
- }
- c.Parent.SetStatus(status)
- }
- // IsStopped returns true if THIS branch hit Stop_* (not sibling branches)
- func (c *ChildExecutionContext) IsStopped() bool {
- return c.localStopped
- }
- // ContextAccessor provides an interface for accessing execution context
- type ContextAccessor interface {
- GetParam(key string) (interface{}, bool)
- GetVariable(key string) (interface{}, bool)
- SetVariable(key string, value interface{})
- SetArrayIndex(key string, index int, value interface{})
- GetLocalVar(key string) (interface{}, bool)
- SetLocalVar(key string, value interface{})
- DeleteLocalVar(key string)
- GetArtifact(key string) (string, bool)
- SetArtifact(key string, value string)
- GetSystemVar(name string) (interface{}, bool)
- SetSystemVar(name string, value interface{})
- GetBaseContext() *ExecutionContext
- // Thread-safe status field access
- SetCurrentStepID(stepID string)
- SetStatus(status ExecutionStatus)
- IsStopped() bool // Branch-aware stopped check (child contexts check local flag, not global)
- }
// resumeSignal is the message a Resume call sends to unblock executePauseStep.
// It carries the caller's payload together with the request ID.
type resumeSignal struct {
	// Payload is the data provided by the caller; written to resumeResultTarget.
	Payload interface{}
	// RequestID is the caller-supplied idempotency key.
	RequestID string
}
- // PauseState holds the runtime state of an active Pause_* node (v3.15+).
- // It is created by executePauseStep and stored in ExecutionContext.PauseState
- // for the duration of the pause. The Resume method uses it to send a signal.
- type PauseState struct {
- mu sync.Mutex // protects all fields below
- ch chan resumeSignal // buffered(1): executePauseStep reads; Resume writes
- token string // expected token for resume validation
- nodeID string // Pause_* step ID (for pause_rejected events)
- seenRequestIDs map[string]bool // idempotency: request IDs already processed
- resumed bool // true once a valid resume signal has been sent
- }
- // WaitToken returns the SHA-256 hex token that was published in the pause_start event.
- // This is safe to call from any goroutine while the workflow is in StatusPaused.
- func (p *PauseState) WaitToken() string {
- p.mu.Lock()
- defer p.mu.Unlock()
- return p.token
- }
// ResumeRequest holds the caller-supplied inputs of the Resume method (v3.15+).
// The field names follow the protocol-level resume request body defined in
// spec §11.4.1.
type ResumeRequest struct {
	// RunID is the optional workflow run ID for explicit validation (spec
	// §11.4.1 "runId"); if non-empty, it must match execCtx.WorkflowID.
	RunID string
	// Token must match the wait token from the pause_start event.
	Token string
	// Payload is arbitrary data written to resumeResultTarget in $vars.
	Payload interface{}
	// RequestID is an optional idempotency key; duplicate RequestIDs are
	// silently no-ops.
	RequestID string
}
// ErrBreak is the sentinel error produced when a step's next is "BREAK".
// The enclosing Loop_* executor catches it to leave the loop immediately; it
// must never propagate past the loop handler.
var ErrBreak = errors.New("BREAK")

// IsBreakError reports whether err is, or wraps, the ErrBreak sentinel.
func IsBreakError(err error) bool {
	isBreak := errors.Is(err, ErrBreak)
	return isBreak
}
// LLMError is a structured description of a failed LLM call (v3.10+).
type LLMError struct {
	// Type is the error category, e.g. "rate_limit", "auth_error", "timeout".
	Type string
	// Code is the error code, e.g. "RATE_LIMIT".
	Code string
	// Message is the human-readable error message.
	Message string
	// Retryable tells whether the call can be retried.
	Retryable bool
	// StatusCode is the HTTP status code.
	StatusCode int
	// Provider is the LLM provider name.
	Provider string
	// Model is the model used.
	Model string
	// RequestID is the request ID from the provider.
	RequestID string
	// ProviderError is the provider exception class.
	ProviderError string
	// Details holds additional details.
	Details map[string]interface{}
	// Raw is the raw error response.
	Raw map[string]interface{}
}

// Error implements the error interface. The text has the form
// "<type>: <message> (code=<code>, status=<status>)".
func (e *LLMError) Error() string {
	const layout = "%s: %s (code=%s, status=%d)"
	return fmt.Sprintf(layout, e.Type, e.Message, e.Code, e.StatusCode)
}

// ToMap converts the error to a map for use as the _error local variable.
//
// The keys "type", "code", "message" and "retryable" are always present.
// Every other key is added only when its field is non-zero (non-nil for the
// Details and Raw maps).
func (e *LLMError) ToMap() map[string]interface{} {
	out := map[string]interface{}{
		"type":      e.Type,
		"code":      e.Code,
		"message":   e.Message,
		"retryable": e.Retryable,
	}

	if e.StatusCode != 0 {
		out["status_code"] = e.StatusCode
	}

	// Optional string fields share the same "skip when empty" rule.
	optional := []struct{ key, val string }{
		{"provider", e.Provider},
		{"model", e.Model},
		{"request_id", e.RequestID},
		{"provider_error", e.ProviderError},
	}
	for _, f := range optional {
		if f.val != "" {
			out[f.key] = f.val
		}
	}

	if e.Details != nil {
		out["details"] = e.Details
	}
	if e.Raw != nil {
		out["raw"] = e.Raw
	}
	return out
}
|