// Package workflow implements a v3.x workflow orchestration engine (current baseline: v3.16) // that supports multi-step output, API calls, structured output (v3.7+), output_config passthrough (v3.7+), // schema reuse with schemaRef (v3.9+), unified LLM output semantics with _result/_meta/_error (v3.10+), // stream as top-level LLM_* node attribute (v3.12+), run_events structured event stream (v3.12+), // file_start/file_done run_events for file write lifecycle tracking (v3.13+), // Download_*/Unzip_* nodes, .tmp/ path isolation, _iterDir, Write prepend (v3.14+), // Pause_* nodes with resume protocol for explicit workflow suspension (v3.15+), // Loop_* while-mode with maxIterations, BREAK keyword, and multi-provider LLM with model field (v3.16+). package workflow import ( "context" "encoding/json" "errors" "fmt" "sync" "time" ) // WorkflowType represents the type/location of a workflow type WorkflowType string const ( WorkflowTypeIDE WorkflowType = "IDE" // IDE agent flow (Process/) - no Service_* nodes allowed WorkflowTypeBusiness WorkflowType = "Business" // Business workflow (Workflows/) - can call services WorkflowTypeApproval WorkflowType = "Approval" // Approval workflow (Workflows/) - can call services WorkflowTypeLocal WorkflowType = "Local" // Local workflow (LocalWorkflows/) - developer self-use ) // Workflow represents the top-level workflow structure. type Workflow struct { Version string `json:"version"` // Current baseline: "3.13" (older v3.x versions remain compatibility-supported) Name string `json:"name"` // Workflow name Registry Registry `json:"registry"` // External resources and global boundaries Steps []Step `json:"steps"` // Node list (workflow body) WorkflowType WorkflowType `json:"workflowType,omitempty"` // Optional: IDE, Business, Approval, Local } // Step represents a workflow step/node with common properties type Step struct { ID string `json:"id"` // Unique identifier (with type prefix like Service_xxx, LLM_xxx) If string `json:"if,omitempty"` // Execution condition expression In StepInput `json:"in,omitempty"` // Input parameters Out StepOutput `json:"out,omitempty"` // Output mapping to global vars and files Target string `json:"target,omitempty"` // Write target (for Set_*, Write_*, Download_*) Value string `json:"value,omitempty"` // Write value expression (for Set_*, Write_*) Source interface{} `json:"source,omitempty"` // Data source: expression string (Loop_*), URL/object (Download_*), zip path (Unzip_*) RouteByExt map[string]string `json:"routeByExt,omitempty"` // v3.14+: extension→dir routing (Download_*, Unzip_*) DefaultDir string `json:"defaultDir,omitempty"` // v3.14+: fallback dir when routeByExt has no match Overwrite *bool `json:"overwrite,omitempty"` // v3.14+: overwrite flag for Unzip_* (default true) Children []string `json:"children,omitempty"` // Parallel child branch entry points Next string `json:"next,omitempty"` // Serial successor node OnError string `json:"onError,omitempty"` // v3.10+: error handler step ID (jump here instead of failing) Cases [][]string `json:"cases,omitempty"` // Conditional branches (for Branch_*) - [expression, stepId] Mode string `json:"mode,omitempty"` // Execution mode (for Loop_*, Write_*) Print string `json:"print,omitempty"` // v3.13+: expression emitted as step_print RunEvent after successful execution Reason string `json:"reason,omitempty"` // v3.15+: Pause_* display reason for frontend/notifications ResumeResultTarget string `json:"resumeResultTarget,omitempty"` // v3.15+: Pause_* target $vars path for resume payload (required for Pause_*) Timeout *PauseTimeout `json:"timeout,omitempty"` // v3.15+: Pause_* optional timeout configuration While string `json:"while,omitempty"` // v3.16+: Loop_* while-expression (mutually exclusive with source) MaxIterations *int `json:"maxIterations,omitempty"` // v3.16+: Loop_* iteration cap (required with while, optional with source) Model string `json:"model,omitempty"` // v3.16+: LLM_* provider/model selector (e.g. "openai", "openai/gpt-4.1") Meta Metadata `json:"meta,omitempty"` // Metadata for display/debugging } // RunParams carries run-level execution control parameters (v3.15+, spec §2.2). // These are distinct from the workflow's registry.params (which are workflow // design-time parameters); RunParams control how a particular execution is // dispatched and scoped by the platform. type RunParams struct { Params map[string]interface{} `json:"params,omitempty"` // Business input params (declared in registry.params, spec §1.7) WorkspaceID string `json:"workspaceId,omitempty"` // Workspace scope for this execution Nodes []string `json:"nodes,omitempty"` // Subset of step IDs to selectively execute Mode string `json:"mode,omitempty"` // Execution mode: create / patch / regenerate / validate } // PauseTimeout configures the optional timeout for a Pause_* node (v3.15+). type PauseTimeout struct { Sec int `json:"sec"` // Timeout duration in seconds (must be > 0) On string `json:"on"` // Step ID to jump to on timeout (typically Stop_* or an error handler) } // DownloadSource represents the object form of a Download_* source field. // When source is a string, it is treated as a plain URL. type DownloadSource struct { URL string `json:"url"` // Required: download URL Headers map[string]string `json:"headers,omitempty"` // Optional: extra HTTP request headers Auth string `json:"auth,omitempty"` // Optional: auth credential reference Timeout int `json:"timeout,omitempty"` // Optional: timeout in seconds Checksum string `json:"checksum,omitempty"` // Optional: expected checksum (sha256:hex) } // StepInput represents the input parameters for a step type StepInput map[string]interface{} // StepOutput represents the output mapping from _result to global variables type StepOutput map[string]string // UnmarshalJSON supports both the full object form {"$var": "=_result"} and // the string shorthand "$var" (equivalent to {"$var": "=_result"}). func (s *StepOutput) UnmarshalJSON(data []byte) error { // Try object form first var m map[string]string if err := json.Unmarshal(data, &m); err == nil { *s = StepOutput(m) return nil } // Try string shorthand: "$var" → {"$var": "=_result"} var str string if err := json.Unmarshal(data, &str); err == nil { *s = StepOutput{str: "=_result"} return nil } return fmt.Errorf("out field must be a string or object, got: %s", string(data)) } // Metadata contains display/debugging information type Metadata map[string]interface{} // ExecutionContext holds the runtime state during workflow execution type ExecutionContext struct { Ctx context.Context // Go context for cancellation WorkflowID string // Workflow execution instance ID Params map[string]interface{} // Input parameters (read-only) ParamTypes map[string]string // Parameter type declarations from registry Variables map[string]interface{} // Global variables ($vars) VarTypes map[string]string // Variable type declarations from registry SystemVars map[string]interface{} // System variables (SYSVAR.xxx) LocalVars map[string]interface{} // Local variables (_item, _index, _result) Artifacts map[string]string // Temporary file references CurrentStepID string // Current executing step Status ExecutionStatus // Current execution status StartTime time.Time // Execution start time RunEventStream chan RunEvent // v3.12+: structured run_events stream (spec Chapter 13) ServiceAdapter ServiceAdapter // Adapter for calling services APIAdapter APIAdapter // Adapter for calling third-party APIs ComponentAdapter ComponentAdapter // Adapter for calling components LLMAdapter LLMAdapter // Adapter for calling LLMs LLMAdapterRegistry *LLMAdapterRegistry // v3.16+: multi-provider LLM adapter registry (nil for pre-3.16) FileAdapter FileAdapter // Adapter for file operations DocAdapter DocAdapter // Adapter for resolving semantic documents PauseState *PauseState // v3.15+: current pause state (non-nil only when Status == StatusPaused) pauseMu sync.Mutex // v3.15+: protects PauseState pointer field RunParams *RunParams // v3.15+: optional run-level control parameters (spec §2.2) } // ExecutionStatus represents the current status of workflow execution type ExecutionStatus string const ( StatusRunning ExecutionStatus = "running" StatusCompleted ExecutionStatus = "completed" StatusFailed ExecutionStatus = "failed" StatusStopped ExecutionStatus = "stopped" StatusPaused ExecutionStatus = "paused" // v3.15+: workflow is suspended at a Pause_* node, awaiting resume ) // RunEventType represents the type of a run_events event (spec 3.12 Chapter 13) type RunEventType string const ( RunEventWorkflowStart RunEventType = "workflow_start" RunEventWorkflowDone RunEventType = "workflow_done" RunEventWorkflowFailed RunEventType = "workflow_failed" RunEventWorkflowCancelled RunEventType = "workflow_cancelled" RunEventStepStart RunEventType = "step_start" RunEventStepDone RunEventType = "step_done" RunEventStepError RunEventType = "step_error" RunEventStepSkipped RunEventType = "step_skipped" RunEventLLMToken RunEventType = "llm_token" RunEventLLMDone RunEventType = "llm_done" RunEventFileStart RunEventType = "file_start" RunEventFileDone RunEventType = "file_done" RunEventStepPrint RunEventType = "step_print" RunEventPauseStart RunEventType = "pause_start" // v3.15+: workflow entered paused state at Pause_* node RunEventPauseResumed RunEventType = "pause_resumed" // v3.15+: resume succeeded, payload written, execution continuing RunEventPauseTimeout RunEventType = "pause_timeout" // v3.15+: pause timed out, jumping to timeout.on RunEventPauseRejected RunEventType = "pause_rejected" // v3.15+: resume request rejected (invalid token, wrong state, etc.) ) // RunEvent is a structured event emitted to the run_events stream (spec 3.12 Chapter 13). // Each event is independently parseable and carries a monotonic sequence number. type RunEvent struct { RunID string `json:"run_id"` // Workflow run instance ID Seq uint64 `json:"seq"` // Monotonically increasing sequence number (from 1) Ts string `json:"ts"` // ISO 8601 timestamp, millisecond precision Type RunEventType `json:"type"` // Event type (see RunEventType constants) StepID *string `json:"step_id"` // Step ID, or null for workflow-level events Payload map[string]interface{} `json:"payload"` // Event-specific payload (see spec 13.4) } // ServiceResult represents the result from a service call type ServiceResult struct { Data map[string]interface{} `json:"data"` // Business result } // StepType represents the type of workflow step type StepType string const ( StepTypeService StepType = "Service" StepTypeAPI StepType = "API" StepTypeComponent StepType = "Component" StepTypeLLM StepType = "LLM" StepTypeSet StepType = "Set" StepTypeWrite StepType = "Write" StepTypeDownload StepType = "Download" // v3.14+: stream-download a file from an external URL StepTypeUnzip StepType = "Unzip" // v3.14+: extract a zip archive and route entries by extension StepTypePause StepType = "Pause" // v3.15+: suspend workflow execution pending external resume StepTypeBranch StepType = "Branch" StepTypeLoop StepType = "Loop" StepTypeStop StepType = "Stop" StepTypeNoop StepType = "Noop" // Does nothing, useful for grouping children ) // WriteMode represents the file write strategy type WriteMode string const ( WriteModeOverwrite WriteMode = "overwrite" WriteModeFailIfExists WriteMode = "failIfExists" WriteModeAppend WriteMode = "append" WriteModePrepend WriteMode = "prepend" // v3.14+: prepend content to the beginning of the file ) // LoopMode represents the loop execution strategy type LoopMode string const ( LoopModeParallel LoopMode = "parallel" LoopModeSerial LoopMode = "serial" ) // GetSystemVar retrieves a system variable by name func (ctx *ExecutionContext) GetSystemVar(name string) (interface{}, bool) { val, ok := ctx.SystemVars[name] return val, ok } // SetSystemVar sets a system variable by name func (ctx *ExecutionContext) SetSystemVar(name string, value interface{}) { if ctx.SystemVars == nil { ctx.SystemVars = make(map[string]interface{}) } ctx.SystemVars[name] = value } // GetParam retrieves a parameter (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) GetParam(key string) (interface{}, bool) { val, ok := ctx.Params[key] return val, ok } // GetVariable retrieves a global variable (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) GetVariable(key string) (interface{}, bool) { val, ok := ctx.Variables[key] return val, ok } // SetVariable sets a global variable (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) SetVariable(key string, value interface{}) { if ctx.Variables == nil { ctx.Variables = make(map[string]interface{}) } ctx.Variables[key] = value } // GetLocalVar retrieves a local variable (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) GetLocalVar(key string) (interface{}, bool) { val, ok := ctx.LocalVars[key] return val, ok } // SetLocalVar sets a local variable (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) SetLocalVar(key string, value interface{}) { if ctx.LocalVars == nil { ctx.LocalVars = make(map[string]interface{}) } ctx.LocalVars[key] = value } // DeleteLocalVar deletes a local variable (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) DeleteLocalVar(key string) { delete(ctx.LocalVars, key) } // GetArtifact retrieves an artifact (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) GetArtifact(key string) (string, bool) { val, ok := ctx.Artifacts[key] return val, ok } // SetArtifact sets an artifact (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) SetArtifact(key string, value string) { if ctx.Artifacts == nil { ctx.Artifacts = make(map[string]string) } ctx.Artifacts[key] = value } // SetArrayIndex sets an array index (non-thread-safe, for backward compatibility) func (ctx *ExecutionContext) SetArrayIndex(key string, index int, value interface{}) { root, ok := ctx.Variables[key] if !ok { slice := make([]interface{}, index+1) slice[index] = value ctx.Variables[key] = slice return } slice, ok := root.([]interface{}) if !ok { slice = make([]interface{}, index+1) slice[index] = value ctx.Variables[key] = slice return } for len(slice) <= index { slice = append(slice, nil) } slice[index] = value ctx.Variables[key] = slice } // GetBaseContext returns itself for ExecutionContext func (ctx *ExecutionContext) GetBaseContext() *ExecutionContext { return ctx } // SetCurrentStepID sets the current step ID (non-thread-safe) func (ctx *ExecutionContext) SetCurrentStepID(stepID string) { ctx.CurrentStepID = stepID } // SetStatus sets the execution status (non-thread-safe) func (ctx *ExecutionContext) SetStatus(status ExecutionStatus) { ctx.Status = status } // IsStopped returns true if the execution status is stopped func (ctx *ExecutionContext) IsStopped() bool { return ctx.Status == StatusStopped } // SafeExecutionContext wraps ExecutionContext with thread-safe access type SafeExecutionContext struct { *ExecutionContext varMutex sync.RWMutex // Protects Variables map localMutex sync.Mutex // Protects LocalVars map artifactMutex sync.Mutex // Protects Artifacts map sysMutex sync.RWMutex // Protects SystemVars map statusMutex sync.Mutex // Protects Status, CurrentStepID } // NewSafeExecutionContext creates a thread-safe wrapper around ExecutionContext func NewSafeExecutionContext(ctx *ExecutionContext) *SafeExecutionContext { return &SafeExecutionContext{ ExecutionContext: ctx, } } // SwapLocalVars temporarily swaps LocalVars with proper locking // Returns a cleanup function that must be called to restore original LocalVars func (s *SafeExecutionContext) SwapLocalVars(newLocalVars map[string]interface{}) func() { s.localMutex.Lock() oldLocalVars := s.LocalVars s.LocalVars = newLocalVars s.localMutex.Unlock() // Return cleanup function return func() { s.localMutex.Lock() s.LocalVars = oldLocalVars s.localMutex.Unlock() } } // GetParam retrieves a parameter with read lock func (s *SafeExecutionContext) GetParam(key string) (interface{}, bool) { s.varMutex.RLock() defer s.varMutex.RUnlock() val, ok := s.Params[key] return val, ok } // GetVariable retrieves a global variable with read lock func (s *SafeExecutionContext) GetVariable(key string) (interface{}, bool) { s.varMutex.RLock() defer s.varMutex.RUnlock() val, ok := s.Variables[key] return val, ok } // SetVariable sets a global variable with write lock func (s *SafeExecutionContext) SetVariable(key string, value interface{}) { s.varMutex.Lock() defer s.varMutex.Unlock() if s.Variables == nil { s.Variables = make(map[string]interface{}) } s.Variables[key] = value } // GetLocalVar retrieves a local variable with lock func (s *SafeExecutionContext) GetLocalVar(key string) (interface{}, bool) { s.localMutex.Lock() defer s.localMutex.Unlock() val, ok := s.LocalVars[key] return val, ok } // SetLocalVar sets a local variable with lock func (s *SafeExecutionContext) SetLocalVar(key string, value interface{}) { s.localMutex.Lock() defer s.localMutex.Unlock() if s.LocalVars == nil { s.LocalVars = make(map[string]interface{}) } s.LocalVars[key] = value } // DeleteLocalVar deletes a local variable with lock func (s *SafeExecutionContext) DeleteLocalVar(key string) { s.localMutex.Lock() defer s.localMutex.Unlock() delete(s.LocalVars, key) } // GetArtifact retrieves an artifact reference with lock func (s *SafeExecutionContext) GetArtifact(key string) (string, bool) { s.artifactMutex.Lock() defer s.artifactMutex.Unlock() val, ok := s.Artifacts[key] return val, ok } // SetArtifact sets an artifact reference with lock func (s *SafeExecutionContext) SetArtifact(key string, value string) { s.artifactMutex.Lock() defer s.artifactMutex.Unlock() if s.Artifacts == nil { s.Artifacts = make(map[string]string) } s.Artifacts[key] = value } // GetStatus retrieves the execution status with lock func (s *SafeExecutionContext) GetStatus() ExecutionStatus { s.statusMutex.Lock() defer s.statusMutex.Unlock() return s.Status } // SetStatus sets the execution status with lock func (s *SafeExecutionContext) SetStatus(status ExecutionStatus) { s.statusMutex.Lock() defer s.statusMutex.Unlock() s.Status = status } // IsStopped returns true if the execution status is stopped (thread-safe) func (s *SafeExecutionContext) IsStopped() bool { s.statusMutex.Lock() defer s.statusMutex.Unlock() return s.Status == StatusStopped } // GetCurrentStepID retrieves the current step ID with lock func (s *SafeExecutionContext) GetCurrentStepID() string { s.statusMutex.Lock() defer s.statusMutex.Unlock() return s.CurrentStepID } // SetCurrentStepID sets the current step ID with lock func (s *SafeExecutionContext) SetCurrentStepID(id string) { s.statusMutex.Lock() defer s.statusMutex.Unlock() s.CurrentStepID = id } // GetSystemVar retrieves a system variable with read lock func (s *SafeExecutionContext) GetSystemVar(name string) (interface{}, bool) { s.sysMutex.RLock() defer s.sysMutex.RUnlock() val, ok := s.SystemVars[name] return val, ok } // SetSystemVar sets a system variable with write lock func (s *SafeExecutionContext) SetSystemVar(name string, value interface{}) { s.sysMutex.Lock() defer s.sysMutex.Unlock() if s.SystemVars == nil { s.SystemVars = make(map[string]interface{}) } s.SystemVars[name] = value } // SetArrayIndex atomically sets an array index with auto-grow func (s *SafeExecutionContext) SetArrayIndex(key string, index int, value interface{}) { s.varMutex.Lock() defer s.varMutex.Unlock() root, ok := s.Variables[key] if !ok { // Create new array slice := make([]interface{}, index+1) slice[index] = value s.Variables[key] = slice return } slice, ok := root.([]interface{}) if !ok { // Not an array, create new array slice = make([]interface{}, index+1) slice[index] = value s.Variables[key] = slice return } // Grow slice if necessary for len(slice) <= index { slice = append(slice, nil) } slice[index] = value s.Variables[key] = slice } // GetBaseContext returns the underlying ExecutionContext func (s *SafeExecutionContext) GetBaseContext() *ExecutionContext { return s.ExecutionContext } // ChildExecutionContext provides isolated local variables for parallel branches type ChildExecutionContext struct { Parent *SafeExecutionContext LocalVars map[string]interface{} // Isolated local scope localStopped bool // true after this branch hits Stop_* } // NewChildExecutionContext creates a child context with isolated local variables func NewChildExecutionContext(parent *SafeExecutionContext) *ChildExecutionContext { return &ChildExecutionContext{ Parent: parent, LocalVars: make(map[string]interface{}), } } // GetParam delegates to parent (shared params) func (c *ChildExecutionContext) GetParam(key string) (interface{}, bool) { return c.Parent.GetParam(key) } // GetVariable delegates to parent (shared globals) func (c *ChildExecutionContext) GetVariable(key string) (interface{}, bool) { return c.Parent.GetVariable(key) } // SetVariable delegates to parent (shared globals) func (c *ChildExecutionContext) SetVariable(key string, value interface{}) { c.Parent.SetVariable(key, value) } // GetLocalVar uses isolated local scope (no locking needed, single goroutine) func (c *ChildExecutionContext) GetLocalVar(key string) (interface{}, bool) { val, ok := c.LocalVars[key] return val, ok } // SetLocalVar uses isolated local scope (no locking needed, single goroutine) func (c *ChildExecutionContext) SetLocalVar(key string, value interface{}) { c.LocalVars[key] = value } // DeleteLocalVar deletes from isolated local scope func (c *ChildExecutionContext) DeleteLocalVar(key string) { delete(c.LocalVars, key) } // GetArtifact delegates to parent (shared artifacts) func (c *ChildExecutionContext) GetArtifact(key string) (string, bool) { return c.Parent.GetArtifact(key) } // SetArtifact delegates to parent (shared artifacts) func (c *ChildExecutionContext) SetArtifact(key string, value string) { c.Parent.SetArtifact(key, value) } // GetSystemVar delegates to parent (shared system vars) func (c *ChildExecutionContext) GetSystemVar(name string) (interface{}, bool) { return c.Parent.GetSystemVar(name) } // SetSystemVar delegates to parent (shared system vars) func (c *ChildExecutionContext) SetSystemVar(name string, value interface{}) { c.Parent.SetSystemVar(name, value) } // SetArrayIndex delegates to parent (shared global arrays) func (c *ChildExecutionContext) SetArrayIndex(key string, index int, value interface{}) { c.Parent.SetArrayIndex(key, index, value) } // GetBaseContext returns the parent's underlying ExecutionContext func (c *ChildExecutionContext) GetBaseContext() *ExecutionContext { return c.Parent.ExecutionContext } // SetCurrentStepID delegates to parent (thread-safe) func (c *ChildExecutionContext) SetCurrentStepID(stepID string) { c.Parent.SetCurrentStepID(stepID) } // SetStatus delegates to parent (thread-safe) and tracks local stopped flag func (c *ChildExecutionContext) SetStatus(status ExecutionStatus) { if status == StatusStopped { c.localStopped = true } c.Parent.SetStatus(status) } // IsStopped returns true if THIS branch hit Stop_* (not sibling branches) func (c *ChildExecutionContext) IsStopped() bool { return c.localStopped } // ContextAccessor provides an interface for accessing execution context type ContextAccessor interface { GetParam(key string) (interface{}, bool) GetVariable(key string) (interface{}, bool) SetVariable(key string, value interface{}) SetArrayIndex(key string, index int, value interface{}) GetLocalVar(key string) (interface{}, bool) SetLocalVar(key string, value interface{}) DeleteLocalVar(key string) GetArtifact(key string) (string, bool) SetArtifact(key string, value string) GetSystemVar(name string) (interface{}, bool) SetSystemVar(name string, value interface{}) GetBaseContext() *ExecutionContext // Thread-safe status field access SetCurrentStepID(stepID string) SetStatus(status ExecutionStatus) IsStopped() bool // Branch-aware stopped check (child contexts check local flag, not global) } // resumeSignal carries the payload and request ID sent by a Resume call to unblock executePauseStep. type resumeSignal struct { Payload interface{} // The payload provided by the caller; written to resumeResultTarget RequestID string // Caller-supplied idempotency key } // PauseState holds the runtime state of an active Pause_* node (v3.15+). // It is created by executePauseStep and stored in ExecutionContext.PauseState // for the duration of the pause. The Resume method uses it to send a signal. type PauseState struct { mu sync.Mutex // protects all fields below ch chan resumeSignal // buffered(1): executePauseStep reads; Resume writes token string // expected token for resume validation nodeID string // Pause_* step ID (for pause_rejected events) seenRequestIDs map[string]bool // idempotency: request IDs already processed resumed bool // true once a valid resume signal has been sent } // WaitToken returns the SHA-256 hex token that was published in the pause_start event. // This is safe to call from any goroutine while the workflow is in StatusPaused. func (p *PauseState) WaitToken() string { p.mu.Lock() defer p.mu.Unlock() return p.token } // ResumeRequest carries the caller-supplied fields for the Resume method (v3.15+). // Field names correspond to the protocol-level resume request body defined in spec §11.4.1. type ResumeRequest struct { RunID string // Optional: workflow run ID for explicit validation (spec §11.4.1 "runId"); if non-empty, must match execCtx.WorkflowID Token string // Must match the wait token from the pause_start event Payload interface{} // Arbitrary data written to resumeResultTarget in $vars RequestID string // Optional idempotency key; duplicate RequestIDs are silently no-ops } // ErrBreak is a sentinel error returned when a step's next is "BREAK". // The enclosing Loop_* executor catches this to exit the loop immediately. // It must never propagate past the loop handler. var ErrBreak = errors.New("BREAK") // IsBreakError checks if an error is the BREAK sentinel. func IsBreakError(err error) bool { return errors.Is(err, ErrBreak) } // LLMError represents a structured LLM call failure (v3.10+) type LLMError struct { Type string // e.g., "rate_limit", "auth_error", "timeout" Code string // e.g., "RATE_LIMIT" Message string // Human-readable error message Retryable bool // Whether the call can be retried StatusCode int // HTTP status code Provider string // LLM provider name Model string // Model used RequestID string // Request ID from provider ProviderError string // provider exception class Details map[string]interface{} // Additional details Raw map[string]interface{} // Raw error response } // Error implements the error interface func (e *LLMError) Error() string { return fmt.Sprintf("%s: %s (code=%s, status=%d)", e.Type, e.Message, e.Code, e.StatusCode) } // ToMap converts LLMError to a map for use as _error local variable func (e *LLMError) ToMap() map[string]interface{} { m := map[string]interface{}{ "type": e.Type, "code": e.Code, "message": e.Message, "retryable": e.Retryable, } if e.StatusCode != 0 { m["status_code"] = e.StatusCode } if e.Provider != "" { m["provider"] = e.Provider } if e.Model != "" { m["model"] = e.Model } if e.RequestID != "" { m["request_id"] = e.RequestID } if e.ProviderError != "" { m["provider_error"] = e.ProviderError } if e.Details != nil { m["details"] = e.Details } if e.Raw != nil { m["raw"] = e.Raw } return m }