types.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. // Package workflow implements a v3.x workflow orchestration engine (current baseline: v3.16)
  2. // that supports multi-step output, API calls, structured output (v3.7+), output_config passthrough (v3.7+),
  3. // schema reuse with schemaRef (v3.9+), unified LLM output semantics with _result/_meta/_error (v3.10+),
  4. // stream as top-level LLM_* node attribute (v3.12+), run_events structured event stream (v3.12+),
  5. // file_start/file_done run_events for file write lifecycle tracking (v3.13+),
  6. // Download_*/Unzip_* nodes, .tmp/ path isolation, _iterDir, Write prepend (v3.14+),
  7. // Pause_* nodes with resume protocol for explicit workflow suspension (v3.15+),
  8. // Loop_* while-mode with maxIterations, BREAK keyword, and multi-provider LLM with model field (v3.16+).
  9. package workflow
  10. import (
  11. "context"
  12. "encoding/json"
  13. "errors"
  14. "fmt"
  15. "sync"
  16. "time"
  17. )
// WorkflowType represents the type/location of a workflow. The comment on each
// constant names the directory that kind of workflow lives in and whether it
// may call services.
type WorkflowType string

// Recognized WorkflowType values.
const (
	WorkflowTypeIDE      WorkflowType = "IDE"      // IDE agent flow (Process/) - no Service_* nodes allowed
	WorkflowTypeBusiness WorkflowType = "Business" // Business workflow (Workflows/) - can call services
	WorkflowTypeApproval WorkflowType = "Approval" // Approval workflow (Workflows/) - can call services
	WorkflowTypeLocal    WorkflowType = "Local"    // Local workflow (LocalWorkflows/) - developer self-use
)
// Workflow represents the top-level workflow structure. The JSON tags define
// the keys of the serialized workflow document.
type Workflow struct {
	Version      string       `json:"version"`                // Workflow spec version, e.g. "3.16" (older v3.x versions remain compatibility-supported)
	Name         string       `json:"name"`                   // Workflow name
	Registry     Registry     `json:"registry"`               // External resources and global boundaries
	Steps        []Step       `json:"steps"`                  // Node list (workflow body)
	WorkflowType WorkflowType `json:"workflowType,omitempty"` // Optional: IDE, Business, Approval, Local
}
// Step represents a workflow step/node with common properties.
//
// A single struct carries the fields of every node kind; the trailing comment
// on each field names the node prefixes (Set_*, Loop_*, Pause_*, ...) it
// applies to and the spec version that introduced it. All fields except ID are
// tagged omitempty.
type Step struct {
	ID                 string            `json:"id"`                           // Unique identifier (with type prefix like Service_xxx, LLM_xxx)
	If                 string            `json:"if,omitempty"`                 // Execution condition expression
	In                 StepInput         `json:"in,omitempty"`                 // Input parameters
	Out                StepOutput        `json:"out,omitempty"`                // Output mapping to global vars and files
	Target             string            `json:"target,omitempty"`             // Write target (for Set_*, Write_*, Download_*)
	Value              string            `json:"value,omitempty"`              // Write value expression (for Set_*, Write_*)
	Source             interface{}       `json:"source,omitempty"`             // Data source: expression string (Loop_*), URL/object (Download_*), zip path (Unzip_*)
	RouteByExt         map[string]string `json:"routeByExt,omitempty"`         // v3.14+: extension→dir routing (Download_*, Unzip_*)
	DefaultDir         string            `json:"defaultDir,omitempty"`         // v3.14+: fallback dir when routeByExt has no match
	Overwrite          *bool             `json:"overwrite,omitempty"`          // v3.14+: overwrite flag for Unzip_* (default true)
	Children           []string          `json:"children,omitempty"`           // Parallel child branch entry points
	Next               string            `json:"next,omitempty"`               // Serial successor node
	OnError            string            `json:"onError,omitempty"`            // v3.10+: error handler step ID (jump here instead of failing)
	Cases              [][]string        `json:"cases,omitempty"`              // Conditional branches (for Branch_*) - [expression, stepId]
	Mode               string            `json:"mode,omitempty"`               // Execution mode (for Loop_*, Write_*)
	Print              string            `json:"print,omitempty"`              // v3.13+: expression emitted as step_print RunEvent after successful execution
	Reason             string            `json:"reason,omitempty"`             // v3.15+: Pause_* display reason for frontend/notifications
	ResumeResultTarget string            `json:"resumeResultTarget,omitempty"` // v3.15+: Pause_* target $vars path for resume payload (required for Pause_*)
	Timeout            *PauseTimeout     `json:"timeout,omitempty"`            // v3.15+: Pause_* optional timeout configuration
	While              string            `json:"while,omitempty"`              // v3.16+: Loop_* while-expression (mutually exclusive with source)
	MaxIterations      *int              `json:"maxIterations,omitempty"`      // v3.16+: Loop_* iteration cap (required with while, optional with source)
	Model              string            `json:"model,omitempty"`              // v3.16+: LLM_* provider/model selector (e.g. "openai", "openai/gpt-4.1")
	Meta               Metadata          `json:"meta,omitempty"`               // Metadata for display/debugging
}
// RunParams carries run-level execution control parameters (v3.15+, spec §2.2).
// These are distinct from the workflow's registry.params (which are workflow
// design-time parameters); RunParams control how a particular execution is
// dispatched and scoped by the platform.
//
// Every field is optional and omitted from JSON when empty.
type RunParams struct {
	Params      map[string]interface{} `json:"params,omitempty"`      // Business input params (declared in registry.params, spec §1.7)
	WorkspaceID string                 `json:"workspaceId,omitempty"` // Workspace scope for this execution
	Nodes       []string               `json:"nodes,omitempty"`       // Subset of step IDs to selectively execute
	Mode        string                 `json:"mode,omitempty"`        // Execution mode: create / patch / regenerate / validate
}
// PauseTimeout configures the optional timeout for a Pause_* node (v3.15+).
// It is referenced from Step.Timeout; both fields are always serialized.
type PauseTimeout struct {
	Sec int    `json:"sec"` // Timeout duration in seconds (must be > 0)
	On  string `json:"on"`  // Step ID to jump to on timeout (typically Stop_* or an error handler)
}
// DownloadSource represents the object form of a Download_* source field.
// When source is a string, it is treated as a plain URL.
//
// Only URL is required; the remaining fields are omitted from JSON when empty.
type DownloadSource struct {
	URL      string            `json:"url"`                // Required: download URL
	Headers  map[string]string `json:"headers,omitempty"`  // Optional: extra HTTP request headers
	Auth     string            `json:"auth,omitempty"`     // Optional: auth credential reference
	Timeout  int               `json:"timeout,omitempty"`  // Optional: timeout in seconds
	Checksum string            `json:"checksum,omitempty"` // Optional: expected checksum (sha256:hex)
}
// StepInput represents the input parameters for a step: a free-form map from
// parameter name to an arbitrary JSON-decoded value. It is the type of Step.In.
type StepInput map[string]interface{}
  86. // StepOutput represents the output mapping from _result to global variables
  87. type StepOutput map[string]string
  88. // UnmarshalJSON supports both the full object form {"$var": "=_result"} and
  89. // the string shorthand "$var" (equivalent to {"$var": "=_result"}).
  90. func (s *StepOutput) UnmarshalJSON(data []byte) error {
  91. // Try object form first
  92. var m map[string]string
  93. if err := json.Unmarshal(data, &m); err == nil {
  94. *s = StepOutput(m)
  95. return nil
  96. }
  97. // Try string shorthand: "$var" → {"$var": "=_result"}
  98. var str string
  99. if err := json.Unmarshal(data, &str); err == nil {
  100. *s = StepOutput{str: "=_result"}
  101. return nil
  102. }
  103. return fmt.Errorf("out field must be a string or object, got: %s", string(data))
  104. }
// Metadata contains display/debugging information as a free-form map of
// arbitrary JSON-decoded values. It is the type of Step.Meta.
type Metadata map[string]interface{}
// ExecutionContext holds the runtime state during workflow execution.
//
// The accessor methods defined directly on *ExecutionContext take no locks;
// SafeExecutionContext wraps this type with mutex-guarded equivalents. Because
// the struct contains a sync.Mutex it must not be copied after first use, so
// pass it by pointer.
type ExecutionContext struct {
	Ctx                context.Context        // Go context for cancellation
	WorkflowID         string                 // Workflow execution instance ID
	Params             map[string]interface{} // Input parameters (read-only)
	ParamTypes         map[string]string      // Parameter type declarations from registry
	Variables          map[string]interface{} // Global variables ($vars)
	VarTypes           map[string]string      // Variable type declarations from registry
	SystemVars         map[string]interface{} // System variables (SYSVAR.xxx)
	LocalVars          map[string]interface{} // Local variables (_item, _index, _result)
	Artifacts          map[string]string      // Temporary file references
	CurrentStepID      string                 // Current executing step
	Status             ExecutionStatus        // Current execution status
	StartTime          time.Time              // Execution start time
	RunEventStream     chan RunEvent          // v3.12+: structured run_events stream (spec Chapter 13)
	ServiceAdapter     ServiceAdapter         // Adapter for calling services
	APIAdapter         APIAdapter             // Adapter for calling third-party APIs
	ComponentAdapter   ComponentAdapter       // Adapter for calling components
	LLMAdapter         LLMAdapter             // Adapter for calling LLMs
	LLMAdapterRegistry *LLMAdapterRegistry    // v3.16+: multi-provider LLM adapter registry (nil for pre-3.16)
	FileAdapter        FileAdapter            // Adapter for file operations
	DocAdapter         DocAdapter             // Adapter for resolving semantic documents
	PauseState         *PauseState            // v3.15+: current pause state (non-nil only when Status == StatusPaused)
	pauseMu            sync.Mutex             // v3.15+: protects PauseState pointer field
	RunParams          *RunParams             // v3.15+: optional run-level control parameters (spec §2.2)
}
// ExecutionStatus represents the current status of workflow execution. It is
// the type of ExecutionContext.Status.
type ExecutionStatus string

// Recognized ExecutionStatus values.
const (
	StatusRunning   ExecutionStatus = "running"
	StatusCompleted ExecutionStatus = "completed"
	StatusFailed    ExecutionStatus = "failed"
	StatusStopped   ExecutionStatus = "stopped"
	StatusPaused    ExecutionStatus = "paused" // v3.15+: workflow is suspended at a Pause_* node, awaiting resume
)
// RunEventType represents the type of a run_events event (spec 3.12 Chapter 13).
// The string value is what appears in the "type" field of a serialized RunEvent.
type RunEventType string

// Recognized RunEventType values: workflow-level, step-level, LLM, file,
// print, and (v3.15+) pause lifecycle events.
const (
	RunEventWorkflowStart     RunEventType = "workflow_start"
	RunEventWorkflowDone      RunEventType = "workflow_done"
	RunEventWorkflowFailed    RunEventType = "workflow_failed"
	RunEventWorkflowCancelled RunEventType = "workflow_cancelled"
	RunEventStepStart         RunEventType = "step_start"
	RunEventStepDone          RunEventType = "step_done"
	RunEventStepError         RunEventType = "step_error"
	RunEventStepSkipped       RunEventType = "step_skipped"
	RunEventLLMToken          RunEventType = "llm_token"
	RunEventLLMDone           RunEventType = "llm_done"
	RunEventFileStart         RunEventType = "file_start"
	RunEventFileDone          RunEventType = "file_done"
	RunEventStepPrint         RunEventType = "step_print"
	RunEventPauseStart        RunEventType = "pause_start"    // v3.15+: workflow entered paused state at Pause_* node
	RunEventPauseResumed      RunEventType = "pause_resumed"  // v3.15+: resume succeeded, payload written, execution continuing
	RunEventPauseTimeout      RunEventType = "pause_timeout"  // v3.15+: pause timed out, jumping to timeout.on
	RunEventPauseRejected     RunEventType = "pause_rejected" // v3.15+: resume request rejected (invalid token, wrong state, etc.)
)
// RunEvent is a structured event emitted to the run_events stream (spec 3.12 Chapter 13).
// Each event is independently parseable and carries a monotonic sequence number.
//
// No field is tagged omitempty, so every key is present in the JSON form;
// a nil StepID serializes as null.
type RunEvent struct {
	RunID   string                 `json:"run_id"`  // Workflow run instance ID
	Seq     uint64                 `json:"seq"`     // Monotonically increasing sequence number (from 1)
	Ts      string                 `json:"ts"`      // ISO 8601 timestamp, millisecond precision
	Type    RunEventType           `json:"type"`    // Event type (see RunEventType constants)
	StepID  *string                `json:"step_id"` // Step ID, or null for workflow-level events
	Payload map[string]interface{} `json:"payload"` // Event-specific payload (see spec 13.4)
}
// ServiceResult represents the result from a service call. The payload is
// carried under the "data" JSON key.
type ServiceResult struct {
	Data map[string]interface{} `json:"data"` // Business result
}
// StepType represents the type of workflow step. The values match the
// prefixes used in step IDs elsewhere in this file (Service_*, LLM_*, ...).
type StepType string

// Recognized StepType values.
const (
	StepTypeService   StepType = "Service"
	StepTypeAPI       StepType = "API"
	StepTypeComponent StepType = "Component"
	StepTypeLLM       StepType = "LLM"
	StepTypeSet       StepType = "Set"
	StepTypeWrite     StepType = "Write"
	StepTypeDownload  StepType = "Download" // v3.14+: stream-download a file from an external URL
	StepTypeUnzip     StepType = "Unzip"    // v3.14+: extract a zip archive and route entries by extension
	StepTypePause     StepType = "Pause"    // v3.15+: suspend workflow execution pending external resume
	StepTypeBranch    StepType = "Branch"
	StepTypeLoop      StepType = "Loop"
	StepTypeStop      StepType = "Stop"
	StepTypeNoop      StepType = "Noop" // Does nothing, useful for grouping children
)
// WriteMode represents the file write strategy. Step.Mode is a plain string,
// so these constants name the values expected there for Write_* nodes.
type WriteMode string

// Recognized WriteMode values.
const (
	WriteModeOverwrite    WriteMode = "overwrite"
	WriteModeFailIfExists WriteMode = "failIfExists"
	WriteModeAppend       WriteMode = "append"
	WriteModePrepend      WriteMode = "prepend" // v3.14+: prepend content to the beginning of the file
)
// LoopMode represents the loop execution strategy. Step.Mode is a plain
// string, so these constants name the values expected there for Loop_* nodes.
type LoopMode string

// Recognized LoopMode values.
const (
	LoopModeParallel LoopMode = "parallel"
	LoopModeSerial   LoopMode = "serial"
)
// GetSystemVar retrieves a system variable by name. The second result reports
// whether name was present in ctx.SystemVars; a nil map simply yields
// (nil, false). No lock is taken.
func (ctx *ExecutionContext) GetSystemVar(name string) (interface{}, bool) {
	val, ok := ctx.SystemVars[name]
	return val, ok
}
  213. // SetSystemVar sets a system variable by name
  214. func (ctx *ExecutionContext) SetSystemVar(name string, value interface{}) {
  215. if ctx.SystemVars == nil {
  216. ctx.SystemVars = make(map[string]interface{})
  217. }
  218. ctx.SystemVars[name] = value
  219. }
// GetParam retrieves a parameter (non-thread-safe, for backward compatibility).
// The second result reports whether key was present in ctx.Params; a nil map
// yields (nil, false).
func (ctx *ExecutionContext) GetParam(key string) (interface{}, bool) {
	val, ok := ctx.Params[key]
	return val, ok
}
// GetVariable retrieves a global variable (non-thread-safe, for backward
// compatibility). The second result reports whether key was present in
// ctx.Variables; a nil map yields (nil, false).
func (ctx *ExecutionContext) GetVariable(key string) (interface{}, bool) {
	val, ok := ctx.Variables[key]
	return val, ok
}
  230. // SetVariable sets a global variable (non-thread-safe, for backward compatibility)
  231. func (ctx *ExecutionContext) SetVariable(key string, value interface{}) {
  232. if ctx.Variables == nil {
  233. ctx.Variables = make(map[string]interface{})
  234. }
  235. ctx.Variables[key] = value
  236. }
// GetLocalVar retrieves a local variable (non-thread-safe, for backward
// compatibility). The second result reports whether key was present in
// ctx.LocalVars; a nil map yields (nil, false).
func (ctx *ExecutionContext) GetLocalVar(key string) (interface{}, bool) {
	val, ok := ctx.LocalVars[key]
	return val, ok
}
  242. // SetLocalVar sets a local variable (non-thread-safe, for backward compatibility)
  243. func (ctx *ExecutionContext) SetLocalVar(key string, value interface{}) {
  244. if ctx.LocalVars == nil {
  245. ctx.LocalVars = make(map[string]interface{})
  246. }
  247. ctx.LocalVars[key] = value
  248. }
// DeleteLocalVar deletes a local variable (non-thread-safe, for backward
// compatibility). Deleting a missing key, or deleting from a nil map, is a
// no-op.
func (ctx *ExecutionContext) DeleteLocalVar(key string) {
	delete(ctx.LocalVars, key)
}
// GetArtifact retrieves an artifact (non-thread-safe, for backward
// compatibility). The second result reports whether key was present in
// ctx.Artifacts; a missing key or nil map yields ("", false).
func (ctx *ExecutionContext) GetArtifact(key string) (string, bool) {
	val, ok := ctx.Artifacts[key]
	return val, ok
}
  258. // SetArtifact sets an artifact (non-thread-safe, for backward compatibility)
  259. func (ctx *ExecutionContext) SetArtifact(key string, value string) {
  260. if ctx.Artifacts == nil {
  261. ctx.Artifacts = make(map[string]string)
  262. }
  263. ctx.Artifacts[key] = value
  264. }
  265. // SetArrayIndex sets an array index (non-thread-safe, for backward compatibility)
  266. func (ctx *ExecutionContext) SetArrayIndex(key string, index int, value interface{}) {
  267. root, ok := ctx.Variables[key]
  268. if !ok {
  269. slice := make([]interface{}, index+1)
  270. slice[index] = value
  271. ctx.Variables[key] = slice
  272. return
  273. }
  274. slice, ok := root.([]interface{})
  275. if !ok {
  276. slice = make([]interface{}, index+1)
  277. slice[index] = value
  278. ctx.Variables[key] = slice
  279. return
  280. }
  281. for len(slice) <= index {
  282. slice = append(slice, nil)
  283. }
  284. slice[index] = value
  285. ctx.Variables[key] = slice
  286. }
// GetBaseContext returns itself for ExecutionContext: the receiver already is
// the base context, so the pointer is handed back unchanged.
func (ctx *ExecutionContext) GetBaseContext() *ExecutionContext {
	return ctx
}
// SetCurrentStepID sets the current step ID (non-thread-safe): it assigns
// stepID to ctx.CurrentStepID without taking any lock.
func (ctx *ExecutionContext) SetCurrentStepID(stepID string) {
	ctx.CurrentStepID = stepID
}
// SetStatus sets the execution status (non-thread-safe): it assigns status to
// ctx.Status without taking any lock.
func (ctx *ExecutionContext) SetStatus(status ExecutionStatus) {
	ctx.Status = status
}
// IsStopped returns true if the execution status is stopped, i.e. ctx.Status
// equals StatusStopped. The field is read without taking any lock.
func (ctx *ExecutionContext) IsStopped() bool {
	return ctx.Status == StatusStopped
}
  303. // SafeExecutionContext wraps ExecutionContext with thread-safe access
  304. type SafeExecutionContext struct {
  305. *ExecutionContext
  306. varMutex sync.RWMutex // Protects Variables map
  307. localMutex sync.Mutex // Protects LocalVars map
  308. artifactMutex sync.Mutex // Protects Artifacts map
  309. sysMutex sync.RWMutex // Protects SystemVars map
  310. statusMutex sync.Mutex // Protects Status, CurrentStepID
  311. }
  312. // NewSafeExecutionContext creates a thread-safe wrapper around ExecutionContext
  313. func NewSafeExecutionContext(ctx *ExecutionContext) *SafeExecutionContext {
  314. return &SafeExecutionContext{
  315. ExecutionContext: ctx,
  316. }
  317. }
  318. // SwapLocalVars temporarily swaps LocalVars with proper locking
  319. // Returns a cleanup function that must be called to restore original LocalVars
  320. func (s *SafeExecutionContext) SwapLocalVars(newLocalVars map[string]interface{}) func() {
  321. s.localMutex.Lock()
  322. oldLocalVars := s.LocalVars
  323. s.LocalVars = newLocalVars
  324. s.localMutex.Unlock()
  325. // Return cleanup function
  326. return func() {
  327. s.localMutex.Lock()
  328. s.LocalVars = oldLocalVars
  329. s.localMutex.Unlock()
  330. }
  331. }
  332. // GetParam retrieves a parameter with read lock
  333. func (s *SafeExecutionContext) GetParam(key string) (interface{}, bool) {
  334. s.varMutex.RLock()
  335. defer s.varMutex.RUnlock()
  336. val, ok := s.Params[key]
  337. return val, ok
  338. }
  339. // GetVariable retrieves a global variable with read lock
  340. func (s *SafeExecutionContext) GetVariable(key string) (interface{}, bool) {
  341. s.varMutex.RLock()
  342. defer s.varMutex.RUnlock()
  343. val, ok := s.Variables[key]
  344. return val, ok
  345. }
  346. // SetVariable sets a global variable with write lock
  347. func (s *SafeExecutionContext) SetVariable(key string, value interface{}) {
  348. s.varMutex.Lock()
  349. defer s.varMutex.Unlock()
  350. if s.Variables == nil {
  351. s.Variables = make(map[string]interface{})
  352. }
  353. s.Variables[key] = value
  354. }
  355. // GetLocalVar retrieves a local variable with lock
  356. func (s *SafeExecutionContext) GetLocalVar(key string) (interface{}, bool) {
  357. s.localMutex.Lock()
  358. defer s.localMutex.Unlock()
  359. val, ok := s.LocalVars[key]
  360. return val, ok
  361. }
  362. // SetLocalVar sets a local variable with lock
  363. func (s *SafeExecutionContext) SetLocalVar(key string, value interface{}) {
  364. s.localMutex.Lock()
  365. defer s.localMutex.Unlock()
  366. if s.LocalVars == nil {
  367. s.LocalVars = make(map[string]interface{})
  368. }
  369. s.LocalVars[key] = value
  370. }
  371. // DeleteLocalVar deletes a local variable with lock
  372. func (s *SafeExecutionContext) DeleteLocalVar(key string) {
  373. s.localMutex.Lock()
  374. defer s.localMutex.Unlock()
  375. delete(s.LocalVars, key)
  376. }
  377. // GetArtifact retrieves an artifact reference with lock
  378. func (s *SafeExecutionContext) GetArtifact(key string) (string, bool) {
  379. s.artifactMutex.Lock()
  380. defer s.artifactMutex.Unlock()
  381. val, ok := s.Artifacts[key]
  382. return val, ok
  383. }
  384. // SetArtifact sets an artifact reference with lock
  385. func (s *SafeExecutionContext) SetArtifact(key string, value string) {
  386. s.artifactMutex.Lock()
  387. defer s.artifactMutex.Unlock()
  388. if s.Artifacts == nil {
  389. s.Artifacts = make(map[string]string)
  390. }
  391. s.Artifacts[key] = value
  392. }
  393. // GetStatus retrieves the execution status with lock
  394. func (s *SafeExecutionContext) GetStatus() ExecutionStatus {
  395. s.statusMutex.Lock()
  396. defer s.statusMutex.Unlock()
  397. return s.Status
  398. }
  399. // SetStatus sets the execution status with lock
  400. func (s *SafeExecutionContext) SetStatus(status ExecutionStatus) {
  401. s.statusMutex.Lock()
  402. defer s.statusMutex.Unlock()
  403. s.Status = status
  404. }
  405. // IsStopped returns true if the execution status is stopped (thread-safe)
  406. func (s *SafeExecutionContext) IsStopped() bool {
  407. s.statusMutex.Lock()
  408. defer s.statusMutex.Unlock()
  409. return s.Status == StatusStopped
  410. }
  411. // GetCurrentStepID retrieves the current step ID with lock
  412. func (s *SafeExecutionContext) GetCurrentStepID() string {
  413. s.statusMutex.Lock()
  414. defer s.statusMutex.Unlock()
  415. return s.CurrentStepID
  416. }
  417. // SetCurrentStepID sets the current step ID with lock
  418. func (s *SafeExecutionContext) SetCurrentStepID(id string) {
  419. s.statusMutex.Lock()
  420. defer s.statusMutex.Unlock()
  421. s.CurrentStepID = id
  422. }
  423. // GetSystemVar retrieves a system variable with read lock
  424. func (s *SafeExecutionContext) GetSystemVar(name string) (interface{}, bool) {
  425. s.sysMutex.RLock()
  426. defer s.sysMutex.RUnlock()
  427. val, ok := s.SystemVars[name]
  428. return val, ok
  429. }
  430. // SetSystemVar sets a system variable with write lock
  431. func (s *SafeExecutionContext) SetSystemVar(name string, value interface{}) {
  432. s.sysMutex.Lock()
  433. defer s.sysMutex.Unlock()
  434. if s.SystemVars == nil {
  435. s.SystemVars = make(map[string]interface{})
  436. }
  437. s.SystemVars[name] = value
  438. }
  439. // SetArrayIndex atomically sets an array index with auto-grow
  440. func (s *SafeExecutionContext) SetArrayIndex(key string, index int, value interface{}) {
  441. s.varMutex.Lock()
  442. defer s.varMutex.Unlock()
  443. root, ok := s.Variables[key]
  444. if !ok {
  445. // Create new array
  446. slice := make([]interface{}, index+1)
  447. slice[index] = value
  448. s.Variables[key] = slice
  449. return
  450. }
  451. slice, ok := root.([]interface{})
  452. if !ok {
  453. // Not an array, create new array
  454. slice = make([]interface{}, index+1)
  455. slice[index] = value
  456. s.Variables[key] = slice
  457. return
  458. }
  459. // Grow slice if necessary
  460. for len(slice) <= index {
  461. slice = append(slice, nil)
  462. }
  463. slice[index] = value
  464. s.Variables[key] = slice
  465. }
  466. // GetBaseContext returns the underlying ExecutionContext
  467. func (s *SafeExecutionContext) GetBaseContext() *ExecutionContext {
  468. return s.ExecutionContext
  469. }
  470. // ChildExecutionContext provides isolated local variables for parallel branches
  471. type ChildExecutionContext struct {
  472. Parent *SafeExecutionContext
  473. LocalVars map[string]interface{} // Isolated local scope
  474. localStopped bool // true after this branch hits Stop_*
  475. }
  476. // NewChildExecutionContext creates a child context with isolated local variables
  477. func NewChildExecutionContext(parent *SafeExecutionContext) *ChildExecutionContext {
  478. return &ChildExecutionContext{
  479. Parent: parent,
  480. LocalVars: make(map[string]interface{}),
  481. }
  482. }
  483. // GetParam delegates to parent (shared params)
  484. func (c *ChildExecutionContext) GetParam(key string) (interface{}, bool) {
  485. return c.Parent.GetParam(key)
  486. }
  487. // GetVariable delegates to parent (shared globals)
  488. func (c *ChildExecutionContext) GetVariable(key string) (interface{}, bool) {
  489. return c.Parent.GetVariable(key)
  490. }
  491. // SetVariable delegates to parent (shared globals)
  492. func (c *ChildExecutionContext) SetVariable(key string, value interface{}) {
  493. c.Parent.SetVariable(key, value)
  494. }
  495. // GetLocalVar uses isolated local scope (no locking needed, single goroutine)
  496. func (c *ChildExecutionContext) GetLocalVar(key string) (interface{}, bool) {
  497. val, ok := c.LocalVars[key]
  498. return val, ok
  499. }
  500. // SetLocalVar uses isolated local scope (no locking needed, single goroutine)
  501. func (c *ChildExecutionContext) SetLocalVar(key string, value interface{}) {
  502. c.LocalVars[key] = value
  503. }
  504. // DeleteLocalVar deletes from isolated local scope
  505. func (c *ChildExecutionContext) DeleteLocalVar(key string) {
  506. delete(c.LocalVars, key)
  507. }
  508. // GetArtifact delegates to parent (shared artifacts)
  509. func (c *ChildExecutionContext) GetArtifact(key string) (string, bool) {
  510. return c.Parent.GetArtifact(key)
  511. }
  512. // SetArtifact delegates to parent (shared artifacts)
  513. func (c *ChildExecutionContext) SetArtifact(key string, value string) {
  514. c.Parent.SetArtifact(key, value)
  515. }
  516. // GetSystemVar delegates to parent (shared system vars)
  517. func (c *ChildExecutionContext) GetSystemVar(name string) (interface{}, bool) {
  518. return c.Parent.GetSystemVar(name)
  519. }
  520. // SetSystemVar delegates to parent (shared system vars)
  521. func (c *ChildExecutionContext) SetSystemVar(name string, value interface{}) {
  522. c.Parent.SetSystemVar(name, value)
  523. }
  524. // SetArrayIndex delegates to parent (shared global arrays)
  525. func (c *ChildExecutionContext) SetArrayIndex(key string, index int, value interface{}) {
  526. c.Parent.SetArrayIndex(key, index, value)
  527. }
  528. // GetBaseContext returns the parent's underlying ExecutionContext
  529. func (c *ChildExecutionContext) GetBaseContext() *ExecutionContext {
  530. return c.Parent.ExecutionContext
  531. }
  532. // SetCurrentStepID delegates to parent (thread-safe)
  533. func (c *ChildExecutionContext) SetCurrentStepID(stepID string) {
  534. c.Parent.SetCurrentStepID(stepID)
  535. }
  536. // SetStatus delegates to parent (thread-safe) and tracks local stopped flag
  537. func (c *ChildExecutionContext) SetStatus(status ExecutionStatus) {
  538. if status == StatusStopped {
  539. c.localStopped = true
  540. }
  541. c.Parent.SetStatus(status)
  542. }
  543. // IsStopped returns true if THIS branch hit Stop_* (not sibling branches)
  544. func (c *ChildExecutionContext) IsStopped() bool {
  545. return c.localStopped
  546. }
// ContextAccessor provides an interface for accessing execution context.
//
// It is the method set shared by *ExecutionContext (unlocked),
// *SafeExecutionContext (mutex-guarded) and *ChildExecutionContext (isolated
// local scope, everything else delegated to its parent), so step code can be
// written against whichever of the three it is handed.
type ContextAccessor interface {
	// Params and global variables.
	GetParam(key string) (interface{}, bool)
	GetVariable(key string) (interface{}, bool)
	SetVariable(key string, value interface{})
	SetArrayIndex(key string, index int, value interface{})

	// Local variables.
	GetLocalVar(key string) (interface{}, bool)
	SetLocalVar(key string, value interface{})
	DeleteLocalVar(key string)

	// Artifacts and system variables.
	GetArtifact(key string) (string, bool)
	SetArtifact(key string, value string)
	GetSystemVar(name string) (interface{}, bool)
	SetSystemVar(name string, value interface{})

	// GetBaseContext exposes the underlying ExecutionContext.
	GetBaseContext() *ExecutionContext

	// Thread-safe status field access
	SetCurrentStepID(stepID string)
	SetStatus(status ExecutionStatus)
	IsStopped() bool // Branch-aware stopped check (child contexts check local flag, not global)
}
// resumeSignal carries the payload and request ID sent by a Resume call to
// unblock executePauseStep. It is the element type of PauseState's channel.
type resumeSignal struct {
	Payload   interface{} // The payload provided by the caller; written to resumeResultTarget
	RequestID string      // Caller-supplied idempotency key
}
// PauseState holds the runtime state of an active Pause_* node (v3.15+).
// It is created by executePauseStep and stored in ExecutionContext.PauseState
// for the duration of the pause. The Resume method uses it to send a signal.
//
// All fields are unexported and guarded by mu. The struct contains a mutex and
// must not be copied; it is shared by pointer.
type PauseState struct {
	mu             sync.Mutex        // protects all fields below
	ch             chan resumeSignal // buffered(1): executePauseStep reads; Resume writes
	token          string            // expected token for resume validation
	nodeID         string            // Pause_* step ID (for pause_rejected events)
	seenRequestIDs map[string]bool   // idempotency: request IDs already processed
	resumed        bool              // true once a valid resume signal has been sent
}

// WaitToken returns the SHA-256 hex token that was published in the pause_start event.
// This is safe to call from any goroutine while the workflow is in StatusPaused:
// the token field is read while holding p.mu.
func (p *PauseState) WaitToken() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.token
}
// ResumeRequest carries the caller-supplied fields for the Resume method (v3.15+).
// Field names correspond to the protocol-level resume request body defined in spec §11.4.1.
//
// The struct has no JSON tags, so any mapping to the wire names (for example
// "runId") is done outside this type.
type ResumeRequest struct {
	RunID     string      // Optional: workflow run ID for explicit validation (spec §11.4.1 "runId"); if non-empty, must match execCtx.WorkflowID
	Token     string      // Must match the wait token from the pause_start event
	Payload   interface{} // Arbitrary data written to resumeResultTarget in $vars
	RequestID string      // Optional idempotency key; duplicate RequestIDs are silently no-ops
}
// ErrBreak is a sentinel error returned when a step's next is "BREAK".
// The enclosing Loop_* executor catches this to exit the loop immediately.
// It must never propagate past the loop handler.
//
// Test for it with IsBreakError (or errors.Is) rather than ==, so that a
// wrapped ErrBreak is still recognized.
var ErrBreak = errors.New("BREAK")

// IsBreakError checks if an error is the BREAK sentinel. It uses errors.Is,
// so it also reports true when ErrBreak is wrapped inside err, and false for
// a nil err.
func IsBreakError(err error) bool {
	return errors.Is(err, ErrBreak)
}
  605. // LLMError represents a structured LLM call failure (v3.10+)
  606. type LLMError struct {
  607. Type string // e.g., "rate_limit", "auth_error", "timeout"
  608. Code string // e.g., "RATE_LIMIT"
  609. Message string // Human-readable error message
  610. Retryable bool // Whether the call can be retried
  611. StatusCode int // HTTP status code
  612. Provider string // LLM provider name
  613. Model string // Model used
  614. RequestID string // Request ID from provider
  615. ProviderError string // provider exception class
  616. Details map[string]interface{} // Additional details
  617. Raw map[string]interface{} // Raw error response
  618. }
  619. // Error implements the error interface
  620. func (e *LLMError) Error() string {
  621. return fmt.Sprintf("%s: %s (code=%s, status=%d)", e.Type, e.Message, e.Code, e.StatusCode)
  622. }
  623. // ToMap converts LLMError to a map for use as _error local variable
  624. func (e *LLMError) ToMap() map[string]interface{} {
  625. m := map[string]interface{}{
  626. "type": e.Type,
  627. "code": e.Code,
  628. "message": e.Message,
  629. "retryable": e.Retryable,
  630. }
  631. if e.StatusCode != 0 {
  632. m["status_code"] = e.StatusCode
  633. }
  634. if e.Provider != "" {
  635. m["provider"] = e.Provider
  636. }
  637. if e.Model != "" {
  638. m["model"] = e.Model
  639. }
  640. if e.RequestID != "" {
  641. m["request_id"] = e.RequestID
  642. }
  643. if e.ProviderError != "" {
  644. m["provider_error"] = e.ProviderError
  645. }
  646. if e.Details != nil {
  647. m["details"] = e.Details
  648. }
  649. if e.Raw != nil {
  650. m["raw"] = e.Raw
  651. }
  652. return m
  653. }