engine.js 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709
  1. /**
  2. * VL Workflow Engine — Main Engine
  3. * Ported from Go: workflow/engine.go
  4. * Spec: v3.16
  5. *
  6. * Usage:
  7. * const engine = new Engine(workflow);
  8. * const result = await engine.execute(initialVars, adapters);
  9. * result.context.onEvent(event => console.log(event));
  10. */
  11. const { Registry, parseParamDeclaration, parseVariableDeclaration, validateRegistry } = require('./registry');
  12. const { ParallelExecutor } = require('./parallel');
  13. const {
  14. getStepType, resolveStepBehavior, ExecutionStatus, RunEventType, ParallelErrorStrategy,
  15. SUPPORTED_VERSIONS, isV310OrLater, RESERVED_NEXT_KEYWORDS, buildErrorMap,
  16. ExecutionContext, ChildExecutionContext, structuredCloneJSON
  17. } = require('./types');
  18. const { ExpressionEvaluator, toBool } = require('./expression');
  19. const {
  20. executeServiceStep, executeComponentStep, executeLLMStep,
  21. executeAPIStep, executeSetStep, executeWriteStep,
  22. executeBranchStep, executeLoopStep,
  23. executeDownloadStep, executeUnzipStep, executePauseStep,
  24. emitEvent, LoopBreakSignal
  25. } = require('./executor');
  26. class Engine {
  27. /**
  28. * @param {object} workflow — parsed workflow JSON
  29. * @param {object} [options]
  30. * @param {number} [options.maxConcurrency]
  31. * @param {string} [options.errorStrategy]
  32. */
  33. constructor(workflow, options = {}) {
  34. this.workflow = workflow;
  35. this.registry = new Registry(workflow.registry || {});
  36. this.stepMap = new Map();
  37. for (const step of (workflow.steps || [])) {
  38. this.stepMap.set(step.id, step);
  39. }
  40. this.errorStrategy = options.errorStrategy || ParallelErrorStrategy.FailFast;
  41. this.parallelExecutor = new ParallelExecutor(options.maxConcurrency || 0);
  42. // Custom step handlers: { Prefix: async (engine, ctx, step) => {...} }
  43. this.customHandlers = options.customHandlers || {};
  44. // Event listener attached to every ExecutionContext created by execute()
  45. this._onEvent = options.onEvent || null;
  46. // Checkpoint callback: called after each step completes with ctx.checkpoint()
  47. this._onCheckpoint = options.onCheckpoint || null;
  48. }
  49. /** Check if a step prefix has a custom handler. */
  50. _isCustomStep(stepID) {
  51. const prefix = stepID.split('_')[0];
  52. return !!this.customHandlers[prefix];
  53. }
  54. getStep(id) { return this.stepMap.get(id) || null; }
  55. // ─── Validate ──────────────────────────────────────────────
  56. validate() {
  57. const errors = [];
  58. const wf = this.workflow;
  59. if (!SUPPORTED_VERSIONS.has(wf.version)) errors.push(`Unsupported version: ${wf.version}`);
  60. if (!wf.name) errors.push('Workflow name required');
  61. if (!wf.steps || wf.steps.length === 0) errors.push('At least one step required');
  62. // Registry validation
  63. errors.push(...validateRegistry(wf.registry));
  64. // Step validation
  65. const ids = new Set();
  66. for (const step of (wf.steps || [])) {
  67. if (ids.has(step.id)) errors.push(`Duplicate step ID: ${step.id}`);
  68. ids.add(step.id);
  69. const type = getStepType(step.id);
  70. if (!type && !this._isCustomStep(step.id)) errors.push(`Unknown step type: ${step.id}`);
  71. if (type === 'Stop' && (step.next || (step.children && step.children.length))) {
  72. errors.push(`Stop_* cannot have next or children: ${step.id}`);
  73. }
  74. }
  75. // Reference integrity
  76. for (const step of (wf.steps || [])) {
  77. if (step.next && !RESERVED_NEXT_KEYWORDS.has(step.next) && !ids.has(step.next)) {
  78. errors.push(`Step ${step.id}: next references unknown step ${step.next}`);
  79. }
  80. for (const childId of (step.children || [])) {
  81. if (!ids.has(childId)) errors.push(`Step ${step.id}: child references unknown step ${childId}`);
  82. }
  83. for (const cas of (step.cases || [])) {
  84. if (Array.isArray(cas) && cas.length >= 2 && !ids.has(cas[1])) {
  85. errors.push(`Step ${step.id}: case references unknown step ${cas[1]}`);
  86. }
  87. if (cas && typeof cas === 'object' && !Array.isArray(cas)) {
  88. const ref = cas.next || cas.step;
  89. if (ref && !ids.has(ref)) errors.push(`Step ${step.id}: case references unknown step ${ref}`);
  90. }
  91. }
  92. if (step.default && !ids.has(step.default)) {
  93. errors.push(`Step ${step.id}: default references unknown step ${step.default}`);
  94. }
  95. // Check_* style: if_true / if_false references
  96. if (step.if_true && !ids.has(step.if_true)) {
  97. errors.push(`Step ${step.id}: if_true references unknown step ${step.if_true}`);
  98. }
  99. if (step.if_false && !ids.has(step.if_false)) {
  100. errors.push(`Step ${step.id}: if_false references unknown step ${step.if_false}`);
  101. }
  102. // onError can be a step ID or a builtin keyword (skip, retry, ignore)
  103. const ONERROR_BUILTINS = new Set(['skip', 'retry', 'ignore', 'continue']);
  104. if (step.onError && !ids.has(step.onError) && !ONERROR_BUILTINS.has(step.onError)) {
  105. errors.push(`Step ${step.id}: onError references unknown step ${step.onError}`);
  106. }
  107. if (step.timeout?.on && !ids.has(step.timeout.on)) {
  108. errors.push(`Step ${step.id}: timeout.on references unknown step ${step.timeout.on}`);
  109. }
  110. // v3.16 — Loop: while and source are mutually exclusive
  111. const sType = getStepType(step.id);
  112. if (sType === 'Loop') {
  113. if (step.while != null && step.source != null) {
  114. errors.push(`Step ${step.id}: Loop cannot have both "while" and "source" (mutually exclusive)`);
  115. }
  116. if (step.maxIterations != null && (typeof step.maxIterations !== 'number' || step.maxIterations < 1)) {
  117. errors.push(`Step ${step.id}: maxIterations must be a positive integer`);
  118. }
  119. }
  120. // v3.16 — BREAK only valid inside Loop children
  121. if (step.next === 'BREAK') {
  122. const isLoopChild = (wf.steps || []).some(
  123. s => getStepType(s.id) === 'Loop' && (s.children || []).includes(step.id)
  124. );
  125. if (!isLoopChild) {
  126. errors.push(`Step ${step.id}: BREAK is only valid inside Loop children`);
  127. }
  128. }
  129. // v3.16 — LLM model format: <provider>/<modelId> or <provider> or omitted
  130. if (sType === 'LLM' && step.model) {
  131. const parts = step.model.split('/');
  132. if (parts.length > 2) {
  133. errors.push(`Step ${step.id}: model format must be "<provider>/<modelId>" or "<provider>"`);
  134. }
  135. }
  136. }
  137. return errors;
  138. }
  139. // ─── Execute ───────────────────────────────────────────────
  140. /**
  141. * Execute the workflow.
  142. * @param {object} initialVars — initial param/variable values
  143. * @param {object} adapters — { service, api, component, llm, file, doc }
  144. * @param {object} [runParams] — optional run-level params
  145. * @returns {ExecutionContext}
  146. */
  147. async execute(initialVars = {}, adapters = {}, runParams = null) {
  148. const workflowID = `wf_${Date.now()}`;
  149. const paramDecls = this.registry.getParamDeclarations();
  150. const varDecls = this.registry.getVariableDeclarations();
  151. // Initialize params (read-only)
  152. const params = {};
  153. const paramTypes = {};
  154. for (const [name, decl] of Object.entries(paramDecls)) {
  155. paramTypes[name] = decl.type;
  156. if (initialVars[name] !== undefined) {
  157. params[name] = applyParamType(decl.type, initialVars[name]);
  158. } else if (decl.default !== undefined) {
  159. params[name] = decl.default;
  160. }
  161. }
  162. // Store extra initialVars not declared in registry as params (pass-through)
  163. for (const [name, value] of Object.entries(initialVars)) {
  164. if (!(name in paramDecls) && !name.startsWith('$')) {
  165. params[name] = value;
  166. }
  167. }
  168. // RunParams override
  169. if (runParams?.params) {
  170. for (const [k, v] of Object.entries(runParams.params)) {
  171. if (k in paramDecls) params[k] = applyParamType(paramDecls[k].type, v);
  172. else params[k] = v;
  173. }
  174. }
  175. // Initialize variables
  176. const variables = {};
  177. const varTypes = {};
  178. for (const [name, decl] of Object.entries(varDecls)) {
  179. varTypes[name] = decl.type;
  180. variables[name] = initialVars[name] !== undefined ? applyVarType(decl.type, initialVars[name]) : null;
  181. }
  182. const ctx = new ExecutionContext({
  183. workflowID, params, paramTypes, variables, varTypes,
  184. adapters, runParams, version: this.workflow.version || '3.15'
  185. });
  186. // Expose active context on engine instance (for external resume/pause access)
  187. this.activeCtx = ctx;
  188. // Attach event listener if provided
  189. if (this._onEvent) ctx.onEvent(this._onEvent);
  190. // Execute
  191. try {
  192. emitEvent(ctx, RunEventType.WorkflowStart, null, {
  193. name: this.workflow.name, version: this.workflow.version, params
  194. });
  195. const entryIDs = this._findEntryNodeIDs();
  196. if (entryIDs.length === 0) throw new Error('No entry nodes found');
  197. if (entryIDs.length === 1) {
  198. const entryStep = this.getStep(entryIDs[0]);
  199. await this.executeStep(ctx, entryStep);
  200. } else {
  201. await this._executeChildren(ctx, entryIDs, '_entry');
  202. }
  203. if (ctx.status === ExecutionStatus.Running) {
  204. ctx.status = ExecutionStatus.Completed;
  205. emitEvent(ctx, RunEventType.WorkflowDone, null, {
  206. duration_ms: Date.now() - ctx.startTime
  207. });
  208. }
  209. } catch (err) {
  210. ctx.status = ExecutionStatus.Failed;
  211. emitEvent(ctx, RunEventType.WorkflowFailed, null, {
  212. error: err.message, failed_step_id: ctx.currentStepID,
  213. duration_ms: Date.now() - ctx.startTime
  214. });
  215. throw err;
  216. }
  217. return ctx;
  218. }
  219. /**
  220. * Resume a paused workflow.
  221. */
  222. resume(ctx, request) {
  223. if (!ctx.pauseState) throw new Error('Workflow is not paused');
  224. if (ctx.pauseState.token !== request.token) {
  225. emitEvent(ctx, RunEventType.PauseRejected, ctx.pauseState.nodeID, {
  226. reason: 'invalid_token'
  227. });
  228. throw new Error('Invalid pause token');
  229. }
  230. // Idempotency
  231. if (request.requestId && ctx.pauseState.seenRequestIDs.has(request.requestId)) return;
  232. if (request.requestId) ctx.pauseState.seenRequestIDs.add(request.requestId);
  233. ctx.pauseState._resolve({ payload: request.payload, requestId: request.requestId });
  234. }
  235. // ─── Execute From Checkpoint / Step ID ─────────────────────
  236. /**
  237. * Resume execution from a checkpoint or a specific step.
  238. *
  239. * Usage 1 — from checkpoint (crash recovery):
  240. * const cp = JSON.parse(fs.readFileSync('checkpoint.json'));
  241. * const ctx = await engine.executeFrom(cp, adapters);
  242. *
  243. * Usage 2 — from step ID with variable overrides (re-run):
  244. * const ctx = await engine.executeFrom({
  245. * currentStepID: 'LLM_GenerateCode',
  246. * variables: { '$plan': modifiedPlan },
  247. * params: originalParams
  248. * }, adapters);
  249. *
  250. * @param {object} checkpoint — checkpoint object or { currentStepID, params?, variables? }
  251. * @param {object} adapters — { service, api, component, llm, file, doc }
  252. * @param {object} [overrides] — optional variable overrides applied after checkpoint restore
  253. * @returns {ExecutionContext}
  254. */
  255. async executeFrom(checkpoint, adapters = {}, overrides = null) {
  256. if (!checkpoint?.currentStepID) throw new Error('checkpoint.currentStepID is required');
  257. const startStepID = checkpoint.currentStepID;
  258. const startStep = this.getStep(startStepID);
  259. if (!startStep) throw new Error(`Step not found: ${startStepID}`);
  260. // Rebuild context from checkpoint data
  261. const workflowID = checkpoint.workflowID || `wf_resume_${Date.now()}`;
  262. const ctx = new ExecutionContext({
  263. workflowID,
  264. params: checkpoint.params ? structuredCloneJSON(checkpoint.params) : {},
  265. paramTypes: checkpoint.paramTypes || {},
  266. variables: checkpoint.variables ? structuredCloneJSON(checkpoint.variables) : {},
  267. varTypes: checkpoint.varTypes || {},
  268. localVars: checkpoint.localVars ? structuredCloneJSON(checkpoint.localVars) : {},
  269. artifacts: checkpoint.artifacts || {},
  270. adapters,
  271. runParams: checkpoint.runParams || null,
  272. version: checkpoint.version || this.workflow.version || '3.15'
  273. });
  274. // Expose active context on engine instance (for external resume/pause access)
  275. this.activeCtx = ctx;
  276. // Restore completed steps list
  277. if (checkpoint.completedSteps) {
  278. ctx._completedSteps = [...checkpoint.completedSteps];
  279. }
  280. // Restore parallel branch completion state
  281. if (checkpoint.completedBranches) {
  282. for (const [parent, children] of Object.entries(checkpoint.completedBranches)) {
  283. ctx._completedBranches.set(parent, new Set(children));
  284. }
  285. }
  286. // Restore loop progress
  287. if (checkpoint.loopProgress) {
  288. for (const [loopID, count] of Object.entries(checkpoint.loopProgress)) {
  289. ctx._loopProgress.set(loopID, count);
  290. }
  291. }
  292. // Restore event sequence counter for continuity
  293. if (checkpoint.eventSeq) {
  294. ctx._eventSeq = checkpoint.eventSeq;
  295. }
  296. // Apply overrides (e.g., user tweaked a variable before re-running)
  297. if (overrides) {
  298. for (const [k, v] of Object.entries(overrides)) {
  299. if (k.startsWith('$') || k.startsWith('_')) {
  300. ctx.setVariable(k, v);
  301. } else {
  302. ctx.params[k] = v;
  303. }
  304. }
  305. }
  306. // Attach event listener
  307. if (this._onEvent) ctx.onEvent(this._onEvent);
  308. // Execute from the specified step
  309. try {
  310. emitEvent(ctx, RunEventType.WorkflowStart, null, {
  311. name: this.workflow.name, version: this.workflow.version,
  312. params: ctx.params, resumedFrom: startStepID
  313. });
  314. await this.executeStep(ctx, startStep);
  315. if (ctx.status === ExecutionStatus.Running) {
  316. ctx.status = ExecutionStatus.Completed;
  317. emitEvent(ctx, RunEventType.WorkflowDone, null, {
  318. duration_ms: Date.now() - ctx.startTime
  319. });
  320. }
  321. } catch (err) {
  322. ctx.status = ExecutionStatus.Failed;
  323. emitEvent(ctx, RunEventType.WorkflowFailed, null, {
  324. error: err.message, failed_step_id: ctx.currentStepID,
  325. duration_ms: Date.now() - ctx.startTime
  326. });
  327. throw err;
  328. }
  329. return ctx;
  330. }
  331. // ─── Step Execution ────────────────────────────────────────
  332. async executeStep(ctx, step) {
  333. // Short-circuit if stopped/cancelled/paused
  334. if (ctx.status === ExecutionStatus.Stopped || ctx.aborted) return;
  335. if (ctx.status === ExecutionStatus.Paused) {
  336. this._emitCheckpoint(ctx); // save checkpoint so user can resume
  337. return;
  338. }
  339. ctx.currentStepID = step.id;
  340. const rawType = getStepType(step.id);
  341. const stepType = resolveStepBehavior(rawType) || rawType;
  342. // Evaluate condition
  343. if (step.if) {
  344. const ev = new ExpressionEvaluator(ctx);
  345. const cond = ev.evaluateValue(step.if);
  346. if (!toBool(cond)) {
  347. emitEvent(ctx, RunEventType.StepSkipped, step.id, { condition: step.if });
  348. return this._moveToNext(ctx, step);
  349. }
  350. }
  351. // Resolve inputs for event payload (lightweight re-evaluation, approach B)
  352. let resolvedInputs = null;
  353. if (step.in && typeof step.in === 'object') {
  354. try {
  355. const ev2 = new ExpressionEvaluator(ctx);
  356. resolvedInputs = summarizeDeep(ev2.evaluateDeep(step.in));
  357. } catch { /* best-effort — don't break execution */ }
  358. }
  359. // Emit step_start with resolved inputs
  360. emitEvent(ctx, RunEventType.StepStart, step.id, {
  361. type: rawType, meta: step.meta,
  362. resolvedInputs // may be null if step has no inputs
  363. });
  364. // Collect file targets for file_start events
  365. const fileTargets = this._collectFileTargets(step);
  366. for (const ft of fileTargets) {
  367. emitEvent(ctx, RunEventType.FileStart, step.id, { path: ft });
  368. }
  369. // Determine max attempts for onError: 'retry' (v3.10+)
  370. const isRetry = step.onError === 'retry' && isV310OrLater(this.workflow.version);
  371. const maxAttempts = isRetry ? (step.retryCount ?? 1) + 1 : 1;
  372. let err = null;
  373. const prefix = step.id.split('_')[0];
  374. const customHandler = this.customHandlers[prefix];
  375. for (let attempt = 0; attempt < maxAttempts; attempt++) {
  376. err = null;
  377. try {
  378. if (customHandler) {
  379. // Delegate to custom handler — same lifecycle as built-in steps
  380. await customHandler(this, ctx, step);
  381. } else {
  382. switch (stepType) {
  383. case 'Service': await executeServiceStep(this, ctx, step); break;
  384. case 'Component': await executeComponentStep(this, ctx, step); break;
  385. case 'LLM': await executeLLMStep(this, ctx, step); break;
  386. case 'API': await executeAPIStep(this, ctx, step); break;
  387. case 'Set': executeSetStep(this, ctx, step); break;
  388. case 'Write': await executeWriteStep(this, ctx, step); break;
  389. case 'Download': await executeDownloadStep(this, ctx, step); break;
  390. case 'Unzip': await executeUnzipStep(this, ctx, step); break;
  391. case 'Pause': {
  392. const result = await executePauseStep(this, ctx, step);
  393. if (result?.handled) return; // Pause handles its own next
  394. break;
  395. }
  396. case 'Branch': {
  397. const selectedID = executeBranchStep(this, ctx, step);
  398. emitEvent(ctx, RunEventType.StepDone, step.id, {
  399. selected: selectedID, outputs: collectStepOutputs(ctx, step)
  400. });
  401. if (ctx._completedSteps) ctx._completedSteps.push(step.id);
  402. this._emitCheckpoint(ctx);
  403. if (selectedID) {
  404. const branchStep = this.getStep(selectedID);
  405. if (branchStep) await this.executeStep(ctx, branchStep);
  406. }
  407. return this._moveToNext(ctx, step);
  408. }
  409. case 'Loop':
  410. await executeLoopStep(this, ctx, step);
  411. emitEvent(ctx, RunEventType.StepDone, step.id, {
  412. outputs: collectStepOutputs(ctx, step)
  413. });
  414. if (ctx._completedSteps) ctx._completedSteps.push(step.id);
  415. this._emitCheckpoint(ctx);
  416. return this._moveToNext(ctx, step);
  417. case 'Stop':
  418. ctx.status = ExecutionStatus.Stopped;
  419. emitEvent(ctx, RunEventType.WorkflowDone, null, {
  420. stop_id: step.id, duration_ms: Date.now() - ctx.startTime
  421. });
  422. return;
  423. case 'Noop':
  424. break;
  425. default:
  426. throw new Error(`Unknown step type: ${stepType} (${step.id})`);
  427. }
  428. }
  429. } catch (e) {
  430. err = e;
  431. if (attempt + 1 < maxAttempts) {
  432. emitEvent(ctx, RunEventType.StepError, step.id, {
  433. error: e.message, attempt: attempt + 1, maxAttempts
  434. });
  435. if (step.retryDelay) await new Promise(r => setTimeout(r, step.retryDelay));
  436. continue;
  437. }
  438. }
  439. if (!err) break; // success — exit retry loop
  440. }
  441. if (err) {
  442. emitEvent(ctx, RunEventType.StepError, step.id, { error: err.message });
  443. // OnError handler (v3.10+)
  444. if (step.onError && isV310OrLater(this.workflow.version)) {
  445. ctx.setVariable('_error', buildErrorMap(err));
  446. // Handle built-in keywords explicitly
  447. if (step.onError === 'retry' || step.onError === 'skip' ||
  448. step.onError === 'ignore' || step.onError === 'continue') {
  449. ctx.setVariable('_error', undefined);
  450. return this._moveToNext(ctx, step);
  451. }
  452. // Fall through to step-ID handler
  453. const errorStep = this.getStep(step.onError);
  454. try {
  455. if (errorStep) await this.executeStep(ctx, errorStep);
  456. } finally {
  457. ctx.setVariable('_error', undefined);
  458. ctx.setVariable('_meta', undefined);
  459. }
  460. return this._moveToNext(ctx, step);
  461. }
  462. throw err;
  463. }
  464. // step_print (v3.13+)
  465. if (step.print) {
  466. const ev = new ExpressionEvaluator(ctx);
  467. const printVal = ev.evaluateValue(step.print);
  468. emitEvent(ctx, RunEventType.StepPrint, step.id, { value: printVal });
  469. }
  470. emitEvent(ctx, RunEventType.StepDone, step.id, {
  471. outputs: collectStepOutputs(ctx, step)
  472. });
  473. if (ctx._completedSteps) ctx._completedSteps.push(step.id);
  474. this._emitCheckpoint(ctx);
  475. // Execute children (parallel branches)
  476. if (step.children && step.children.length > 0 && stepType !== 'Loop' && stepType !== 'Branch') {
  477. await this._executeChildren(ctx, step.children, step.id);
  478. }
  479. // Move to next
  480. return this._moveToNext(ctx, step);
  481. }
  482. /** Emit checkpoint after step completion (if callback registered). */
  483. _emitCheckpoint(ctx) {
  484. if (this._onCheckpoint) {
  485. try { this._onCheckpoint(ctx.checkpoint()); } catch {}
  486. }
  487. }
  488. // ─── Internal ──────────────────────────────────────────────
  489. async _moveToNext(ctx, step) {
  490. if (!step.next || step.next === 'RETURN' || step.next === 'STOP') return;
  491. if (step.next === 'BREAK') throw new LoopBreakSignal();
  492. if (ctx.status === ExecutionStatus.Stopped || ctx.aborted) return;
  493. const nextStep = this.getStep(step.next);
  494. if (nextStep) await this.executeStep(ctx, nextStep);
  495. }
  496. async _executeChildren(ctx, childIDs, parentStepID = null) {
  497. if (!childIDs || childIDs.length === 0) return;
  498. // Phase 2: filter out already-completed branches (for resume)
  499. const completedSet = parentStepID ? ctx._completedBranches.get(parentStepID) : null;
  500. const pendingIDs = completedSet
  501. ? childIDs.filter(id => !completedSet.has(id))
  502. : childIDs;
  503. if (pendingIDs.length === 0) return;
  504. if (pendingIDs.length === 1) {
  505. const child = this.getStep(pendingIDs[0]);
  506. if (child) {
  507. await this.executeStep(ctx, child);
  508. // Track completion
  509. if (parentStepID) this._markBranchCompleted(ctx, parentStepID, pendingIDs[0]);
  510. }
  511. return;
  512. }
  513. // Parallel execution
  514. const engine = this;
  515. const branches = pendingIDs.map(id => ({
  516. id,
  517. fn: async (failFastSignal) => {
  518. const childCtx = new ChildExecutionContext(ctx);
  519. if (failFastSignal) childCtx._failFastSignal = failFastSignal;
  520. // Copy parent local vars to child
  521. for (const key of ['_item', '_index', '_result', '_meta', '_error']) {
  522. if (ctx.localVars?.[key] !== undefined) childCtx.localVars[key] = ctx.localVars[key];
  523. else if (ctx.getVariable?.(key) !== undefined) childCtx.localVars[key] = ctx.getVariable(key);
  524. }
  525. const step = engine.getStep(id);
  526. if (step) await engine.executeStep(childCtx, step);
  527. // Track branch completion
  528. if (parentStepID) engine._markBranchCompleted(ctx, parentStepID, id);
  529. }
  530. }));
  531. await this.parallelExecutor.execute(branches, this.errorStrategy, ctx.signal);
  532. }
  533. _markBranchCompleted(ctx, parentStepID, childID) {
  534. if (!ctx._completedBranches.has(parentStepID)) {
  535. ctx._completedBranches.set(parentStepID, new Set());
  536. }
  537. ctx._completedBranches.get(parentStepID).add(childID);
  538. this._emitCheckpoint(ctx);
  539. }
  540. _findEntryNodeIDs() {
  541. const referenced = new Set();
  542. for (const step of this.workflow.steps) {
  543. if (step.next && step.next !== 'RETURN' && step.next !== 'STOP' && step.next !== 'BREAK') referenced.add(step.next);
  544. for (const child of (step.children || [])) referenced.add(child);
  545. for (const cas of (step.cases || [])) {
  546. if (Array.isArray(cas) && cas.length >= 2) referenced.add(cas[1]);
  547. if (cas && typeof cas === 'object' && !Array.isArray(cas)) {
  548. const ref = cas.next || cas.step;
  549. if (ref) referenced.add(ref);
  550. }
  551. }
  552. if (step.default) referenced.add(step.default);
  553. if (step.if_true) referenced.add(step.if_true);
  554. if (step.if_false) referenced.add(step.if_false);
  555. if (step.onError) referenced.add(step.onError);
  556. if (step.timeout?.on) referenced.add(step.timeout.on);
  557. }
  558. return this.workflow.steps.map(s => s.id).filter(id => !referenced.has(id));
  559. }
  560. _collectFileTargets(step) {
  561. const targets = [];
  562. if (step.target && typeof step.target === 'string' && getStepType(step.id) === 'Write') {
  563. targets.push(step.target);
  564. }
  565. if (step.out) {
  566. for (const target of Object.keys(step.out)) {
  567. if (target.startsWith('/')) targets.push(target);
  568. }
  569. }
  570. return targets;
  571. }
  572. }
  573. // ─── Event Payload Helpers ────────────────────────────────────
  574. /**
  575. * Truncate large values to prevent oversized event payloads.
  576. * If a single value serializes to > maxBytes, replace with a summary.
  577. */
  578. function summarizeIfLarge(value, maxBytes = 10240) {
  579. if (value === null || value === undefined) return value;
  580. if (typeof value !== 'object' && typeof value !== 'string') return value;
  581. let json;
  582. try { json = JSON.stringify(value); } catch { return { _truncated: true, type: typeof value, error: 'non-serializable' }; }
  583. if (json.length <= maxBytes) return value;
  584. return { _truncated: true, type: typeof value, length: json.length, preview: json.slice(0, 200) + '...' };
  585. }
  586. /** Apply summarizeIfLarge to every value in a shallow object. */
  587. function summarizeDeep(obj, maxBytes = 10240) {
  588. if (!obj || typeof obj !== 'object') return summarizeIfLarge(obj, maxBytes);
  589. const out = {};
  590. for (const [k, v] of Object.entries(obj)) {
  591. out[k] = summarizeIfLarge(v, maxBytes);
  592. }
  593. return out;
  594. }
  595. /**
  596. * Collect outputs after applyOutputMapping by reading back the targets from ctx.
  597. * For $variable targets, read the value; for /file/ targets, return metadata only.
  598. */
  599. function collectStepOutputs(ctx, step) {
  600. if (!step.out) return null;
  601. const outputs = {};
  602. // Shorthand: out is a string like "$plan"
  603. if (typeof step.out === 'string') {
  604. outputs[step.out] = summarizeIfLarge(ctx.getVariable(step.out));
  605. return outputs;
  606. }
  607. for (const target of Object.keys(step.out)) {
  608. if (target.startsWith('/')) {
  609. // File target — only metadata
  610. outputs[target] = { _file: true, path: target };
  611. } else if (target.startsWith('$') || target.startsWith('_')) {
  612. outputs[target] = summarizeIfLarge(ctx.getVariable(target));
  613. }
  614. }
  615. return outputs;
  616. }
  617. // ─── Helpers ─────────────────────────────────────────────────
  618. function applyParamType(type, value) {
  619. if (type === 'OBJECT' && typeof value === 'string') {
  620. try { return JSON.parse(value); } catch { return value; }
  621. }
  622. if (type === 'STRING' && typeof value !== 'string') {
  623. return typeof value === 'object' ? JSON.stringify(value) : String(value);
  624. }
  625. return value;
  626. }
  627. function applyVarType(type, value) {
  628. if (type === 'OBJECT' && typeof value === 'string') {
  629. try { return JSON.parse(value); } catch { return value; }
  630. }
  631. if (type?.startsWith('[') && typeof value === 'string') {
  632. try { return JSON.parse(value); } catch { return value; }
  633. }
  634. return value;
  635. }
  636. module.exports = { Engine };