engine.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131
  1. package workflow
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. )
  12. // Engine is the main workflow execution engine
  13. type Engine struct {
  14. workflow *Workflow
  15. parallelExecutor *ParallelExecutor
  16. errorStrategy ParallelErrorStrategy
  17. eventSequence uint64 // Atomic counter for event sequence numbers
  18. stopOnce sync.Once // Captures the first Stop_* node ID
  19. stopNodeID string // ID of the first Stop_* node that triggered stop
  20. }
  21. // NewEngine creates a new workflow engine with default options
  22. func NewEngine(workflow *Workflow) (*Engine, error) {
  23. return NewEngineWithOptions(workflow, DefaultEngineOptions)
  24. }
  25. // NewEngineWithOptions creates a new workflow engine with custom options
  26. func NewEngineWithOptions(workflow *Workflow, opts EngineOptions) (*Engine, error) {
  27. // Validate workflow is not nil
  28. if workflow == nil {
  29. return nil, fmt.Errorf("workflow cannot be nil")
  30. }
  31. // Validate workflow structure
  32. if err := workflow.Validate(); err != nil {
  33. return nil, fmt.Errorf("invalid workflow: %w", err)
  34. }
  35. return &Engine{
  36. workflow: workflow,
  37. parallelExecutor: NewParallelExecutor(opts.MaxConcurrency),
  38. errorStrategy: opts.ParallelErrorStrategy,
  39. eventSequence: 0,
  40. }, nil
  41. }
  42. // Execute starts workflow execution with the given context and initial variables
  43. func (e *Engine) Execute(ctx context.Context, initialVars map[string]interface{}, adapters *Adapters) (*ExecutionResult, error) {
  44. // Validate required parameters
  45. if ctx == nil {
  46. return nil, fmt.Errorf("context cannot be nil")
  47. }
  48. if adapters == nil {
  49. return nil, fmt.Errorf("adapters cannot be nil")
  50. }
  51. // Create execution context
  52. execCtx := &ExecutionContext{
  53. Ctx: ctx,
  54. WorkflowID: generateWorkflowID(),
  55. Params: make(map[string]interface{}),
  56. Variables: make(map[string]interface{}),
  57. SystemVars: make(map[string]interface{}),
  58. LocalVars: make(map[string]interface{}),
  59. Artifacts: make(map[string]string),
  60. Status: StatusRunning,
  61. StartTime: time.Now(),
  62. RunEventStream: make(chan RunEvent, 500),
  63. ServiceAdapter: adapters.Service,
  64. APIAdapter: adapters.API,
  65. ComponentAdapter: adapters.Component,
  66. LLMAdapter: adapters.LLM,
  67. LLMAdapterRegistry: adapters.LLMAdapterRegistry,
  68. FileAdapter: adapters.File,
  69. DocAdapter: adapters.Doc,
  70. }
  71. // Initialize parameter types from registry
  72. execCtx.ParamTypes = make(map[string]string)
  73. if paramDecls, err := e.workflow.Registry.GetParamDeclarations(); err == nil {
  74. for name, decl := range paramDecls {
  75. execCtx.ParamTypes[name] = decl.Type
  76. }
  77. }
  78. // Initialize variable types from registry
  79. execCtx.VarTypes = make(map[string]string)
  80. if varDecls, err := e.workflow.Registry.GetVariableDeclarations(); err == nil {
  81. for name, decl := range varDecls {
  82. execCtx.VarTypes[name] = decl.Type
  83. }
  84. }
  85. // Initialize parameters and variables from initialVars
  86. // Parameters are those defined in registry.Params (no $ prefix)
  87. // Variables are those with $ prefix
  88. for k, v := range initialVars {
  89. if strings.HasPrefix(k, "$") {
  90. execCtx.Variables[k] = v
  91. } else {
  92. // Check if this is a declared parameter
  93. if paramType, ok := execCtx.ParamTypes[k]; ok {
  94. // Apply type conversion for OBJECT params
  95. convertedValue := v
  96. if paramType == "OBJECT" {
  97. if str, isString := v.(string); isString {
  98. var obj map[string]interface{}
  99. if err := json.Unmarshal([]byte(str), &obj); err != nil {
  100. return nil, fmt.Errorf("failed to convert parameter %s to OBJECT: %w", k, err)
  101. }
  102. convertedValue = obj
  103. }
  104. }
  105. execCtx.Params[k] = convertedValue
  106. } else {
  107. // Not a declared parameter, treat as variable with $ prefix
  108. execCtx.Variables["$"+k] = v
  109. }
  110. }
  111. }
  112. // Apply registry param defaults for parameters not supplied by the caller (spec §3.2).
  113. // This runs after initialVars processing so explicit caller values always take precedence.
  114. if paramDecls, err := e.workflow.Registry.GetParamDeclarations(); err == nil {
  115. for name, decl := range paramDecls {
  116. if _, alreadySet := execCtx.Params[name]; !alreadySet && decl.Default != nil {
  117. if defaultVal, coerceErr := CoerceParamDefault(decl); coerceErr == nil && defaultVal != nil {
  118. execCtx.Params[name] = defaultVal
  119. }
  120. }
  121. }
  122. }
  123. // Execute workflow
  124. go e.executeWorkflow(execCtx)
  125. // Return result
  126. return &ExecutionResult{
  127. Context: execCtx,
  128. RunEventStream: execCtx.RunEventStream,
  129. }, nil
  130. }
  131. // ExecuteWithRunParams starts workflow execution with run-level control parameters (v3.15+, spec §2.2).
  132. // RunParams allow the caller to specify a workspace scope, a selective node list, an
  133. // execution mode, and business-level input params without polluting the workflow's own registry.
  134. //
  135. // RunParams.Params (spec §1.7 runParams.params) are merged into the effective initial variables.
  136. // They take precedence over initialVars for the same key, mirroring how the online platform
  137. // passes declared registry params to the engine separately from internal engine state.
  138. func (e *Engine) ExecuteWithRunParams(ctx context.Context, initialVars map[string]interface{}, adapters *Adapters, runParams RunParams) (*ExecutionResult, error) {
  139. // Merge RunParams.Params + initialVars → RunParams.Params wins (they are the caller-declared params).
  140. // RunParams.Params keys are bare names (e.g. "prompt") per the spec §1.7.
  141. // They are passed through unchanged so Execute()'s param-routing logic places them in
  142. // execCtx.Params[name], accessible via =name expressions (no $ prefix).
  143. merged := make(map[string]interface{}, len(initialVars)+len(runParams.Params))
  144. for k, v := range initialVars {
  145. merged[k] = v
  146. }
  147. for k, v := range runParams.Params {
  148. merged[k] = v
  149. }
  150. result, err := e.Execute(ctx, merged, adapters)
  151. if err != nil {
  152. return nil, err
  153. }
  154. // Attach RunParams to the execution context so downstream steps / adapters can inspect them.
  155. result.Context.RunParams = &runParams
  156. return result, nil
  157. }
  158. // executeWorkflow executes the workflow from the beginning
  159. func (e *Engine) executeWorkflow(execCtx *ExecutionContext) {
  160. defer close(execCtx.RunEventStream)
  161. // Emit workflow_start RunEvent
  162. e.emitRunEvent(execCtx, RunEventWorkflowStart, nil, map[string]interface{}{
  163. "params": execCtx.Params,
  164. })
  165. // Find true entry nodes (spec §1.4C: unreferenced nodes, NOT always steps[0]).
  166. entryIDs := findEntryNodeIDs(e.workflow.Steps)
  167. if len(entryIDs) == 0 {
  168. e.fail(execCtx, fmt.Errorf("no entry nodes found in workflow"))
  169. return
  170. }
  171. var execErr error
  172. if len(entryIDs) == 1 {
  173. // Single entry: execute serially (common case)
  174. entry := e.findStepByID(entryIDs[0])
  175. execErr = e.executeStep(execCtx, entry)
  176. } else {
  177. // Multiple entry nodes: execute in parallel (spec §1.4C)
  178. execErr = e.executeChildren(execCtx, entryIDs)
  179. }
  180. if execErr != nil {
  181. e.fail(execCtx, execErr)
  182. return
  183. }
  184. // Terminal event: emit workflow_done AFTER all branches have finished.
  185. switch execCtx.Status {
  186. case StatusRunning:
  187. e.complete(execCtx)
  188. case StatusStopped:
  189. // Deferred from e.stop() — emit workflow_done now that every parallel
  190. // branch has returned, so all step events precede the terminal event.
  191. e.emitRunEvent(execCtx, RunEventWorkflowDone, nil, map[string]interface{}{
  192. "stop_id": e.stopNodeID,
  193. "duration_ms": time.Since(execCtx.StartTime).Milliseconds(),
  194. })
  195. }
  196. }
  197. // executeStep executes a single step
  198. func (e *Engine) executeStep(ctx ContextAccessor, step *Step) error {
  199. // Get base context
  200. baseCtx := ctx.GetBaseContext()
  201. // Short-circuit if this branch has been stopped or the workflow is paused.
  202. // For ChildExecutionContext (parallel branches), IsStopped() checks the
  203. // branch-local flag so sibling branches are NOT short-circuited.
  204. // For non-parallel paths, IsStopped() checks the global Status.
  205. // Pause is always a global check — all branches should pause.
  206. if ctx.IsStopped() || baseCtx.Status == StatusPaused {
  207. return nil
  208. }
  209. // Set current step ID using thread-safe method
  210. ctx.SetCurrentStepID(step.ID)
  211. // Collect file write targets for the step.start event and file_start RunEvents.
  212. // Expand {expr} templates in out-key paths (e.g. "/{_item.filePath}") so that
  213. // file_start carries the actual resolved path, not the raw template string.
  214. var fileTargets []string
  215. for target := range step.Out {
  216. if strings.HasPrefix(target, "/") {
  217. if expanded, err := e.interpolateFilePath(ctx, target); err == nil {
  218. fileTargets = append(fileTargets, expanded)
  219. } else {
  220. fileTargets = append(fileTargets, target)
  221. }
  222. }
  223. }
  224. if strings.HasPrefix(step.ID, "Write_") && step.Target != "" {
  225. // Try to evaluate the target path expression; fall back to the raw string
  226. evaluator := NewExpressionEvaluator(ctx)
  227. if targetVal, err := evaluator.EvaluateValue(step.Target); err == nil {
  228. if targetStr, ok := targetVal.(string); ok {
  229. fileTargets = append(fileTargets, targetStr)
  230. } else {
  231. fileTargets = append(fileTargets, step.Target)
  232. }
  233. } else {
  234. fileTargets = append(fileTargets, step.Target)
  235. }
  236. }
  237. // Check if condition
  238. stepType := e.getStepType(step.ID)
  239. stepTypePattern := string(stepType) + "_*"
  240. if step.If != "" {
  241. evaluator := NewExpressionEvaluator(ctx)
  242. var result interface{}
  243. var err error
  244. if strings.HasPrefix(step.If, "=") {
  245. result, err = evaluator.EvaluateValue(step.If)
  246. } else {
  247. result, err = evaluator.Evaluate(step.If)
  248. }
  249. if err != nil {
  250. return fmt.Errorf("failed to evaluate if condition: %w", err)
  251. }
  252. if !toBool(result) {
  253. // Emit step_skipped RunEvent (spec 3.12 §13.3)
  254. stepID := step.ID
  255. e.emitRunEvent(baseCtx, RunEventStepSkipped, &stepID, map[string]interface{}{
  256. "step_type": stepTypePattern,
  257. "reason": "if_false",
  258. })
  259. // Skip this step and its children, move to next
  260. if step.Next != "" {
  261. // Handle special next values
  262. if step.Next == "RETURN" {
  263. return nil
  264. }
  265. if step.Next == "BREAK" {
  266. return ErrBreak
  267. }
  268. nextStep := e.findStepByID(step.Next)
  269. if nextStep != nil {
  270. return e.executeStep(ctx, nextStep)
  271. }
  272. }
  273. // No next - this branch is complete
  274. return nil
  275. }
  276. }
  277. // Emit step_start RunEvent (condition passed, about to execute)
  278. stepID := step.ID
  279. e.emitRunEvent(baseCtx, RunEventStepStart, &stepID, map[string]interface{}{
  280. "step_type": stepTypePattern,
  281. })
  282. // Emit file_start RunEvents for all file targets (spec 3.13 §13.3)
  283. // Batch-emitted immediately after step_start, before any llm_token
  284. for _, target := range fileTargets {
  285. path := strings.TrimPrefix(target, "/")
  286. e.emitRunEvent(baseCtx, RunEventFileStart, &stepID, map[string]interface{}{
  287. "path": path,
  288. })
  289. }
  290. stepStartTime := time.Now()
  291. var err error
  292. switch stepType {
  293. case StepTypeService:
  294. err = e.executeServiceStep(ctx, step)
  295. case StepTypeAPI:
  296. err = e.executeAPIStep(ctx, step)
  297. case StepTypeComponent:
  298. err = e.executeComponentStep(ctx, step)
  299. case StepTypeLLM:
  300. err = e.executeLLMStep(ctx, step)
  301. case StepTypeSet:
  302. err = e.executeSetStep(ctx, step)
  303. case StepTypeWrite:
  304. err = e.executeWriteStep(ctx, step)
  305. case StepTypeDownload:
  306. err = e.executeDownloadStep(ctx, step)
  307. case StepTypeUnzip:
  308. err = e.executeUnzipStep(ctx, step)
  309. case StepTypePause:
  310. // Pause_* has special routing semantics: executePauseStep blocks until
  311. // the workflow is resumed or times out, then handles step_done and next
  312. // routing internally (similar to how Stop_* handles workflow termination).
  313. pauseErr := e.executePauseStep(ctx, step, stepStartTime, stepTypePattern)
  314. if pauseErr == nil {
  315. return nil // executePauseStep handled step_done and next routing
  316. }
  317. // Non-nil error (e.g. context cancellation): emit step_error and handle onError
  318. e.emitRunEvent(baseCtx, RunEventStepError, &stepID, map[string]interface{}{
  319. "step_type": stepTypePattern,
  320. "error": buildErrorMap(pauseErr),
  321. "duration_ms": time.Since(stepStartTime).Milliseconds(),
  322. })
  323. if step.OnError != "" && e.isV310OrLater() {
  324. ctx.SetLocalVar("_error", buildErrorMap(pauseErr))
  325. errorStep := e.findStepByID(step.OnError)
  326. if errorStep == nil {
  327. ctx.DeleteLocalVar("_error")
  328. ctx.DeleteLocalVar("_meta")
  329. return fmt.Errorf("onError step not found: %s (referenced by %s)", step.OnError, step.ID)
  330. }
  331. onErrResult := e.executeStep(ctx, errorStep)
  332. ctx.DeleteLocalVar("_error")
  333. ctx.DeleteLocalVar("_meta")
  334. return onErrResult
  335. }
  336. return pauseErr
  337. case StepTypeBranch:
  338. err = e.executeBranchStep(ctx, step)
  339. case StepTypeLoop:
  340. err = e.executeLoopStep(ctx, step)
  341. case StepTypeStop:
  342. e.stop(ctx)
  343. return nil
  344. case StepTypeNoop:
  345. // Noop step does nothing, just executes children
  346. err = nil
  347. default:
  348. return fmt.Errorf("unknown step type: %s", step.ID)
  349. }
  350. if err != nil {
  351. // v3.16+: BREAK is a control flow signal, not an error — propagate without step_error
  352. if IsBreakError(err) {
  353. return err
  354. }
  355. // Emit step_error RunEvent (spec 3.12 §13.4)
  356. e.emitRunEvent(baseCtx, RunEventStepError, &stepID, map[string]interface{}{
  357. "step_type": stepTypePattern,
  358. "error": buildErrorMap(err),
  359. "duration_ms": time.Since(stepStartTime).Milliseconds(),
  360. })
  361. // v3.10+: If step has onError handler, jump there instead of failing
  362. if step.OnError != "" && e.isV310OrLater() {
  363. // _error is built from the error (structured if LLMError, generic otherwise)
  364. ctx.SetLocalVar("_error", buildErrorMap(err))
  365. // Execute the onError handler step
  366. errorStep := e.findStepByID(step.OnError)
  367. if errorStep == nil {
  368. ctx.DeleteLocalVar("_error")
  369. ctx.DeleteLocalVar("_meta")
  370. return fmt.Errorf("onError step not found: %s (referenced by %s)", step.OnError, step.ID)
  371. }
  372. onErrResult := e.executeStep(ctx, errorStep)
  373. // Clean up after onError handler completes
  374. ctx.DeleteLocalVar("_error")
  375. ctx.DeleteLocalVar("_meta")
  376. return onErrResult
  377. }
  378. return err
  379. }
  380. // Emit step_print RunEvent (spec 3.13 §5.2.12)
  381. // Order: file_done(×N) → step_print(0..1) → step_done
  382. // Only on successful execution; Stop_* nodes cannot have print
  383. if step.Print != "" && stepType != StepTypeStop {
  384. evaluator := NewExpressionEvaluator(ctx)
  385. printVal, err := evaluator.EvaluateValue(step.Print)
  386. if err == nil {
  387. var message string
  388. switch v := printVal.(type) {
  389. case string:
  390. message = v
  391. default:
  392. if b, jerr := json.Marshal(v); jerr == nil {
  393. message = string(b)
  394. } else {
  395. message = fmt.Sprintf("%v", v)
  396. }
  397. }
  398. e.emitRunEvent(baseCtx, RunEventStepPrint, &stepID, map[string]interface{}{
  399. "message": message,
  400. })
  401. }
  402. }
  403. // Emit step_done RunEvent (spec 3.12 §13.4)
  404. e.emitRunEvent(baseCtx, RunEventStepDone, &stepID, map[string]interface{}{
  405. "step_type": stepTypePattern,
  406. "duration_ms": time.Since(stepStartTime).Milliseconds(),
  407. })
  408. // Execute children (parallel branches)
  409. // Skip for Loop_* and Branch_* which handle their own children/branches internally
  410. if len(step.Children) > 0 && stepType != StepTypeLoop && stepType != StepTypeBranch {
  411. if err := e.executeChildren(ctx, step.Children); err != nil {
  412. return err
  413. }
  414. }
  415. // Move to next step
  416. if step.Next != "" {
  417. // Handle special next values
  418. if step.Next == "RETURN" {
  419. // Return from current execution path (used in children/loops)
  420. return nil
  421. }
  422. if step.Next == "BREAK" {
  423. // v3.16+: Exit the entire enclosing loop
  424. return ErrBreak
  425. }
  426. nextStep := e.findStepByID(step.Next)
  427. if nextStep != nil {
  428. return e.executeStep(ctx, nextStep)
  429. }
  430. return fmt.Errorf("next step not found: %s", step.Next)
  431. }
  432. // No next step - this branch is complete
  433. // Children workflows ending here are considered complete
  434. // Main workflow completion is handled by executeWorkflow
  435. return nil
  436. }
  437. // getStepType extracts the step type from the ID prefix
  438. func (e *Engine) getStepType(stepID string) StepType {
  439. if strings.HasPrefix(stepID, "Service_") {
  440. return StepTypeService
  441. } else if strings.HasPrefix(stepID, "API_") {
  442. return StepTypeAPI
  443. } else if strings.HasPrefix(stepID, "Component_") {
  444. return StepTypeComponent
  445. } else if strings.HasPrefix(stepID, "LLM_") {
  446. return StepTypeLLM
  447. } else if strings.HasPrefix(stepID, "Set_") {
  448. return StepTypeSet
  449. } else if strings.HasPrefix(stepID, "Write_") {
  450. return StepTypeWrite
  451. } else if strings.HasPrefix(stepID, "Download_") {
  452. return StepTypeDownload
  453. } else if strings.HasPrefix(stepID, "Unzip_") {
  454. return StepTypeUnzip
  455. } else if strings.HasPrefix(stepID, "Pause_") {
  456. return StepTypePause
  457. } else if strings.HasPrefix(stepID, "Branch_") {
  458. return StepTypeBranch
  459. } else if strings.HasPrefix(stepID, "Loop_") {
  460. return StepTypeLoop
  461. } else if strings.HasPrefix(stepID, "Stop_") {
  462. return StepTypeStop
  463. } else if strings.HasPrefix(stepID, "Noop_") {
  464. return StepTypeNoop
  465. }
  466. return ""
  467. }
  468. // findStepByID finds a step by its ID
  469. func (e *Engine) findStepByID(stepID string) *Step {
  470. for i := range e.workflow.Steps {
  471. if e.workflow.Steps[i].ID == stepID {
  472. return &e.workflow.Steps[i]
  473. }
  474. }
  475. return nil
  476. }
  477. // stop stops the workflow (called when a Stop_* node executes).
  478. // Multiple parallel branches may each reach their own Stop_* node;
  479. // stopOnce ensures workflow_done is emitted exactly once.
  480. func (e *Engine) stop(ctx ContextAccessor) {
  481. // Use thread-safe method to set status
  482. ctx.SetStatus(StatusStopped)
  483. baseCtx := ctx.GetBaseContext()
  484. // Capture the first Stop_* node ID. The actual workflow_done event is emitted
  485. // by executeWorkflow AFTER all parallel branches have finished, so the SSE
  486. // reader sees every step event before the terminal event.
  487. e.stopOnce.Do(func() {
  488. e.stopNodeID = baseCtx.CurrentStepID
  489. })
  490. }
  491. // complete marks the workflow as completed (reached end naturally, no Stop_* node)
  492. func (e *Engine) complete(ctx ContextAccessor) {
  493. ctx.SetStatus(StatusCompleted)
  494. baseCtx := ctx.GetBaseContext()
  495. // Emit workflow_done so clients receive the same signal whether the workflow
  496. // terminated via Stop_* or ran off the end of the last step.
  497. e.emitRunEvent(baseCtx, RunEventWorkflowDone, nil, map[string]interface{}{
  498. "duration_ms": time.Since(baseCtx.StartTime).Milliseconds(),
  499. })
  500. }
  501. // fail fails the workflow
  502. func (e *Engine) fail(ctx ContextAccessor, err error) {
  503. // Use thread-safe method to set status
  504. ctx.SetStatus(StatusFailed)
  505. baseCtx := ctx.GetBaseContext()
  506. // Emit workflow_failed RunEvent (spec 3.12 §13.4)
  507. e.emitRunEvent(baseCtx, RunEventWorkflowFailed, nil, map[string]interface{}{
  508. "failed_step_id": baseCtx.CurrentStepID,
  509. "error": buildErrorMap(err),
  510. "duration_ms": time.Since(baseCtx.StartTime).Milliseconds(),
  511. })
  512. }
  513. // Resume sends a resume signal to a workflow that is suspended at a Pause_* node (v3.15+).
  514. // It is idempotent: a duplicate call with the same RequestID is silently ignored.
  515. // Returns an error if the workflow is not currently paused, the token is invalid,
  516. // or the resume signal has already been delivered.
  517. func (e *Engine) Resume(execCtx *ExecutionContext, req ResumeRequest) error {
  518. execCtx.pauseMu.Lock()
  519. state := execCtx.PauseState
  520. execCtx.pauseMu.Unlock()
  521. if state == nil {
  522. // Emit pause_rejected for state_error (spec §11.5.4: non-paused state → rejected)
  523. e.emitRunEvent(execCtx, RunEventPauseRejected, nil, map[string]interface{}{
  524. "requestId": req.RequestID,
  525. "reasonCode": "state_error",
  526. })
  527. return fmt.Errorf("workflow is not in paused state")
  528. }
  529. // Validate RunID if provided (spec §11.4.1: runId must identify the correct run)
  530. if req.RunID != "" && req.RunID != execCtx.WorkflowID {
  531. return fmt.Errorf("resume runId %q does not match workflow run %q", req.RunID, execCtx.WorkflowID)
  532. }
  533. state.mu.Lock()
  534. defer state.mu.Unlock()
  535. // Validate token before any idempotency check
  536. if req.Token != state.token {
  537. nodeID := state.nodeID
  538. e.emitRunEvent(execCtx, RunEventPauseRejected, &nodeID, map[string]interface{}{
  539. "nodeId": state.nodeID,
  540. "requestId": req.RequestID,
  541. "reasonCode": "invalid_token",
  542. })
  543. return fmt.Errorf("invalid resume token")
  544. }
  545. // Idempotency: duplicate RequestID → silent no-op
  546. if req.RequestID != "" && state.seenRequestIDs[req.RequestID] {
  547. return nil
  548. }
  549. // Reject if already resumed by a different request
  550. if state.resumed {
  551. nodeID := state.nodeID
  552. e.emitRunEvent(execCtx, RunEventPauseRejected, &nodeID, map[string]interface{}{
  553. "nodeId": state.nodeID,
  554. "requestId": req.RequestID,
  555. "reasonCode": "already_resumed",
  556. })
  557. return fmt.Errorf("workflow has already been resumed")
  558. }
  559. // Record request ID for idempotency before sending
  560. if req.RequestID != "" {
  561. state.seenRequestIDs[req.RequestID] = true
  562. }
  563. state.resumed = true
  564. // Send signal (buffered channel size 1, non-blocking)
  565. state.ch <- resumeSignal{Payload: req.Payload, RequestID: req.RequestID}
  566. return nil
  567. }
  568. // snapshotLocalVars copies known local variables from a ContextAccessor
  569. // so they can be propagated to child contexts (e.g., _item/_index from loops)
  570. func snapshotLocalVars(ctx ContextAccessor) map[string]interface{} {
  571. snapshot := make(map[string]interface{})
  572. for _, key := range []string{"_item", "_index", "_result", "_meta", "_error"} {
  573. if val, ok := ctx.GetLocalVar(key); ok {
  574. snapshot[key] = val
  575. }
  576. }
  577. return snapshot
  578. }
  579. // executeChildren executes child steps in parallel
  580. func (e *Engine) executeChildren(parentCtx ContextAccessor, childIDs []string) error {
  581. if len(childIDs) == 0 {
  582. return nil
  583. }
  584. baseCtx := parentCtx.GetBaseContext()
  585. // Optimization: single child doesn't need parallelism
  586. if len(childIDs) == 1 {
  587. child := e.findStepByID(childIDs[0])
  588. if child == nil {
  589. return fmt.Errorf("child step not found: %s", childIDs[0])
  590. }
  591. return e.executeStep(parentCtx, child)
  592. }
  593. // Parallel execution for multiple children
  594. safeCtx := NewSafeExecutionContext(baseCtx)
  595. executor := e.getParallelExecutor()
  596. // Snapshot parent's local vars so child branches inherit them (e.g., _item/_index from loops)
  597. parentLocalVars := snapshotLocalVars(parentCtx)
  598. // Create branches for each child
  599. branches := make([]ParallelBranch, len(childIDs))
  600. for i, childID := range childIDs {
  601. id := childID
  602. branches[i] = ParallelBranch{
  603. ID: id,
  604. Fn: func(ctx context.Context) error {
  605. // Create child context with isolated local vars
  606. childCtx := NewChildExecutionContext(safeCtx)
  607. // Propagate parent's local vars (e.g., _item, _index from enclosing loop)
  608. for k, v := range parentLocalVars {
  609. childCtx.LocalVars[k] = v
  610. }
  611. // Check cancellation
  612. select {
  613. case <-ctx.Done():
  614. return ctx.Err()
  615. default:
  616. }
  617. child := e.findStepByID(id)
  618. if child == nil {
  619. return fmt.Errorf("child step not found: %s", id)
  620. }
  621. // Execute with child context
  622. if err := e.executeStep(childCtx, child); err != nil {
  623. return fmt.Errorf("child %s: %w", id, err)
  624. }
  625. return nil
  626. },
  627. }
  628. }
  629. // Execute all branches in parallel
  630. if err := executor.Execute(baseCtx.Ctx, branches, e.getErrorStrategy()); err != nil {
  631. return fmt.Errorf("parallel children execution failed: %w", err)
  632. }
  633. return nil
  634. }
  635. // getParallelExecutor returns the parallel executor
  636. func (e *Engine) getParallelExecutor() *ParallelExecutor {
  637. return e.parallelExecutor
  638. }
  639. // getErrorStrategy returns the configured error strategy
  640. func (e *Engine) getErrorStrategy() ParallelErrorStrategy {
  641. return e.errorStrategy
  642. }
  643. // isV310OrLater returns true if the workflow version is 3.10 or later
  644. func (e *Engine) isV310OrLater() bool {
  645. v := e.workflow.Version
  646. return v == "3.10" || v == "3.12" || v == "3.13" || v == "3.14" || v == "3.15" || v == "3.16"
  647. }
  648. // buildErrorMap converts an error into a map suitable for _error local variable
  649. func buildErrorMap(err error) map[string]interface{} {
  650. var llmErr *LLMError
  651. if errors.As(err, &llmErr) {
  652. return llmErr.ToMap()
  653. }
  654. // Generic error fallback
  655. return map[string]interface{}{
  656. "type": "internal_error",
  657. "code": "INTERNAL_ERROR",
  658. "message": err.Error(),
  659. }
  660. }
  661. // nextEventSequence returns the next event sequence number
  662. func (e *Engine) nextEventSequence() uint64 {
  663. return atomic.AddUint64(&e.eventSequence, 1)
  664. }
  665. // emitRunEvent emits a structured RunEvent to the run_events stream (spec 3.12 Chapter 13).
  666. // Uses a non-blocking send (fire-and-forget): events are dropped if the channel is full or closed.
  667. // The send is wrapped in a recover to guard against "send on closed channel" panics that can
  668. // occur when Resume emits a pause_rejected event after the workflow goroutine has already closed
  669. // the stream (e.g. the workflow completed between the paused check and the emit call).
  670. func (e *Engine) emitRunEvent(ctx *ExecutionContext, eventType RunEventType, stepID *string, payload map[string]interface{}) {
  671. if ctx.RunEventStream == nil {
  672. return
  673. }
  674. event := RunEvent{
  675. RunID: ctx.WorkflowID,
  676. Seq: e.nextEventSequence(),
  677. Ts: time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00"),
  678. Type: eventType,
  679. StepID: stepID,
  680. Payload: payload,
  681. }
  682. // Recover from "send on closed channel" panic (can happen when Resume is called
  683. // after the workflow goroutine has closed RunEventStream).
  684. defer func() { recover() }() //nolint:errcheck
  685. select {
  686. case ctx.RunEventStream <- event:
  687. default:
  688. // fire-and-forget: drop silently if channel is full
  689. }
  690. }
  691. // Adapters holds all the adapters needed for workflow execution
  692. type Adapters struct {
  693. Service ServiceAdapter
  694. API APIAdapter
  695. Component ComponentAdapter
  696. LLM LLMAdapter
  697. LLMAdapterRegistry *LLMAdapterRegistry // v3.16+: multi-provider registry (nil for pre-3.16)
  698. File FileAdapter
  699. Doc DocAdapter
  700. }
  701. // ExecutionResult holds the result of workflow execution
  702. type ExecutionResult struct {
  703. Context *ExecutionContext
  704. RunEventStream <-chan RunEvent // v3.12+: structured run_events stream (spec Chapter 13)
  705. }
  706. // Validate validates the workflow structure
  707. func (w *Workflow) Validate() error {
  708. if w.Version != "3.6" && w.Version != "3.7" && w.Version != "3.8" && w.Version != "3.9" && w.Version != "3.10" && w.Version != "3.12" && w.Version != "3.13" && w.Version != "3.14" && w.Version != "3.15" && w.Version != "3.16" {
  709. return fmt.Errorf("unsupported workflow version: %s (expected 3.6, 3.7, 3.8, 3.9, 3.10, 3.12, 3.13, 3.14, 3.15, or 3.16)", w.Version)
  710. }
  711. if w.Name == "" {
  712. return fmt.Errorf("workflow name is required")
  713. }
  714. if len(w.Steps) == 0 {
  715. return fmt.Errorf("workflow must have at least one step")
  716. }
  717. // Validate registry
  718. if err := w.Registry.ValidateRegistry(); err != nil {
  719. return err
  720. }
  721. // IDE scenario validation (v3.9+): forbid Service_* nodes
  722. if w.WorkflowType == WorkflowTypeIDE {
  723. // Check that no Service_* nodes exist in steps
  724. for _, step := range w.Steps {
  725. if strings.HasPrefix(step.ID, "Service_") {
  726. return fmt.Errorf("IDE workflow (WorkflowType: IDE) cannot contain Service_* nodes: %s", step.ID)
  727. }
  728. }
  729. // Check that registry.services is empty
  730. if len(w.Registry.Services) > 0 {
  731. return fmt.Errorf("IDE workflow (WorkflowType: IDE) must have empty registry.services")
  732. }
  733. }
  734. // ── Phase 1: Build stepID set, check uniqueness and basic field constraints ──────────
  735. stepIDs := make(map[string]bool)
  736. for _, step := range w.Steps {
  737. if stepIDs[step.ID] {
  738. return fmt.Errorf("duplicate step ID: %s", step.ID)
  739. }
  740. // v3.16+: BREAK and RETURN are reserved keywords, cannot be used as stepId
  741. if step.ID == "BREAK" {
  742. return fmt.Errorf("'BREAK' is a reserved keyword and cannot be used as a step ID")
  743. }
  744. if step.ID == "RETURN" {
  745. return fmt.Errorf("'RETURN' is a reserved keyword and cannot be used as a step ID")
  746. }
  747. stepIDs[step.ID] = true
  748. // Stop_* must NOT have next or children (spec §2.3.4)
  749. if strings.HasPrefix(step.ID, "Stop_") {
  750. if step.Next != "" {
  751. return fmt.Errorf("step %s: Stop_* steps cannot have a 'next' field (spec §2.3.4)", step.ID)
  752. }
  753. if len(step.Children) > 0 {
  754. return fmt.Errorf("step %s: Stop_* steps cannot have 'children' (spec §2.3.4)", step.ID)
  755. }
  756. } else {
  757. // All other steps must declare a 'next' field
  758. if step.Next == "" {
  759. return fmt.Errorf("step %s must have a 'next' field (use 'RETURN' for child/loop steps)", step.ID)
  760. }
  761. }
  762. }
  763. // ── Phase 2: Reference integrity (spec §2.3.3) ────────────────────────────────────
  764. // Every next / children / cases / timeout.on / onError value must resolve to an
  765. // existing step ID (or the special sentinels "RETURN" / "BREAK").
  766. for _, step := range w.Steps {
  767. check := func(fieldName, target string) error {
  768. if target == "" || target == "RETURN" || target == "BREAK" {
  769. return nil
  770. }
  771. if !stepIDs[target] {
  772. return fmt.Errorf("step %s: %s references non-existent step %q", step.ID, fieldName, target)
  773. }
  774. return nil
  775. }
  776. if err := check("next", step.Next); err != nil {
  777. return err
  778. }
  779. for i, childID := range step.Children {
  780. if err := check(fmt.Sprintf("children[%d]", i), childID); err != nil {
  781. return err
  782. }
  783. }
  784. for i, c := range step.Cases {
  785. if len(c) == 2 {
  786. if err := check(fmt.Sprintf("cases[%d]", i), c[1]); err != nil {
  787. return err
  788. }
  789. }
  790. }
  791. if step.Timeout != nil {
  792. if err := check("timeout.on", step.Timeout.On); err != nil {
  793. return err
  794. }
  795. }
  796. if err := check("onError", step.OnError); err != nil {
  797. return err
  798. }
  799. }
  800. // ── Phase 2b: BREAK scope validation (v3.16+) ─────────────────────────────────────
  801. // BREAK is only valid inside a Loop_* children subtree.
  802. // Build the set of stepIDs that are transitively reachable from any Loop_* children entry.
  803. loopChildren := make(map[string]bool)
  804. for _, step := range w.Steps {
  805. if !strings.HasPrefix(step.ID, "Loop_") || len(step.Children) == 0 {
  806. continue
  807. }
  808. // BFS from this Loop_*'s children entries
  809. visited := make(map[string]bool)
  810. bfsQueue := make([]string, len(step.Children))
  811. copy(bfsQueue, step.Children)
  812. for _, id := range step.Children {
  813. visited[id] = true
  814. }
  815. for len(bfsQueue) > 0 {
  816. cur := bfsQueue[0]
  817. bfsQueue = bfsQueue[1:]
  818. loopChildren[cur] = true
  819. // Find the step and add its successors (skip RETURN/BREAK which terminate)
  820. for _, s := range w.Steps {
  821. if s.ID != cur {
  822. continue
  823. }
  824. succs := []string{s.Next, s.OnError}
  825. succs = append(succs, s.Children...)
  826. for _, c := range s.Cases {
  827. if len(c) == 2 {
  828. succs = append(succs, c[1])
  829. }
  830. }
  831. if s.Timeout != nil {
  832. succs = append(succs, s.Timeout.On)
  833. }
  834. for _, sid := range succs {
  835. if sid == "" || sid == "RETURN" || sid == "BREAK" || visited[sid] {
  836. continue
  837. }
  838. visited[sid] = true
  839. bfsQueue = append(bfsQueue, sid)
  840. }
  841. break
  842. }
  843. }
  844. }
  845. // Verify every step with next:"BREAK" is inside a Loop_* children subtree
  846. for _, step := range w.Steps {
  847. if step.Next == "BREAK" && !loopChildren[step.ID] {
  848. return fmt.Errorf("step %s: 'BREAK' is only valid inside a Loop_* children subtree", step.ID)
  849. }
  850. // Also check Branch cases that target BREAK-reachable steps
  851. for _, c := range step.Cases {
  852. if len(c) == 2 && c[1] == "BREAK" && !loopChildren[step.ID] {
  853. return fmt.Errorf("step %s: BREAK target in cases is only valid inside a Loop_* children subtree", step.ID)
  854. }
  855. }
  856. }
  857. // ── Phase 3: Entry node validation (spec §2.3.1) ──────────────────────────────────
  858. entryNodeIDs := findEntryNodeIDs(w.Steps)
  859. if len(entryNodeIDs) == 0 {
  860. return fmt.Errorf("workflow graph has no entry node: every step is referenced by another step, creating a cycle with no starting point")
  861. }
  862. // ── Phase 4: Reachability (spec §2.3.2) ───────────────────────────────────────────
  863. // All steps must be reachable from at least one entry node via BFS.
  864. reachable := make(map[string]bool)
  865. queue := make([]string, len(entryNodeIDs))
  866. copy(queue, entryNodeIDs)
  867. for _, id := range entryNodeIDs {
  868. reachable[id] = true
  869. }
  870. for len(queue) > 0 {
  871. curr := queue[0]
  872. queue = queue[1:]
  873. // Find step (linear scan is fine at validation time)
  874. for _, step := range w.Steps {
  875. if step.ID != curr {
  876. continue
  877. }
  878. successors := []string{step.Next, step.OnError}
  879. successors = append(successors, step.Children...)
  880. for _, c := range step.Cases {
  881. if len(c) == 2 {
  882. successors = append(successors, c[1])
  883. }
  884. }
  885. if step.Timeout != nil {
  886. successors = append(successors, step.Timeout.On)
  887. }
  888. for _, s := range successors {
  889. if s != "" && s != "RETURN" && s != "BREAK" && !reachable[s] {
  890. reachable[s] = true
  891. queue = append(queue, s)
  892. }
  893. }
  894. break
  895. }
  896. }
  897. for _, step := range w.Steps {
  898. if !reachable[step.ID] {
  899. return fmt.Errorf("step %s is unreachable from any entry node (spec §2.3.2)", step.ID)
  900. }
  901. }
  902. // Validate Download_* and Unzip_* field constraints (v3.14+)
  903. for _, step := range w.Steps {
  904. if strings.HasPrefix(step.ID, "Download_") {
  905. // source is required
  906. if step.Source == nil {
  907. return fmt.Errorf("step %s: 'source' is required for Download_* steps", step.ID)
  908. }
  909. // target and routeByExt/defaultDir are mutually exclusive; at least one is required
  910. hasTarget := step.Target != ""
  911. hasRoute := len(step.RouteByExt) > 0 || step.DefaultDir != ""
  912. if hasTarget && hasRoute {
  913. return fmt.Errorf("step %s: 'target' and 'routeByExt'/'defaultDir' are mutually exclusive for Download_* steps", step.ID)
  914. }
  915. if !hasTarget && !hasRoute {
  916. return fmt.Errorf("step %s: Download_* steps must specify either 'target' or 'routeByExt'/'defaultDir'", step.ID)
  917. }
  918. }
  919. if strings.HasPrefix(step.ID, "Unzip_") {
  920. // source is required
  921. if step.Source == nil {
  922. return fmt.Errorf("step %s: 'source' is required for Unzip_* steps", step.ID)
  923. }
  924. // routeByExt is required (may be empty map {} combined with defaultDir, but the field must be declared)
  925. if step.RouteByExt == nil {
  926. return fmt.Errorf("step %s: 'routeByExt' is required for Unzip_* steps (use {} with 'defaultDir' to route all files to one directory)", step.ID)
  927. }
  928. }
  929. if strings.HasPrefix(step.ID, "Pause_") {
  930. // resumeResultTarget is required (spec §5.1, §11.2.1)
  931. if step.ResumeResultTarget == "" {
  932. return fmt.Errorf("step %s: 'resumeResultTarget' is required for Pause_* steps", step.ID)
  933. }
  934. // children is not applicable for Pause_* (spec §5.1: 不适用)
  935. if len(step.Children) > 0 {
  936. return fmt.Errorf("step %s: 'children' is not supported on Pause_* steps (spec §5.1)", step.ID)
  937. }
  938. // If timeout is provided, sec must be > 0 and on must be non-empty (spec §11.2.2)
  939. if step.Timeout != nil {
  940. if step.Timeout.Sec <= 0 {
  941. return fmt.Errorf("step %s: timeout.sec must be > 0 for Pause_* steps", step.ID)
  942. }
  943. if step.Timeout.On == "" {
  944. return fmt.Errorf("step %s: timeout.on is required when timeout is configured for Pause_* steps", step.ID)
  945. }
  946. }
  947. }
  948. // v3.16+: Loop_* field constraints
  949. if strings.HasPrefix(step.ID, "Loop_") {
  950. hasWhile := step.While != ""
  951. hasSource := step.Source != nil
  952. // source is interface{} — also check string form
  953. if sourceStr, ok := step.Source.(string); ok && sourceStr == "" {
  954. hasSource = false
  955. }
  956. // while and source both present → ERROR
  957. if hasWhile && hasSource {
  958. return fmt.Errorf("step %s: 'while' and 'source' are mutually exclusive on Loop_* steps", step.ID)
  959. }
  960. // while without maxIterations → ERROR
  961. if hasWhile && step.MaxIterations == nil {
  962. return fmt.Errorf("step %s: 'maxIterations' is required when 'while' is used on Loop_* steps", step.ID)
  963. }
  964. // maxIterations < 1 → ERROR
  965. if step.MaxIterations != nil && *step.MaxIterations < 1 {
  966. return fmt.Errorf("step %s: 'maxIterations' must be >= 1, got %d", step.ID, *step.MaxIterations)
  967. }
  968. // while + mode:"parallel" → ERROR
  969. if hasWhile && step.Mode == "parallel" {
  970. return fmt.Errorf("step %s: 'while' mode requires mode 'serial', got 'parallel'", step.ID)
  971. }
  972. }
  973. // v3.16+: LLM_* model field validation
  974. if strings.HasPrefix(step.ID, "LLM_") && step.Model != "" {
  975. // model contains "/" but provider or modelId part is empty → ERROR
  976. if strings.Contains(step.Model, "/") {
  977. parts := strings.SplitN(step.Model, "/", 2)
  978. if parts[0] == "" || parts[1] == "" {
  979. return fmt.Errorf("step %s: 'model' with '/' must have non-empty provider and modelId (got %q)", step.ID, step.Model)
  980. }
  981. }
  982. }
  983. }
  984. return nil
  985. }
  986. // generateWorkflowID generates a unique workflow execution ID
  987. func generateWorkflowID() string {
  988. return fmt.Sprintf("wf_%d", time.Now().UnixNano())
  989. }
  990. // findEntryNodeIDs returns the IDs of all "entry" nodes in a step list.
  991. // An entry node is one whose ID does not appear in any other step's
  992. // next / children / cases / timeout.on / onError fields.
  993. // Spec §1.4C: the engine auto-identifies entry nodes; it MUST NOT always use steps[0].
  994. func findEntryNodeIDs(steps []Step) []string {
  995. referenced := make(map[string]bool)
  996. for _, step := range steps {
  997. if step.Next != "" && step.Next != "RETURN" && step.Next != "BREAK" {
  998. referenced[step.Next] = true
  999. }
  1000. for _, childID := range step.Children {
  1001. referenced[childID] = true
  1002. }
  1003. for _, c := range step.Cases {
  1004. if len(c) == 2 {
  1005. referenced[c[1]] = true
  1006. }
  1007. }
  1008. if step.Timeout != nil && step.Timeout.On != "" {
  1009. referenced[step.Timeout.On] = true
  1010. }
  1011. if step.OnError != "" {
  1012. referenced[step.OnError] = true
  1013. }
  1014. }
  1015. var entries []string
  1016. for _, step := range steps {
  1017. if !referenced[step.ID] {
  1018. entries = append(entries, step.ID)
  1019. }
  1020. }
  1021. return entries
  1022. }